第八十三讲 使用Scala实战Spark Streaming开发
2016-05-25 00:00
513 查看
摘要: Spark学习
代码及注意要点如下:
代码及注意要点如下:
package IMF.streaming /** * Created by Bindy on 16-5-24. * * @author DT大数据梦工厂-学员 */ object s83WordCountOnline { def main(args: Array[String]) { import org.apache.spark._ import org.apache.spark.streaming._ /** * 第一步:配置Sparkconf: * 1、至少有2条线程,因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条 * 线程用于处理接收的数据(否则的话,无法有线程用于处理数据,随着时间的推移,内存和磁盘都会不堪重负。); * 2、对于集群而言,每个Executor一般肯定不止一个线程,对于处理Spark Streaming的应用程序而言,每个Executor一般 * 分配多少core比较合适?根据经验,5个左右的core是最佳的。 */ val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnline") /** * 第二步:创建SparkStreamingContext * 1、这个是SparkStreaming一切的开始; * 2、状态恢复:可以基于SparkConf参数,也可以基于持久化的SparkStreamingContext进行状态恢复。典型的场景是Driver崩溃后 * 由于SparkStreaming具有连续不断的24小时不间断的运行,所以需要在Driver重新启动后从上次运行的状态恢复过来,此时的状态需要 * 基于曾经的CheckPoint。 */ val ssc = new StreamingContext(conf, Seconds(3)) /** * 第三步:创建Spark Streaming输入数据来源input stream: * 1,数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等; * 2,在这里我们指定数据来源与网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口的数据,当然,该端口 * 服务首先必须存在并且在后续会根据业务需要不断的有数据产生; * 3,如果经常在每间隔5秒钟没有数据的话不断的启动空的Job其实会造成调度资源的浪费,没有数据需要计算,所以企业级生产环境的代码在 * 具体提交Job前会判断是否有数据,如果没有,就不再提交Job。 */ val lines = ssc.socketTextStream("localhost", 9999) /** * 第四步:接下来就像对于RDD编程一样基于DStream进行编程!!!DStream是RDD产生的模板或者说是类。在Spark Streaming发生 * 计算前,其实质是把每个Batch的DStream的操作翻译成对RDD的操作。 */ val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) /** * 此处的print并不会直接触发Job的执行,因为现在的一切都是在Spark Streaming框架控制下,具体要触发,对于Spark Streaming * 而言,一般是基于设置的时间间隔Duration。Spark Streaming应用程序要想执行具体的Job,对DSteam就必须有output Stream * 操作,output Stream有很多类型的函数触发,例如print,saveAsTextFile,saveAsHadoopFiles等,最为重要的是foreachRDD, * 因为处理的结果一般都会放在Redis,DB,DashBoard等,foreachRDD主要就是完成这些功能的,而且可以随意的自定义具体数据放在哪里。 * */ wordCounts.print() /** * Spark Streaming执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的。当然,其内部有消息循环体用于 * 接收应用程序本身或者Executor中的消息。 */ ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate } }
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Windows下Scala环境搭建
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- Windows7下安装Scala 2.9.2教程
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- Scala代码实现列出Hadoop 文件夹下面的所有文件
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- sbt创建web项目
- 使用java代码提交Spark的hive sql任务,run as java application
- XML 文件解析--含Unicode字符的XML文件
- Scala 学习随笔
- Scala 小程序记录(学习期间的代码片段)
- Spark机器学习(一) -- Machine Learning Library (MLlib)
- wamp2.5 32b 升级php 到 php7