您的位置:首页 > 其它

Spark-StructStreaming-计算结果写入到文本文件

2018-01-05 13:40 344 查看

前言

主要讲述 StructStreaming将计算结构写入到文本文件

正文

package org.sun.IndustryBigDataAnalyticsPartform
import org.apache.spark.sql.SparkSession
import java.io._
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.Row
object DataSetTest{

def main(args:Array[String]){
val kafkaservers=args(0) //kafka客户端host地址
val topic=args(1) //topic的名称
val outputfile=args(2) //输出的文本名字
val spark=SparkSession
.builder
.appName("testWriteResultToText")
.master("local")
.getOrCreate()

import spark.implicits._
val lines = spark
.readStream
.format("kafka")    .option("kafka.bootstrap.servers",kafkaservers)
.option("subscribe", topic)
.load()
.selectExpr("cast(topic as String) ","cast(key as String)","CAST(value AS STRING)")
.as[(String,String,String)]
lines.createTempView("Originalkafka")
import spark.sql
val count=sql("select count(*) from Originalkafka group by value")
val query =count.writeStream
.outputMode("complete")
.foreach(new ForeachWriter[Row]{
var fos:FileOutputStream=null
def open(partitionId:Long,version:Long):Boolean={
try{
fos=new FileOutputStream(outputfile);
true
}
catch{
case e:Exception =>false
}
}
def process(record:Row):Unit={
fos.write(record.mkString.getBytes)
}
def close(errorOrNull:Throwable):Unit={
fos.close
}
})
.queryName("test")
.format("foreach")
.start()
query.awaitTermination()
}
}


注意:

1.
.format("foreach")
.foreach(new ForeachWriter[Row]{}
匹配,如果写的是
complete
的,就没有必要。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark