Learningspark10 -- Spark Streaming

2015-09-01 17:14
Spark Streaming provides an abstraction called DStreams (discretized streams). A DStream is a sequence of RDDs arriving over time. DStreams can be created from many input sources, such as Flume, Kafka, or HDFS. They offer two kinds of operations: transformations, which yield a new DStream, and output operations, which write data to an external system. In addition to the operations available on RDDs, DStreams also provide operations related to sliding windows.
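A minimal sketch of these two kinds of operations, assuming an existing DStream[String] named lines (like the one created later in this post):

// Sketch only: `lines` is assumed to be an existing DStream[String].
// flatMap is a transformation: it returns a new DStream.
val words = lines.flatMap(line => line.split(" "))
// print() is an output operation: it writes results to stdout.
words.print()
// A sliding-window variant: a new DStream over the last 30 seconds
// of data, recomputed every 10 seconds.
val windowed = lines.window(Seconds(30), Seconds(10))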

Unlike batch programs, a Spark Streaming application needs additional setup to run 24/7. The main mechanism Spark Streaming provides for this is checkpointing, which lets it save data to a reliable filesystem such as HDFS.
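Enabling checkpointing is a single call on the StreamingContext; a minimal sketch, assuming an existing StreamingContext named ssc (the HDFS path is a placeholder):

// Point the StreamingContext at a directory on a reliable filesystem.
// The path below is a placeholder; substitute a real HDFS directory.
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints")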

A simple example

First, consider a simple example: receive text data on port 7777, keep only the lines that contain the word "error", and print them.

For Maven, the following dependency is needed:

groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 1.2.0
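In a pom.xml, these coordinates correspond to a dependency entry along these lines:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.2.0</version>
</dependency>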


Example 10-2. Scala streaming imports

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds

Example 10-3. Java streaming imports

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.api.java.function.Function; // needed by the filter() call in the Java code below


Scala code:

// Create a StreamingContext with a 1-second batch size from a SparkConf
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream using data received after connecting to port 7777 on the
// local machine
val lines = ssc.socketTextStream("localhost", 7777)
// Filter our DStream for lines with "error"
val errorLines = lines.filter(_.contains("error"))
// Print out the lines with errors
errorLines.print()


Java code:

// Create a StreamingContext with a 1-second batch size from a SparkConf
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create a DStream from all the input on port 7777
JavaDStream<String> lines = jssc.socketTextStream("localhost", 7777);
// Filter our DStream for lines with "error"
JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
  public Boolean call(String line) {
    return line.contains("error");
  }});
// Print out the lines with errors
errorLines.print();


We must call start() on the StreamingContext to begin receiving data; Spark Streaming will then schedule Spark jobs on the underlying SparkContext. This happens in a separate thread, so we also call awaitTermination() to wait for the streaming computation to finish and keep the application from exiting.

Scala:

// Start our streaming context and wait for it to "finish"
ssc.start()
// Wait for the job to finish
ssc.awaitTermination()


Java:

// Start our streaming context and wait for it to "finish"
jssc.start();
// Wait for the job to finish
jssc.awaitTermination();
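Note that a StreamingContext can be started only once, and once stopped it cannot be restarted; the entire DStream computation (input sources, transformations, and output operations) must be set up before start() is called.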


To launch the program and feed it input:

$ spark-submit --class com.oreilly.learningsparkexamples.scala.StreamingLogInput \
$ASSEMBLY_JAR local[4]
$ nc localhost 7777
<your input here>
# Lets you type input lines to send to the server


Architecture and abstraction

Spark Streaming uses a micro-batch architecture, in which the streaming computation is treated as a continuous series of batch computations on small batches of data. New batches are created at regular time intervals, and each batch forms one RDD.
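A small sketch of this batch-as-RDD view, reusing errorLines from the example above (foreachRDD is a standard DStream output operation):

// Each micro-batch surfaces as an RDD; foreachRDD exposes it along with
// the time of the batch it belongs to.
errorLines.foreachRDD { (rdd, time) =>
  println(s"Batch at $time contains ${rdd.count()} matching lines")
}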