Learningspark10--Spark流
2015-09-01 17:14
211 查看
spark提供了一个DStreams抽象。它是随时间到达的一列RDD。DStream可以由很多输入源如:Flume,Kafka,HDFS创建。他提供了两种操作:transformations产生新的DStream, output写数据到外部系统。除了RDD的操作外,还有sliding windows操作。
不像批处理,spark流应用需要额外的设置以操作24/7。为此spark流提供了主要的机制 checkpointing,它使spark流存储数据到可靠文件系统(例如HDFS)。
首先考虑一个
对于maven 需要引入:
scala程序代码:
java代码
必须调用StreamingContext的start()方法开始接收数据,SparkStreaming将在SparkContext上调度Spark Jobs。这在一个分开的线程上,因此需要调用awaitTermination方法等待流计算结束。
scala
java
启动程序:
不像批处理,spark流应用需要额外的设置以操作24/7。为此spark流提供了主要的机制 checkpointing,它使spark流存储数据到可靠文件系统(例如HDFS)。
首先考虑一个
小例子
接收7777端口的文本数据,过滤包含单词error的行并打印。对于maven 需要引入:
groupId = org.apache.spark artifactId = spark-streaming_2.10 version = 1.2.0
Example 10-2. Scala streaming imports import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds Example 10-3. Java streaming imports import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations;
scala程序代码:
// Create a StreamingContext with a 1-second batch size from a SparkConf val ssc = new StreamingContext(conf, Seconds(1)) // Create a DStream using data received after connecting to port 7777 on the // local machine val lines = ssc.socketTextStream("localhost", 7777) // Filter our DStream for lines with "error" val errorLines = lines.filter(_.contains("error")) // Print out the lines with errors errorLines.print()
java代码
// Create a StreamingContext with a 1-second batch size from a SparkConf JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); // Create a DStream from all the input on port 7777 JavaDStream<String> lines = jssc.socketTextStream("localhost", 7777); // Filter our DStream for lines with "error" JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() { public Boolean call(String line) { return line.contains("error"); }}); // Print out the lines with errors errorLines.print();
必须调用StreamingContext的start()方法开始接收数据,SparkStreaming将在SparkContext上调度Spark Jobs。这在一个分开的线程上,因此需要调用awaitTermination方法等待流计算结束。
scala
// Start our streaming context and wait for it to "finish" ssc.start() // Wait for the job to finish ssc.awaitTermination()
java
// Start our streaming context and wait for it to "finish" jssc.start(); // Wait for the job to finish jssc.awaitTermination();
启动程序:
$ spark-submit --class com.oreilly.learningsparkexamples.scala.StreamingLogInput \ $ASSEMBLY_JAR local[4] $ nc localhost 7777 <your input here> # Lets you type input lines to send to the server
架构和抽象
spark流使用微批架构,流计算被视为小批数据上连续的批计算。规则时间区间内的数据组成新的批。每一个批形成一个RDD。相关文章推荐
- 胜负猜想
- 19.UIDatePicker日期选择器
- 20.友盟社会化分享
- Azure Automation (3) 定期将某个Azure订阅下的所有虚拟机开关机
- 【面试题】-100盏灯
- 浏览器禁用后退
- ios原生二维码扫描
- mongo-kafka: mongodb 数据到kafka的实时传输
- Quartz2d研究笔记
- Javascript模块化编程(一):模块的写法
- [工作笔记之三] 通过adb查看android手机CPU和内存占用情况
- 17.setValue和setObject的区别
- 18.NSUserDefaults支持类型
- 从经典的MVC模式到Web三层结构
- CSS中的行为(css中使用js文件)——expression
- QQ互联OAuth
- Office 转 PDF & PDF 转 SWF Windows版
- /etc/profile /etc/profile .bash_profile .bashrc解释
- OSAtomic原子操作
- 使用Sonatype Nexus搭建Maven私服后如何添加第三方JAR包?