从spark streaming checkpoint文件中还原数据
2017-06-27 10:20
393 查看
checkpoint的最大的弊端在于,一旦你的流式程序代码或配置改变了,或者更新迭代新功能了,这个时候,你先停旧的sparkstreaming程序,然后新的程序打包编译后执行运行,会发现两种情况:
(1)启动报错,反序列化异常
(2)启动正常,但是运行的代码仍然是上一次的程序的代码。
如果直接把上次的checkpoint删除了,可以启动的新的程序,但是如果使用的是有状态计算(updateStateByKey),那么中间的状态数据就行丢失。
有人说删除checkpoint开头的的文件,保留数据文件,如下:
hadoop fs -rm /spark/check_point/应用名/checkpoint*
经测试,新程序确实能启动,但新程序也不会读取上次的数据文件,而是从新开始计算。还是会丢中间数据。
能不能自定义读取上次的数据文件呢,查看数据文件的格式,是可以的。代码片断如下:
private def objectFileKryo[T](path: String, sc: SparkConf)(implicit ct: ClassTag[T]) = {
val kryoSerializer = new KryoSerializer(sc)
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
val paths = getPath(hdfs, new org.apache.hadoop.fs.Path(path))
val d = paths.flatMap { p =>
{
val r = hdfs.open(p)
var by = ArrayBuffer[Byte]()
while (r.available() > 0) {
val b = r.readByte()
by += (b)
}
val kryo = kryoSerializer.newKryo()
val input = new Input()
input.setBuffer(by.toArray)
val array = ArrayBuffer[T]()
while (input.available() > 0) {
val data = kryo.readClassAndObject(input)
val dataObject = data.asInstanceOf[T]
array += dataObject
}
array
}
}
d
}
可以将读取的数据文件转化为对象,不过有个问题,读取的数据有重复的,
查看数据的时间,是不同的时间的状态数据,我们只取最新的就可以了。
val cacheState = new java.util.HashMap[String, StateBean]()
val hdfsData = objectFileKryo[(String, StateBean)](), sparkConf)
hdfsData.foreach(f => {
val cache = cacheState.get(f._1)
if (cache == null) {
cacheState.put(f._1, f._2.cache)
} else if (cache.time < f._2.cache.time) {
cacheState.put(f._1, f._2.cache)
}
})
(1)启动报错,反序列化异常
(2)启动正常,但是运行的代码仍然是上一次的程序的代码。
如果直接把上次的checkpoint删除了,可以启动的新的程序,但是如果使用的是有状态计算(updateStateByKey),那么中间的状态数据就行丢失。
有人说删除checkpoint开头的的文件,保留数据文件,如下:
hadoop fs -rm /spark/check_point/应用名/checkpoint*
经测试,新程序确实能启动,但新程序也不会读取上次的数据文件,而是从新开始计算。还是会丢中间数据。
能不能自定义读取上次的数据文件呢,查看数据文件的格式,是可以的。代码片断如下:
private def objectFileKryo[T](path: String, sc: SparkConf)(implicit ct: ClassTag[T]) = {
val kryoSerializer = new KryoSerializer(sc)
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
val paths = getPath(hdfs, new org.apache.hadoop.fs.Path(path))
val d = paths.flatMap { p =>
{
val r = hdfs.open(p)
var by = ArrayBuffer[Byte]()
while (r.available() > 0) {
val b = r.readByte()
by += (b)
}
val kryo = kryoSerializer.newKryo()
val input = new Input()
input.setBuffer(by.toArray)
val array = ArrayBuffer[T]()
while (input.available() > 0) {
val data = kryo.readClassAndObject(input)
val dataObject = data.asInstanceOf[T]
array += dataObject
}
array
}
}
d
}
可以将读取的数据文件转化为对象,不过有个问题,读取的数据有重复的,
查看数据的时间,是不同的时间的状态数据,我们只取最新的就可以了。
val cacheState = new java.util.HashMap[String, StateBean]()
val hdfsData = objectFileKryo[(String, StateBean)](), sparkConf)
hdfsData.foreach(f => {
val cache = cacheState.get(f._1)
if (cache == null) {
cacheState.put(f._1, f._2.cache)
} else if (cache.time < f._2.cache.time) {
cacheState.put(f._1, f._2.cache)
}
})
相关文章推荐
- 大数据Spark“蘑菇云”项目实战第63课: 广告点击系统高可用性和性能优化 checkpoint wal driver高可用 并行度配置
- Spark Streaming Checkpoint反序列化问题分析
- Spark Streaming checkpoint 实现状态的恢复实现
- SparkStreaming python 读取kafka数据将结果输出到单个指定本地文件
- Spark Streaming metadata checkpoint
- spark streaming checkpoint
- Spark Streaming之:Flume监控目录下文件内容变化,然后Spark Streaming实时监听Flume,然后从其上拉取数据,并计算出结果
- ALTER SYSTEM CHECKPOINT 影响数据文件的检查点
- Spark Streaming metadata checkpoint
- 在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉的解决方案
- C# 对sharepoint 列表的一些基本操作,包括添加/删除/查询/上传文件给sharepoint list添加数据
- mysql用.psc文件还原后没有数据
- RMAN备份与恢复(5)——将数据文件或表空间还原到新位置!
- 在现有数据库上还原时的数据文件处理示例.sql
- mysql 把备份数据文件还原后乱码处理方法
- C# 对sharepoint 列表的一些基本操作,包括添加/删除/查询/上传文件给sharepoint list添加数据
- 如何从单一的bak文件还原数据库的结果和数据
- sql命令还原数据库BAK备份文件到现有数据中
- PostgreSQL启动过程中的那些事十六:启动进程三:CheckPointGuts刷出共享内存里所有数据
- SQL2005无类型数据文件还原数据库