Spark-StructStreaming-计算结果写入到文本文件
2018-01-05 13:40
344 查看
前言
主要讲述 StructStreaming将计算结构写入到文本文件正文
package org.sun.IndustryBigDataAnalyticsPartform import org.apache.spark.sql.SparkSession import java.io._ import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.Row object DataSetTest{ def main(args:Array[String]){ val kafkaservers=args(0) //kafka客户端host地址 val topic=args(1) //topic的名称 val outputfile=args(2) //输出的文本名字 val spark=SparkSession .builder .appName("testWriteResultToText") .master("local") .getOrCreate() import spark.implicits._ val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers",kafkaservers) .option("subscribe", topic) .load() .selectExpr("cast(topic as String) ","cast(key as String)","CAST(value AS STRING)") .as[(String,String,String)] lines.createTempView("Originalkafka") import spark.sql val count=sql("select count(*) from Originalkafka group by value") val query =count.writeStream .outputMode("complete") .foreach(new ForeachWriter[Row]{ var fos:FileOutputStream=null def open(partitionId:Long,version:Long):Boolean={ try{ fos=new FileOutputStream(outputfile); true } catch{ case e:Exception =>false } } def process(record:Row):Unit={ fos.write(record.mkString.getBytes) } def close(errorOrNull:Throwable):Unit={ fos.close } }) .queryName("test") .format("foreach") .start() query.awaitTermination() } }
注意:
1.
.format("foreach")与
.foreach(new ForeachWriter[Row]{}匹配,如果写的是
complete的,就没有必要。
相关文章推荐
- spark-structstreaming-结果数据存入hbase
- spark将计算结果写入到hdfs的两种方法
- kafka + spark streaming 实时读取计算 nginx 日志,存储结果到 mongodb/mysql
- Spark Streaming之:Flume监控目录下文件内容变化,然后Spark Streaming实时监听Flume,然后从其上拉取数据,并计算出结果
- spark将计算结果写入到mysql中
- Spark将计算结果写入到Mysql中
- Spark将计算结果写入到Mysql中
- Spark Streaming之使用Spark Streaming完成词频统计,并将结果写入到MySQL数据库中
- Spark将计算结果写入到Mysql中
- 第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码
- 通过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统中
- Spark Streaming实时计算框架介绍
- 第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码
- Spark定制班第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码
- Spark Streaming实时计算框架介绍
- Spark入门实战系列--7.Spark Streaming(下)--实时流计算Spark Streaming实战
- 大数据IMF传奇行动绝密课程第118课:Spark Streaming性能优化:如何获得和持续使用足够的集群计算资源
- Spark Streaming场景应用- Spark Streaming计算模型及监控
- 实时流计算Spark Streaming原理介绍
- Spark Streaming实时计算框架介绍