
[Translation] Spark Streaming Programming Guide


Spark Streaming Programming Guide

Overview

A Quick Example

Basic Concepts

Linking

Initializing StreamingContext

Discretized Streams (DStreams)

Input DStreams and Receivers

Transformations on DStreams

Output Operations on DStreams

DataFrame and SQL Operations

MLlib Operations

Caching / Persistence

Checkpointing

Deploying Applications

Monitoring Applications

Performance Tuning

Reducing the Batch Processing Times

Setting the Right Batch Interval

Memory Tuning

Fault-tolerance Semantics

Migration Guide from 0.9.1 or below to 1.x

Where to Go from Here

Overview

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data. Data can be ingested from many sources such as Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using high-level functions that express complex algorithms such as map, reduce, join and window. Finally, the processed data can be pushed out to file systems, databases, and live dashboards. In fact, you can also apply Spark's built-in graph processing and machine learning algorithms on the data streams.

Internally, it works as shown in the figure below. Spark Streaming receives the live input data and divides it into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data. A DStream can be created either from input data streams from sources such as Kafka and Flume, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.

This guide shows you how to start writing Spark Streaming programs with DStreams. You can write them in Scala, Java or Python (introduced in Spark 1.2), all of which are presented in this guide. Throughout the guide, tabs mark the code snippets for the different languages.

Note: there are a few APIs that are either different or not available in Python. They are highlighted below.

A Quick Example

Before we go into the details of how to write your own Spark Streaming program, let's take a quick look at what a simple Spark Streaming program looks like. Say we want to count the number of words in text data received from a data server listening on a TCP socket. All you need to do is as follows.

First, we import the names of the Spark Streaming classes and some implicit conversions from StreamingContext into our environment in order to add useful methods to other classes we need (like DStream). StreamingContext is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working threads and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));


Using this context, we can create a DStream that represents streaming data from a TCP source, specified as a hostname (e.g. localhost) and a port (e.g. 9999).

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);


This lines DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we want to split the lines by space characters into words.

// Split each line into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });


flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words, and the stream of words is represented as the words DStream. Next, we want to count these words.

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();




The words DStream is further mapped (one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of words in each batch of data. Finally, wordCounts.print() will print a few of the counts generated every second.

Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been set up, we finally call

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate


The complete code can be found in the Spark Streaming example NetworkWordCount.

If you have already downloaded and built Spark, you can run this example as follows. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using

$ nc -lk 9999


Then, in a different terminal, you can start the example by using

$ ./bin/run-example streaming.NetworkWordCount localhost 9999


Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world

...


# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

Basic Concepts

Next, we move beyond the simple example and elaborate on the basics of Spark Streaming.

Linking

Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0</version>
</dependency>


For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact spark-streaming-xyz_2.10 to the dependencies. For example, some of the common ones are as follows.

Source      Artifact
Kafka       spark-streaming-kafka_2.10
Flume       spark-streaming-flume_2.10
Kinesis     spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter     spark-streaming-twitter_2.10
ZeroMQ      spark-streaming-zeromq_2.10
MQTT        spark-streaming-mqtt_2.10

For an up-to-date list, please refer to the Maven repository for the full list of supported sources and artifacts.

Initializing StreamingContext

To initialize a Spark Streaming program, a StreamingContext object has to be created, which is the main entry point of all Spark Streaming functionality.

A StreamingContext object can be created from a SparkConf object.

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))


The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special "local[*]" string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass "local[*]" to run Spark Streaming in-process (detects the number of cores in the local system). Note that this internally creates a SparkContext (starting point of all Spark functionality) which can be accessed as ssc.sparkContext.

The batch interval must be set based on the latency requirements of your application and available cluster resources. See the Performance Tuning section for more details.

A StreamingContext object can also be created from an existing SparkContext object.

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))


After a context is defined, you have to do the following.

Define the input sources by creating input DStreams.

Define the streaming computations by applying transformation and output operations to DStreams.

Start receiving data and processing it using streamingContext.start().

Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().

The processing can be manually stopped using streamingContext.stop().

Points to remember:

Once a context has been started, no new streaming computations can be set up or added to it.

Once a context has been stopped, it cannot be restarted.

Only one StreamingContext can be active in a JVM at the same time.

stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.

A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.

Discretized Streams (DStreams)

A discretized stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source, or the processed data stream generated by transforming an input stream. Internally, a DStream is represented by a continuous series of RDDs, and as shown in the figure below, each RDD in a DStream contains data from a certain interval.

Any operation applied on a DStream translates to operations on the underlying RDDs. For example, in the earlier word-count example where a stream of lines is converted to words, the flatMap operation is applied on each RDD of the lines DStream to generate the RDDs of the words DStream, as shown in the following figure.

These underlying RDD operations are executed by the Spark engine. The DStream operations hide most of these details and provide the developer with a convenient high-level API. These operations are discussed in detail later.

Input DStreams and Receivers

An input DStream is a DStream representing the stream of input data received from a source. In the quick example, lines was an input DStream, representing the stream of data received from the netcat server. Every input DStream (except file streams, discussed later) is associated with a Receiver, which receives the data from a source and stores it in Spark's memory for processing.

Spark Streaming provides two categories of built-in streaming sources.

Basic sources: sources directly available in the StreamingContext API, for example file systems, socket connections, and Akka actors.

Advanced sources: sources such as Kafka, Flume, Kinesis, Twitter, etc., which require extra dependencies, as discussed in the linking section.

These are discussed in detail later.

Note that if you want to receive multiple streams of data in parallel in your application, you can create multiple input DStreams; this will create multiple receivers that receive the multiple data streams simultaneously. But note that a Spark worker/executor is a long-running task, so each receiver occupies one of the cores allocated to the Spark Streaming application. Therefore, make sure the Spark Streaming application is given enough cores (or threads, if running locally) to process the received data, as well as enough cores to run the receiver(s).

Points to remember

When running locally, do not use "local" or "local[1]" as the master URL. Either of these means that only one thread will be used for running tasks. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), that single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use "local[n]" as the master URL, where n is greater than the number of receivers to run. (For how to set the master URL, see Spark Properties.)

Likewise, when running on a cluster, the number of cores allocated to the application must be greater than the number of receivers.

Basic Sources

We have already seen, in the earlier example, ssc.socketTextStream(...), which creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files and Akka actors as input sources.

File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:

streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);


Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories are not supported). Note that:

The files must have the same data format.

The files must be placed in the directory by atomically moving or renaming them into it.

Once moved, the files must not be changed. So if a file's data is continuously appended, the new data will not be read.

For simple text files, there is an easier method: streamingContext.textFileStream(dataDirectory) (see the sketch below). Also, file streams do not require running a receiver, hence no extra cores need to be allocated.

fileStream is not available in the Python API; only textFileStream is available.
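For illustration, here is a minimal Scala sketch of monitoring a directory with textFileStream and counting the lines of each newly arrived file; the directory path and batch interval are illustrative assumptions, not part of the original example.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch: process new text files dropped into a monitored directory.
// The path below is a hypothetical example; replace it with your own directory.
val conf = new SparkConf().setAppName("FileStreamExample")
val ssc = new StreamingContext(conf, Seconds(30))

val lines = ssc.textFileStream("hdfs://namenode:8020/user/logs")
lines.count().print()        // number of new lines in each batch

ssc.start()
ssc.awaitTermination()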

Streams based on Custom Actors: DStreams can be created from data received through Akka actors. See the Custom Receiver Guide for details.

Python API: As of now, actorStream is available only in the Java and Scala APIs, not in the Python API.

Queue of RDDs as a Stream: For testing a Spark Streaming application with test data, you can also create a DStream from a queue of RDDs, using streamingContext.queueStream(queueOfRDDs). Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream, as in the sketch below.
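As a concrete illustration, here is a minimal Scala sketch (with illustrative queue contents and batch interval) that feeds a queue of RDDs into a StreamingContext for testing:

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamTest")
val ssc = new StreamingContext(conf, Seconds(1))

// Each RDD pushed into the queue becomes one batch of the DStream.
val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue)
inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()

ssc.start()
// Push a few test RDDs while the context is running.
for (_ <- 1 to 3) {
  rddQueue.synchronized { rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2) }
  Thread.sleep(1000)
}
ssc.stop()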

For more details on streams from sockets, files, and actors, see the API documentation of the relevant functions in the Scala StreamingContext, the Java JavaStreamingContext, and the Python StreamingContext.

Advanced Sources

Python API: As of Spark 1.6.0, out of these sources, Kafka, Kinesis, Flume and MQTT are available in the Python API.

This category of sources requires interfacing with external non-Spark libraries, some of them with complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts of dependencies, the functionality to create DStreams from these sources has been moved into separate libraries that can be linked to explicitly when necessary. For example, if you want to create a DStream using data from Twitter's stream of tweets, you have to do the following:

Linking: Add the artifact spark-streaming-twitter_2.10 to the SBT/Maven project dependencies.

Programming: Import the TwitterUtils class and create a DStream with TwitterUtils.createStream, as shown below.

Deploying: Generate an uber JAR with all the dependencies and deploy the application. See the Deploying section for details.

import org.apache.spark.streaming.twitter.*;

TwitterUtils.createStream(jssc);


Note that these advanced sources are not available in the Spark shell, hence applications based on them cannot be tested in the shell. If you really want to use them in the Spark shell, you will have to download the corresponding Maven artifact's JAR along with its dependencies and add it to the classpath.

Some of these advanced sources are as follows.

Kafka: Spark Streaming 1.6.0 is compatible with Kafka 0.8.2.1. See the Kafka Integration Guide for more details.

Flume: Spark Streaming 1.6.0 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.

Kinesis: Spark Streaming 1.6.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.

Twitter: Spark Streaming's TwitterUtils uses Twitter4j to get the public stream of tweets using Twitter's Streaming API. Authentication information can be provided by any of the methods supported by the Twitter4J library. You can either get the public stream, or get a filtered stream based on keywords. See the API documentation (Scala, Java) and examples (TwitterPopularTags and TwitterAlgebirdCMS).

Custom Sources

Python API: This is not yet supported in Python.

Input DStreams can also be created from custom data sources. All you have to do is implement a user-defined receiver that can receive data from the custom source and push it into Spark. See the Custom Receiver Guide for details.

Receiver Reliability

There can be two kinds of data sources based on their reliability. Some sources (like Kafka and Flume) allow the transferred data to be acknowledged. If the system receiving data from such reliable sources acknowledges the received data correctly, it can be ensured that no data will be lost due to any kind of failure. This leads to two kinds of receivers:

Reliable Receiver: A reliable receiver sends an acknowledgment to the source once the data has been received and stored in Spark.

Unreliable Receiver: An unreliable receiver does not send an acknowledgment to the source. This can be used for sources that do not support acknowledgment, or even for reliable sources when one does not want to use acknowledgments.

For details on how to write a reliable receiver, see the Custom Receiver Guide.

Transformations on DStreams

Similar to RDDs, transformations allow the data in the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDDs. Some of the common ones are as follows.

Transformation: Meaning

map(func): Return a new DStream by passing each element of the source DStream through the function func.

flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items.

filter(func): Return a new DStream by selecting only the records of the source DStream for which func returns true.

repartition(numPartitions): Changes the level of parallelism in this DStream by creating more or fewer partitions.

union(otherStream): Return a new DStream that contains the union of the elements of the source DStream and the other DStream.

count(): Return a new DStream of single-element RDDs, where each RDD contains the number of elements in the corresponding RDD of the source DStream.

reduce(func): Return a new DStream of single-element RDDs, where each RDD is obtained by aggregating the elements of the corresponding RDD of the source DStream with the function func. The function should be associative so that it can be computed in parallel.

countByValue(): When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.

reduceByKey(func, [numTasks]): When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values of each key are aggregated using the given reduce function. Note: by default, this uses Spark's default number of parallel tasks (2 in local mode, and in cluster mode the number determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.

join(otherStream, [numTasks]): When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs.

cogroup(otherStream, [numTasks]): When called on DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.

transform(func): Return a new DStream by applying an RDD-to-RDD function func to every RDD of the source DStream. This can be used to apply arbitrary RDD operations on the DStream.

updateStateByKey(func): Return a new "state" DStream where the state for each key is updated by applying the given function func to the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
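To make the table above concrete, here is a small Scala sketch chaining a few of these transformations on a DStream of text lines (analogous to the lines stream in the quick example); the stop-word set is an illustrative assumption:

// Assuming lines: DStream[String], as in the quick example.
val stopWords = Set("the", "a", "an")                    // illustrative assumption

val words  = lines.flatMap(_.split(" "))                 // one-to-many
val useful = words.filter(w => !stopWords.contains(w))   // keep records for which the predicate is true
val counts = useful.countByValue()                       // DStream of (word, per-batch frequency)
counts.print()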

A few of these transformations are worth discussing in more detail.

UpdateStateByKey Operation

The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.

Define the state - The state can be an arbitrary data type.

Define the state update function - Specify with a function how to update the state using the previous state and the new values from an input stream.

In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not. If the update function returns None then the key-value pair will be eliminated.

Let's illustrate this with an example. Say you want to maintain a running count of each word seen in a text data stream. Here, the running count is the state and it is an integer. We define the update function as:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  val newCount = ...  // add the new values with the previous running count to get the new count
  Some(newCount)
}


This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the earlier example).

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)


The update function will be called for each word, with newValues having a sequence of 1's (from the (word, 1) pairs) and the runningCount having the previous count. For the complete Scala code, take a look at the example StatefulNetworkWordCount.scala.

Note that using updateStateByKey requires the checkpoint directory to be configured, which is discussed in detail in the checkpointing section.

Transform Operation

The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})


Note that the supplied function gets called in every batch interval. This allows you to do time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, etc. can be changed between batches.

Window Operations

Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. The following figure illustrates this sliding window.

As shown in the figure, every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. In this specific case, the operation is applied over the last 3 time units of data, and slides by 2 time units. This shows that any window operation needs to specify two parameters.

window length - The duration of the window (3 in the figure).

sliding interval - The interval at which the window operation is performed (2 in the figure).

These two parameters must be multiples of the batch interval of the source DStream (1 in the figure).

Let's illustrate the window operations with an example. Say, you want to extend the earlier example by generating word counts over the last 30 seconds of data, every 10 seconds. To do this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done using the operation reduceByKeyAndWindow.

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(10))


Some of the common window operations are as follows. All of these operations take the said two parameters - windowLength and slideInterval.

Transformation: Meaning

window(windowLength, slideInterval): Return a new DStream which is computed based on windowed batches of the source DStream.

countByWindow(windowLength, slideInterval): Return a sliding window count of elements in the stream.

reduceByWindow(func, windowLength, slideInterval): Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel.

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and "inverse reducing" the old data that leaves the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable only to "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.

countByValueAndWindow(windowLength, slideInterval, [numTasks]): When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

Join Operations

Finally, it's worth highlighting how easily you can perform different kinds of joins in Spark Streaming.

Stream-stream joins
Streams can be very easily joined with other streams.

val stream1: DStream[(String, String)] = ...
val stream2: DStream[(String, String)] = ...
val joinedStream = stream1.join(stream2)


Here, in each batch interval, the RDD generated by stream1 will be joined with the RDD generated by stream2. You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)


Stream-dataset joins
This has already been shown earlier while explaining the DStream.transform operation. Here is yet another example of joining a windowed stream with a dataset.

val dataset: RDD[(String, String)] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }


In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated every batch interval and therefore will use the current dataset that the dataset reference points to.

The complete list of DStream transformations is available in the API documentation. For the Scala API, see DStream and PairDStreamFunctions. For the Java API, see JavaDStream and JavaPairDStream. For the Python API, see DStream.

Output Operations on DStreams

Output operations allow a DStream's data to be pushed out to external systems like a database or a file system. Since the output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). Currently, the following output operations are defined:

Output Operation: Meaning

print(): Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.
Python API: This is called pprint() in the Python API.

saveAsTextFiles(prefix, [suffix]): Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

saveAsObjectFiles(prefix, [suffix]): Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Python API: This is not available in the Python API.

saveAsHadoopFiles(prefix, [suffix]): Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
Python API: This is not available in the Python API.

foreachRDD(func): The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

Design Patterns for using foreachRDD

dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently. Some of the common mistakes to avoid are as follows.

Often writing data to an external system requires creating a connection object (e.g. a TCP connection to a remote server) and using it to send data to the remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, and then try to use it in a Spark worker to save records in the RDDs. For example (in Scala),

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record)  // executed at the worker
  }
}


This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.

However, this can lead to another common mistake - creating a new connection for every record. For example,

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}


Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use rdd.foreachPartition - create a single connection object and send all the records in an RDD partition using that connection.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}


This amortizes the connection creation overheads over many records.

Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects that can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}


Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.

Other points to remember:

DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.

By default, output operations are executed one-at-a-time. And they are executed in the order they are defined in the application.

DataFrame and SQL Operations

You can easily use DataFrames and SQL operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SQLContext
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Register as table
  wordsDataFrame.registerTempTable("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}


See the full source code.

You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of any asynchronous SQL queries, will delete old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call streamingContext.remember(Minutes(5)) (in Scala, or the equivalent in other languages).

See the DataFrames and SQL guide to learn more about DataFrames.

MLlib Operations

You can also easily use machine learning algorithms provided by MLlib. First of all, there are streaming machine learning algorithms (e.g. Streaming Linear Regression, Streaming KMeans, etc.) which can simultaneously learn from the streaming data as well as apply the model on the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can learn a learning model offline (i.e. using historical data) and then apply the model online on streaming data. See the MLlib guide for more details.
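As a hedged sketch of the streaming algorithms mentioned above, here is a minimal StreamingKMeans setup in Scala; the number of clusters, the feature dimension and the trainingData / testData DStreams are illustrative assumptions:

import org.apache.spark.mllib.clustering.StreamingKMeans

// trainingData: DStream[Vector] and testData: DStream[(Double, Vector)] are assumed
// to exist, e.g. parsed from text streams.
val model = new StreamingKMeans()
  .setK(3)                    // number of clusters (assumption)
  .setDecayFactor(1.0)        // how quickly older batches are forgotten
  .setRandomCenters(2, 0.0)   // feature dimension 2, initial weight 0.0

model.trainOn(trainingData)               // update the model with every batch
model.predictOnValues(testData).print()   // predict the cluster of each test point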

Caching / Persistence

Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using the persist() method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling persist().

For input streams that receive data over the network (such as Kafka, Flume, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance.

Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section. More information on different persistence levels can be found in the Spark Programming Guide.
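For example, a minimal sketch of explicitly persisting a DStream that is used by more than one output operation; the chosen storage level and file prefix are illustrative assumptions:

import org.apache.spark.storage.StorageLevel

// wordCounts is consumed by two output operations, so persist it once.
wordCounts.persist()                                // default for DStreams: serialized in memory
// or pick a level explicitly, e.g. serialized and replicated:
// wordCounts.persist(StorageLevel.MEMORY_ONLY_SER_2)

wordCounts.print()
wordCounts.saveAsTextFiles("counts")                // "counts" prefix is an illustrative assumption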

Checkpointing

A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system such that it can recover from failures. There are two types of data that are checkpointed.

Metadata checkpointing - Saving of the information defining the streaming computation to fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application (discussed in detail later). Metadata includes:

Configuration - The configuration that was used to create the streaming application.

DStream operations - The set of DStream operations that define the streaming application.

Incomplete batches - Batches whose jobs are queued but have not completed yet.

Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to the dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

To summarize, metadata checkpointing is primarily needed for recovery from driver failures, whereas data or RDD checkpointing is necessary even for basic functioning if stateful transformations are used.

When to enable Checkpointing

Checkpointing must be enabled for applications with any of the following requirements:

Usage of stateful transformations - If either updateStateByKey or reduceByKeyAndWindow (with inverse function) is used in the application, then the checkpoint directory must be provided to allow for periodic RDD checkpointing.

Recovering from failures of the driver running the application - Metadata checkpoints are used to recover with progress information.

Note that simple streaming applications without the aforementioned stateful transformations can be run without enabling checkpointing. The recovery from driver failures will also be partial in that case (some received but unprocessed data may be lost). This is often acceptable and many run Spark Streaming applications in this way. Support for non-Hadoop environments is expected to improve in the future.

How to configure Checkpointing

Checkpointing can be enabled by setting a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved. This is done by using streamingContext.checkpoint(checkpointDirectory). This will allow you to use the aforementioned stateful transformations. Additionally, if you want to make the application recover from driver failures, you should rewrite your streaming application to have the following behavior.

When the program is being started for the first time, it will create a new StreamingContext, set up all the streams and then call start().

When the program is being restarted after failure, it will re-create a StreamingContext from the checkpoint data in the checkpoint directory.

This behavior is made simple by using StreamingContext.getOrCreate. This is used as follows.

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()


If the checkpointDirectory exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., running for the first time), then the function functionToCreateContext will be called to create a new context and set up the DStreams. See the Scala example RecoverableNetworkWordCount. This example appends the word counts of network data into a file.

In addition to using getOrCreate, one also needs to ensure that the driver process gets restarted automatically on failure. This can only be done by the deployment infrastructure that is used to run the application. This is further discussed in the Deployment section.

Note that checkpointing of RDDs incurs the cost of saving to reliable storage. This may cause an increase in the processing time of those batches where RDDs get checkpointed. Hence, the interval of checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently causes the lineage and task sizes to grow, which may have detrimental effects. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.

Deploying Applications

This section discusses the steps to deploy a Spark Streaming application.

Requirements

To run a Spark Streaming application, you need to have the following.

Cluster with a cluster manager - This is the general requirement of any Spark application, and is discussed in detail in the deployment guide.

Package the application JAR - You have to compile your streaming application into a JAR. If you are using spark-submit to start the application, then you will not need to provide Spark and Spark Streaming in the JAR. However, if your application uses advanced sources (e.g. Kafka, Flume, Twitter), then you will have to package the extra artifact they link to, along with their dependencies, in the JAR that is used to deploy the application. For example, an application using TwitterUtils will have to include spark-streaming-twitter_2.10 and all its transitive dependencies in the application JAR.

Configuring sufficient memory for the executors - Since the received data must be stored in memory, the executors must be configured with sufficient memory to hold the received data. Note that if you are doing 10 minute window operations, the system has to keep at least the last 10 minutes of data in memory. So the memory requirements for the application depend on the operations used in it.

Configuring checkpointing - If the streaming application requires it, then a directory in a Hadoop API compatible fault-tolerant storage (e.g. HDFS, S3, etc.) must be configured as the checkpoint directory and the streaming application written in a way that checkpoint information can be used for failure recovery. See the checkpointing section for more details.

Configuring automatic restart of the application driver - To automatically recover from a driver failure, the deployment infrastructure that is used to run the streaming application must monitor the driver process and relaunch the driver if it fails. Different cluster managers have different tools to achieve this.

Spark Standalone - A Spark application driver can be submitted to run within the Spark Standalone cluster (see cluster deploy mode), that is, the application driver itself runs on one of the worker nodes. Furthermore, the Standalone cluster manager can be instructed to supervise the driver, and relaunch it if the driver fails either due to a non-zero exit code, or due to failure of the node running the driver. See cluster mode and supervise in the Spark Standalone guide for more details.

YARN - Yarn supports a similar mechanism for automatically restarting an application. Please refer to the YARN documentation for more details.

Mesos - Marathon has been used to achieve this with Mesos.

Configuring write ahead logs - Since Spark 1.2, we have introduced write ahead logs for achieving strong fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into a write ahead log in the configured checkpoint directory. This prevents data loss on driver recovery, thus ensuring zero data loss (discussed in detail in the Fault-tolerance Semantics section). This can be enabled by setting the configuration parameter spark.streaming.receiver.writeAheadLog.enable to true. However, these stronger semantics may come at the cost of the receiving throughput of individual receivers. This can be corrected by running more receivers in parallel to increase aggregate throughput. Additionally, it is recommended that the replication of the received data within Spark be disabled when the write ahead log is enabled as the log is already stored in a replicated storage system. This can be done by setting the storage level for the input stream to StorageLevel.MEMORY_AND_DISK_SER.

Setting the max receiving rate - If the cluster resources are not large enough for the streaming application to process data as fast as it is being received, the receivers can be rate limited by setting a maximum rate limit in terms of records / sec. See the configuration parameters spark.streaming.receiver.maxRate for receivers and spark.streaming.kafka.maxRatePerPartition for the Direct Kafka approach. In Spark 1.5, we have introduced a feature called backpressure that eliminates the need to set this rate limit, as Spark Streaming automatically figures out the rate limits and dynamically adjusts them if the processing conditions change. This backpressure can be enabled by setting the configuration parameter spark.streaming.backpressure.enabled to true.
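As a hedged sketch, the deployment-related settings above (write ahead log, rate limits and backpressure) can be supplied through SparkConf, or equivalently with --conf on spark-submit; the values shown are illustrative only:

val conf = new SparkConf()
  .setAppName("ResilientStreamingApp")
  // Write ahead log: prevents data loss on driver recovery (a checkpoint directory is also required).
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  // Backpressure (Spark 1.5+): let Spark Streaming adapt the ingestion rate automatically.
  .set("spark.streaming.backpressure.enabled", "true")
  // Optional hard caps on the ingestion rate, in records per second.
  .set("spark.streaming.receiver.maxRate", "10000")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")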

Upgrading Application Code

If a running Spark Streaming application needs to be upgraded with new application code, then there are two possible mechanisms.

The upgraded Spark Streaming application is started and run in parallel to the existing application. Once the new one (receiving the same data as the old one) has been warmed up and is ready for prime time, the old one can be brought down. Note that this can be done for data sources that support sending the data to two destinations (i.e., the earlier and upgraded applications).

The existing application is shut down gracefully (see StreamingContext.stop(...) or JavaStreamingContext.stop(...) for graceful shutdown options), which ensures that data that has been received is completely processed before shutdown. Then the upgraded application can be started, which will start processing from the same point where the earlier application left off. Note that this can be done only with input sources that support source-side buffering (like Kafka, and Flume), as data needs to be buffered while the previous application was down and the upgraded application is not yet up. And restarting from earlier checkpoint information of pre-upgrade code cannot be done. The checkpoint information essentially contains serialized Scala/Java/Python objects and trying to deserialize objects with new, modified classes may lead to errors. In this case, either start the upgraded app with a different checkpoint directory, or delete the previous checkpoint directory.

Other Considerations

If the data is being received by the receivers faster than what can be processed, you can limit the rate by setting the configuration parameter spark.streaming.receiver.maxRate.

Monitoring Applications

Beyond Spark's monitoring capabilities, there are additional capabilities specific to Spark Streaming. When a StreamingContext is used, the Spark web UI shows an additional Streaming tab which shows statistics about running receivers (whether receivers are active, number of records received, receiver error, etc.) and completed batches (batch processing times, queueing delays, etc.). This can be used to monitor the progress of the streaming application.

The following two metrics in the web UI are particularly important:

Processing Time - The time to process each batch of data.

Scheduling Delay - The time a batch waits in a queue for the processing of previous batches to finish.

If the batch processing time is consistently more than the batch interval and/or the queueing delay keeps increasing, then it indicates that the system is not able to process the batches as fast as they are being generated and is falling behind. In that case, consider reducing the batch processing time.

The progress of a Spark Streaming program can also be monitored using the StreamingListener interface, which allows you to get receiver status and processing times. Note that this is a developer API and it is likely to be improved upon (i.e., more information reported) in the future.
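A minimal sketch of the StreamingListener approach, logging the processing time and scheduling delay of each completed batch (the println-based logging is just an illustrative choice):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // Both metrics are Options because a batch may not have been fully processed yet.
    println(s"processing time: ${info.processingDelay.getOrElse(-1L)} ms, " +
            s"scheduling delay: ${info.schedulingDelay.getOrElse(-1L)} ms")
  }
})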

Performance Tuning

Getting the best performance out of a Spark Streaming application on a cluster requires a bit of tuning. This section explains a number of the parameters and configurations that can be tuned to improve the performance of your application. At a high level, you need to consider two things:

Reducing the processing time of each batch of data by efficiently using cluster resources.

Setting the right batch size such that the batches of data can be processed as fast as they are received (that is, data processing keeps up with the data ingestion).

Reducing the Batch Processing Times

There are a number of optimizations that can be done in Spark to minimize the processing time of each batch. These have been discussed in detail in the Tuning Guide. This section highlights some of the most important ones.

Level of Parallelism in Data Receiving

Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to be deserialized and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s). For example, a single Kafka input DStream receiving two topics of data can be split into two Kafka input streams, each receiving only one topic. This would run two receivers, allowing data to be received in parallel, thus increasing overall throughput. These multiple DStreams can be unioned together to create a single DStream. Then the transformations that were being applied on a single input DStream can be applied on the unified stream. This is done as follows.

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()


Another parameter that should be considered is the receiver's blocking interval, which is determined by the configuration parameter spark.streaming.blockInterval. For most receivers, the received data is coalesced together into blocks of data before storing inside Spark's memory. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. The number of tasks per receiver per batch will be approximately (batch interval / block interval). For example, a block interval of 200 ms will create 10 tasks per 2 second batches. If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of the block interval is about 50 ms, below which the task launching overheads may be a problem.

An alternative to receiving data with multiple input streams / receivers is to explicitly repartition the input data stream (using inputStream.repartition(<number of partitions>)). This distributes the received batches of data across the specified number of machines in the cluster before further processing.

Level of Parallelism in Data Processing

Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like reduceByKey and reduceByKeyAndWindow, the default number of parallel tasks is controlled by the spark.default.parallelism configuration property. You can pass the level of parallelism as an argument (see the PairDStreamFunctions documentation), or set the spark.default.parallelism configuration property to change the default.
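For instance, a small sketch of raising the number of reduce tasks, either per operation or globally; the value 32 is an illustrative assumption:

// Per operation: pass the parallelism level explicitly.
val wordCounts = pairs.reduceByKey((a: Int, b: Int) => a + b, 32)

// Or globally, via the configuration property.
val conf = new SparkConf().set("spark.default.parallelism", "32")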

Data Serialization

The overheads of data serialization can be reduced by tuning the serialization formats. In the case of streaming, there are two types of data that are being serialized.

Input data: By default, the input data received through Receivers is stored in the executors' memory with StorageLevel.MEMORY_AND_DISK_SER_2. That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation. This serialization obviously has overheads: the receiver must deserialize the received data and re-serialize it using Spark's serialization format.

Persisted RDDs generated by Streaming Operations: RDDs generated by streaming computations may be persisted in memory. For example, window operations persist data in memory as they would be processed multiple times. However, unlike the Spark Core default of StorageLevel.MEMORY_ONLY, persisted RDDs generated by streaming computations are persisted with StorageLevel.MEMORY_ONLY_SER (i.e. serialized) by default to minimize GC overheads.

In both cases, using Kryo serialization can reduce both CPU and memory overheads. See the Spark Tuning Guide for more details. For Kryo, consider registering custom classes, and disabling object reference tracking (see Kryo-related configurations in the Configuration Guide).
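A hedged sketch of enabling Kryo, registering application classes and disabling reference tracking; the class names are hypothetical:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Register the application's own classes for compact serialized sizes.
  .registerKryoClasses(Array(classOf[MyEvent], classOf[MyAggregate]))   // hypothetical classes
  // Optionally disable reference tracking if the data contains no circular references.
  .set("spark.kryo.referenceTracking", "false")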

In specific cases where the amount of data that needs to be retained for the streaming application is not large, it may be feasible to persist data (both types) as deserialized objects without incurring excessive GC overheads. For example, if you are using batch intervals of a few seconds and no window operations, then you can try disabling serialization in persisted data by explicitly setting the storage level accordingly. This would reduce the CPU overheads due to serialization, potentially improving performance without too much GC overheads.

Task Launching Overheads

If the number of tasks launched per second is high (say, 50 or more per second), then the overhead of sending out tasks to the slaves may be significant and will make it hard to achieve sub-second latencies. The overhead can be reduced by the following changes:

Task Serialization: Using Kryo serialization for serializing tasks can reduce the task sizes, and therefore reduce the time taken to send them to the slaves. This is controlled by the spark.closure.serializer property. However, at this time, Kryo serialization cannot be enabled for closure serialization. This may be resolved in a future release.

Execution mode: Running Spark in Standalone mode or coarse-grained Mesos mode leads to better task launch times than the fine-grained Mesos mode. Please refer to the Running on Mesos guide for more details.

These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.

Setting the Right Batch Interval

For a Spark Streaming application running on a cluster to be stable, the system should be able to process data as fast as it is being received. In other words, batches of data should be processed as fast as they are being generated. Whether this is true for an application can be found by monitoring the processing times in the streaming web UI, where the batch processing time should be less than the batch interval.

Depending on the nature of the streaming computation, the batch interval used may have significant impact on the data rates that can be sustained by the application on a fixed set of cluster resources. For example, let us consider the earlier WordCountNetwork example. For a particular data rate, the system may be able to keep up with reporting word counts every 2 seconds (i.e., a batch interval of 2 seconds), but not every 500 milliseconds. So the batch interval needs to be set such that the expected data rate in production can be sustained.

A good approach to figure out the right batch size for your application is to test it with a conservative batch interval (say, 5-10 seconds) and a low data rate. To verify whether the system is able to keep up with the data rate, you can check the value of the end-to-end delay experienced by each processed batch (either look for "Total delay" in the Spark driver log4j logs, or use the StreamingListener interface). If the delay is maintained to be comparable to the batch size, then the system is stable. Otherwise, if the delay is continuously increasing, it means that the system is unable to keep up and is therefore unstable. Once you have an idea of a stable configuration, you can try increasing the data rate and/or reducing the batch size. Note that a momentary increase in the delay due to temporary data rate increases may be fine as long as the delay reduces back to a low value (i.e., less than the batch size).

Memory Tuning

Tuning the memory usage and GC behavior of Spark applications has been discussed in great detail in the Tuning Guide. It is strongly recommended that you read that. In this section, we discuss a few tuning parameters specifically in the context of Spark Streaming applications.

The amount of cluster memory required by a Spark Streaming application depends heavily on the type of transformations used. For example, if you want to use a window operation on the last 10 minutes of data, then your cluster should have sufficient memory to hold 10 minutes worth of data in memory. Or if you want to use updateStateByKey with a large number of keys, then the necessary memory will be high. On the contrary, if you want to do a simple map-filter-store operation, then the necessary memory will be low.

In general, since the data received through receivers is stored with StorageLevel.MEMORY_AND_DISK_SER_2, the data that does not fit in memory will spill over to the disk. This may reduce the performance of the streaming application, and hence it is advised to provide sufficient memory as required by your streaming application. It's best to try and see the memory usage on a small scale and estimate accordingly.

Another aspect of memory tuning is garbage collection. For a streaming application that requires low latency, it is undesirable to have large pauses caused by JVM Garbage Collection.

There are a few parameters that can help you tune the memory usage and GC overheads:

Persistence Level of DStreams: As mentioned earlier in the Data Serialization section, the input data and RDDs are by default persisted as serialized bytes. This reduces both the memory usage and GC overheads, compared to deserialized persistence. Enabling Kryo serialization further reduces serialized sizes and memory usage. Further reduction in memory usage can be achieved with compression (see the Spark configuration spark.rdd.compress), at the cost of CPU time.

Clearing old data: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using a window operation of 10 minutes, then Spark Streaming will keep around the last 10 minutes of data, and actively throw away older data. Data can be retained for a longer duration (e.g. interactively querying older data) by setting streamingContext.remember.

CMS Garbage Collector: Use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times. Make sure you set the CMS GC on both the driver (using --driver-java-options in spark-submit) and the executors (using the Spark configuration spark.executor.extraJavaOptions).
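For example, a sketch of how the CMS collector could be configured on the executors through SparkConf; for the driver the option has to be passed on the spark-submit command line instead, as noted above. The exact GC flag shown is an illustrative assumption:

val conf = new SparkConf()
  // Executors: set through Spark configuration.
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
// Driver: pass the same flag on the command line, e.g.
//   spark-submit --driver-java-options "-XX:+UseConcMarkSweepGC" ...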

Other tips: To further reduce GC overheads, here are some more tips to try.

Use Tachyon for off-heap storage of persisted RDDs. See more detail in the Spark Programming Guide.

Use more executors with smaller heap sizes. This will reduce the GC pressure within each JVM heap.

Fault-tolerance Semantics

In this section, we will discuss the behavior of Spark Streaming applications in the event of failures.

Background

To understand the semantics provided by Spark Streaming, let us remember the basic fault-tolerance semantics of Spark's RDDs.

An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD remembers the lineage of deterministic operations that were used on a fault-tolerant input dataset to create it.

If any partition of an RDD is lost due to a worker node failure, then that partition can be re-computed from the original fault-tolerant dataset using the lineage of operations.

Assuming that all of the RDD transformations are deterministic, the data in the final transformed RDD will always be the same irrespective of failures in the Spark cluster.

Spark operates on data in fault-tolerant file systems like HDFS or S3. Hence, all of the RDDs generated from the fault-tolerant data are also fault-tolerant. However, this is not the case for Spark Streaming as the data in most cases is received over the network (except when fileStream is used). To achieve the same fault-tolerance properties for all of the generated RDDs, the received data is replicated among multiple Spark executors in worker nodes in the cluster (default replication factor is 2). This leads to two kinds of data in the system that need to be recovered in the event of failures:

Data received and replicated - This data survives failure of a single worker node as a copy of it exists on one of the other nodes.

Data received but buffered for replication - Since this is not replicated, the only way to recover this data is to get it again from the source.

Furthermore, there are two kinds of failures that we should be concerned about:

Failure of a Worker Node - Any of the worker nodes running executors can fail, and all in-memory data on those nodes will be lost. If any receivers were running on failed nodes, then their buffered data will be lost.

Failure of the Driver Node - If the driver node running the Spark Streaming application fails, then obviously the SparkContext is lost, and all executors with their in-memory data are lost.

With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.

Definitions

The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.)

At most once: Each record will be either processed once or not processed at all.

At least once: Each record will be processed one or more times. This is stronger than at-most once as it ensures that no data will be lost. But there may be duplicates.

Exactly once: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.

Basic Semantics

In any stream processing system, broadly speaking, there are three steps in processing the data.

Receiving the data: The data is received from sources using Receivers or otherwise.

Transforming the data: The received data is transformed using DStream and RDD transformations.

Pushing out the data: The final transformed data is pushed out to external systems like file systems, databases, dashboards, etc.

If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let's understand the semantics of these steps in the context of Spark Streaming.

Receiving the data: Different input sources provide different guarantees. This is discussed in detail in the next subsection.

Transforming the data: All data that has been received will be processed exactly once, thanks to the guarantees that RDDs provide. Even if there are failures, as long as the received input data is accessible, the final transformed RDDs will always have the same contents.

Pushing out the data: Output operations by default ensure at-least once semantics because it depends on the type of output operation (idempotent, or not) and the semantics of the downstream system (supports transactions or not). But users can implement their own transaction mechanisms to achieve exactly-once semantics. This is discussed in more detail later in the section.

Semantics of Received Data

Different input sources provide different guarantees, ranging from at-least once to exactly once. Read on for more details.

With Files

If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all of the data. This gives exactly-once semantics, meaning all of the data will be processed exactly once no matter what fails.

With Receiver-based Sources

For input sources based on receivers, the fault-tolerance semantics depend on both the failure scenario and the type of receiver. As we discussed earlier, there are two types of receivers:

Reliable Receiver - These receivers acknowledge reliable sources only after ensuring that the received data has been replicated. If such a receiver fails, the source will not receive acknowledgment for the buffered (unreplicated) data. Therefore, if the receiver is restarted, the source will resend the data, and no data will be lost due to the failure.

Unreliable Receiver - Such receivers do not send acknowledgment and therefore can lose data when they fail due to worker or driver failures.

Depending on what type of receivers are used we achieve the following semantics. If a worker node fails, then there is no data loss with reliable receivers. With unreliable receivers, data received but not replicated can get lost. If the driver node fails, then besides these losses, all of the past data that was received and replicated in memory will be lost. This will affect the results of the stateful transformations.

To avoid this loss of past received data, Spark 1.2 introduced write ahead logs which save the received data to fault-tolerant storage. With the write ahead logs enabled and reliable receivers, there is zero data loss. In terms of semantics, it provides an at-least once guarantee.

The following table summarizes the semantics under failures:

Deployment Scenario: Spark 1.1 or earlier, OR Spark 1.2 or later without write ahead logs
  Worker Failure: Buffered data lost with unreliable receivers; zero data loss with reliable receivers; at-least once semantics
  Driver Failure: Buffered data lost with unreliable receivers; past data lost with all receivers; undefined semantics

Deployment Scenario: Spark 1.2 or later with write ahead logs
  Worker Failure: Zero data loss with reliable receivers; at-least once semantics
  Driver Failure: Zero data loss with reliable receivers and files; at-least once semantics

With Kafka Direct API

In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement an exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 1.6.0) is further discussed in the Kafka Integration Guide.

Semantics of output operations

Output operations (like foreachRDD) have at-least once semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to file systems using the saveAs***Files operations (as the file will simply get overwritten with the same data), additional effort may be necessary to achieve exactly-once semantics. There are two approaches.

Idempotent updates: Multiple attempts always write the same data. For example, saveAs***Files always writes the same data to the generated files.

Transactional updates: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.

Use the batch time (available in foreachRDD) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.

Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else, if this was already committed, skip the update.

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}


Migration Guide from 0.9.1 or below to 1.x

Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure future API stability. This section elaborates the steps required to migrate your existing code to 1.0.

Input DStreams: All operations that create an input stream (e.g., StreamingContext.socketStream, FlumeUtils.createStream, etc.) now return InputDStream / ReceiverInputDStream (instead of DStream) for Scala, and JavaInputDStream / JavaPairInputDStream / JavaReceiverInputDStream / JavaPairReceiverInputDStream (instead of JavaDStream) for Java. This ensures that functionality specific to input streams can be added to these classes in the future without breaking binary compatibility. Note that your existing Spark Streaming applications should not require any change (as these new classes are subclasses of DStream / JavaDStream) but may require recompilation with Spark 1.0.

Custom Network Receivers: Since the release of Spark Streaming, custom network receivers could be defined in Scala using the class NetworkReceiver. However, the API was limited in terms of error handling and reporting, and could not be used from Java. Starting from Spark 1.0, this class has been replaced by Receiver, which has the following advantages.

Methods like stop and restart have been added for better control of the lifecycle of a receiver. See the custom receiver guide for more details.

Custom receivers can be implemented using both Scala and Java.

To migrate your existing custom receivers from the earlier NetworkReceiver to the new Receiver, you have to do the following.

Make your custom receiver class extend org.apache.spark.streaming.receiver.Receiver instead of org.apache.spark.streaming.dstream.NetworkReceiver.

Earlier, a BlockGenerator object had to be created by the custom receiver, to which received data was added for being stored in Spark. It had to be explicitly started and stopped from the onStart() and onStop() methods. The new Receiver class makes this unnecessary as it adds a set of methods named store(<data>) that can be called to store the data in Spark. So, to migrate your custom network receiver, remove any BlockGenerator object (it does not exist anymore in Spark 1.0 anyway), and use store(...) methods on received data.

Actor-based Receivers: Data could have been received using any Akka Actors by extending the actor class with the org.apache.spark.streaming.receivers.Receiver trait. This has been renamed to org.apache.spark.streaming.receiver.ActorHelper and the pushBlock(...) methods to store received data have been renamed to store(...). Other helper classes in the org.apache.spark.streaming.receivers package were also moved to the org.apache.spark.streaming.receiver package and renamed for better clarity.

Where to Go from Here

Additional guides

Kafka Integration Guide

Flume Integration Guide

Kinesis Integration Guide

Custom Receiver Guide

API documentation

Scala docs

StreamingContext and DStream

KafkaUtils, FlumeUtils, KinesisUtils, TwitterUtils, ZeroMQUtils, and MQTTUtils

Java docs

JavaStreamingContext, JavaDStream and JavaPairDStream

KafkaUtils, FlumeUtils, KinesisUtils, TwitterUtils, ZeroMQUtils, and MQTTUtils

Python docs

StreamingContext and DStream

KafkaUtils

More examples in Scala, Java and Python

Paper and video describing Spark Streaming.