您的位置:首页 > 其它

从spark streaming checkpoint文件中还原数据

2017-06-27 10:20 393 查看
checkpoint的最大的弊端在于,一旦你的流式程序代码或配置改变了,或者更新迭代新功能了,这个时候,你先停旧的sparkstreaming程序,然后新的程序打包编译后执行运行,会发现两种情况:

(1)启动报错,反序列化异常
(2)启动正常,但是运行的代码仍然是上一次的程序的代码。

如果直接把上次的checkpoint删除了,可以启动的新的程序,但是如果使用的是有状态计算(updateStateByKey),那么中间的状态数据就行丢失。

有人说删除checkpoint开头的的文件,保留数据文件,如下:

hadoop fs -rm /spark/check_point/应用名/checkpoint* 

经测试,新程序确实能启动,但新程序也不会读取上次的数据文件,而是从新开始计算。还是会丢中间数据。

能不能自定义读取上次的数据文件呢,查看数据文件的格式,是可以的。代码片断如下:

  private def objectFileKryo[T](path: String, sc: SparkConf)(implicit ct: ClassTag[T]) = {

    val kryoSerializer = new KryoSerializer(sc)

    val hadoopConf = new org.apache.hadoop.conf.Configuration()

    val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

    val paths = getPath(hdfs, new org.apache.hadoop.fs.Path(path))

    val d = paths.flatMap { p =>

      {

        val r = hdfs.open(p)

        var by = ArrayBuffer[Byte]()

        while (r.available() > 0) {

          val b = r.readByte()

          by += (b)

        }

        val kryo = kryoSerializer.newKryo()

        val input = new Input()

        input.setBuffer(by.toArray)

        val array = ArrayBuffer[T]()

        while (input.available() > 0) {

          val data = kryo.readClassAndObject(input)

          val dataObject = data.asInstanceOf[T]

          array += dataObject

        }

        array

      }

    }

    d

  }

可以将读取的数据文件转化为对象,不过有个问题,读取的数据有重复的,

查看数据的时间,是不同的时间的状态数据,我们只取最新的就可以了。

val cacheState = new java.util.HashMap[String, StateBean]()

val hdfsData = objectFileKryo[(String, StateBean)](), sparkConf)

            hdfsData.foreach(f => {

                val cache = cacheState.get(f._1)

                if (cache == null) {

                  cacheState.put(f._1, f._2.cache)

                } else if (cache.time < f._2.cache.time) {

                  cacheState.put(f._1, f._2.cache)

                }

            })
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark