Spark Streaming Programming Guide (Translation)
Overview
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, the processed data can be pushed out to file systems, databases, and live dashboards. In fact, you can apply Spark's machine learning and graph processing algorithms on data streams. Internally, it works as follows: Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results, also in batches.
Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka and Flume, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.
This guide shows you how to start writing Spark Streaming programs with DStreams. You can write Spark Streaming programs in Scala, Java or Python (introduced in Spark 1.2), all of which are presented in this guide. Throughout the guide you will find tabs that let you choose between code snippets of the different languages.

Note: a few of the APIs are either different or not available in Python. These are highlighted below.
A Quick Example
Before we go into the details of how to write your own Spark Streaming program, let's take a quick look at what a simple Spark Streaming program looks like. Let's say we want to count the number of words in text data received from a server listening on a TCP socket. All you need to do is as follows.

First, we import the names of the Spark Streaming classes and some implicit conversions from StreamingContext into our environment in order to add useful methods to other classes we need (like DStream).
```java
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working threads and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
```
Using this context, we can create a DStream that represents streaming data from a TCP source, specified as a hostname (e.g. localhost) and a port (e.g. 9999).
```java
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
```
This lines DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we want to split the lines by space characters into words.
```java
// Split each line into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });
```
flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line will be split into multiple words, and the stream of words is represented as the words DStream. Next, we want to count these words.
```java
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
```
The words DStream is further mapped (one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of words in each batch of data. Finally, wordCounts.print() will print a few of the counts generated every second.
Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been set up, we finally call:
```java
jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate
```
The complete code can be found in the Spark Streaming example NetworkWordCount.

If you have already downloaded and built Spark, you can run this example as follows. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using:

```
$ nc -lk 9999
```

Then, in a different terminal, you can start the example by using:

```
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
```
Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.

# TERMINAL 1: Running Netcat | # TERMINAL 2: RUNNING NetworkWordCount
---|---
$ nc -lk 9999 hello world ... | (hello,1) (world,1) ...
Basic Concepts

Next, we move beyond the simple example and elaborate on the basics of Spark Streaming.

Linking
Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
```
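For SBT users, the equivalent dependency line would look like the following (a minimal sketch; it resolves to the same spark-streaming_2.10 artifact as the Maven snippet above):

```scala
// build.sbt
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
```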
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact spark-streaming-xyz_2.10 to the dependencies. For example, some of the common ones are as follows.
Source | Artifact
---|---
Kafka | spark-streaming-kafka_2.10
Flume | spark-streaming-flume_2.10
Kinesis | spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter | spark-streaming-twitter_2.10
ZeroMQ | spark-streaming-zeromq_2.10
MQTT | spark-streaming-mqtt_2.10
Initializing StreamingContext

To initialize a Spark Streaming program, a StreamingContext object has to be created, which is the main entry point of all Spark Streaming functionality. A StreamingContext object can be created from a SparkConf object.
```scala
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
```
The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special "local[*]" string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass "local[*]" to run Spark Streaming in-process (detects the number of cores in the local system). Note that this internally creates a SparkContext (starting point of all Spark functionality), which can be accessed as ssc.sparkContext.

The batch interval must be set based on the latency requirements of your application and available cluster resources. See the Performance Tuning section for more details.
A StreamingContext object can also be created from an existing SparkContext object.
```scala
import org.apache.spark.streaming._

val sc = ...  // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
```
After a context is defined, you have to do the following.
- Define the input sources by creating input DStreams.
- Define the streaming computations by applying transformation and output operations to DStreams.
- Start receiving data and processing it using streamingContext.start().
- Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
- The processing can be manually stopped using streamingContext.stop().
Points to remember:
- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created (see the sketch below).
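As a minimal sketch of the last two points, assuming an existing SparkContext sc and a running StreamingContext ssc:

```scala
// Stop only the StreamingContext; keep the shared SparkContext alive
ssc.stop(stopSparkContext = false)

// The same SparkContext can then back a new StreamingContext,
// possibly with a different batch interval
val ssc2 = new StreamingContext(sc, Seconds(5))
```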
Discretized Streams (DStreams)

A discretized stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, as shown in the figure below; each RDD in a DStream contains data from a certain interval. Any operation applied on a DStream translates to operations on the underlying RDDs. For example, in the earlier word-count example, to convert a stream of lines to words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream, as shown in the figure below.

These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide the developer with a convenient higher-level API. These operations are discussed in detail in later sections.
Input DStreams and Receivers

Input DStreams are DStreams representing the stream of input data received from streaming sources. In the quick example, lines was an input DStream as it represented the stream of data received from the netcat server. Every input DStream (except file streams, discussed later) is associated with a Receiver, which receives the data from a source and stores it in Spark's memory for processing. Spark Streaming provides two categories of built-in streaming sources.
- Basic sources: sources directly available in the StreamingContext API, e.g. file systems, socket connections, and Akka actors.
- Advanced sources: sources like Kafka, Flume, Kinesis, Twitter, etc. that require linking against extra dependencies.
These are discussed in detail later in this section.
Note that, if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams; this will create multiple receivers that simultaneously receive multiple data streams. But note that a Spark worker/executor is a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Therefore, it is important to ensure that the Spark Streaming application is allocated enough cores (or threads, if running locally) to process the received data, as well as to run the receiver(s).
Points to remember
- When running a Spark Streaming program locally, do not use "local" or "local[1]" as the master URL. Either of these means that only one thread will be used for running tasks. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then that single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use "local[n]" as the master URL, where n > the number of receivers to run (see the earlier section on how to set the master URL).
- Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers.
Basic Sources

In the earlier example, we have already seen ssc.socketTextStream(...), which creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files and Akka actors as input sources.

File Streams: for reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:
```java
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
```
Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories are not supported). Note that:
- The files must have the same data format.
- The files must be created in the directory by atomically moving or renaming them into it.
- Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
For simple text files, there is an easier method: streamingContext.textFileStream(dataDirectory). And file streams do not require running a receiver, hence they do not require allocating extra cores.

Python API: fileStream is not available in the Python API; only textFileStream is available.
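As a minimal sketch of a text file stream (the HDFS path here is hypothetical), a directory of plain text logs could be tailed like this:

```scala
// Count lines in new files that are atomically moved into the directory
val logLines = ssc.textFileStream("hdfs://namenode:8020/user/logs/") // hypothetical path
logLines.count().print()
```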
Streams based on Custom Actors: DStreams can be created with data streams received through Akka actors by using streamingContext.actorStream(actorProps, actor-name). See the Custom Receiver Guide for more details.

Python API: actorStream is only available in the Scala and Java APIs; it is not yet available in the Python API.
Queue of RDDs as a Stream: for testing a Spark Streaming application with test data, a DStream can also be created based on a queue of RDDs, using streamingContext.queueStream(queueOfRDDs). Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
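A minimal test sketch using a queue of RDDs (modeled on Spark's QueueStream example) might look like this:

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue)
inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
ssc.start()

// Each RDD pushed into the queue becomes one batch of the stream
for (_ <- 1 to 3) {
  rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
  Thread.sleep(1000)
}
```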
For more details on streams from sockets, files, and actors, see the API documentation of the relevant functions: StreamingContext for Scala and JavaStreamingContext for Java.
Advanced Sources

Python API: as of Spark 1.6.0, out of these sources, Kafka, Kinesis, Flume and MQTT are available in the Python API.

This category of sources requires interfacing with external non-Spark libraries, some of them with complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts of dependencies, the functionality to create DStreams from these sources has been moved to separate libraries that can be linked to explicitly when necessary. For example, if you want to create a DStream using data from Twitter's stream of tweets, you have to do the following:
- Linking: add the artifact spark-streaming-twitter_2.10 to the SBT/Maven project dependencies.
- Programming: import the TwitterUtils class and create a DStream with TwitterUtils.createStream as shown below.
- Deploying: generate a JAR containing all the dependencies and deploy it. See the Deploying Applications section for details.
```java
import org.apache.spark.streaming.twitter.*;

TwitterUtils.createStream(jssc);
```
Note that these advanced sources are not available in the Spark shell, hence applications based on these advanced sources cannot be tested in the shell. If you really want to use them in the Spark shell, you will have to download the corresponding Maven artifact's JAR along with its dependencies and add it to the classpath.

Some of these advanced sources are as follows.
- Kafka: Spark Streaming 1.6.0 is compatible with Kafka 0.8.2.1. See the Kafka Integration Guide for more details.
- Flume: Spark Streaming 1.6.0 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.
- Kinesis: Spark Streaming 1.6.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.
- Twitter: Spark Streaming's TwitterUtils uses Twitter4j to get the public stream of tweets using Twitter's Streaming API.
Custom Sources

Python API: this is not yet supported in Python.

Input DStreams can also be created out of custom data sources. All you have to do is implement a user-defined receiver that can receive data from the custom sources and push it into Spark. See the Custom Receiver Guide for details.
Receiver Reliability

There can be two kinds of data sources based on their reliability. Sources like Kafka and Flume allow the transferred data to be acknowledged. If the system receiving data from these reliable sources acknowledges the received data correctly, it can be ensured that no data will be lost due to any kind of failure. This leads to two kinds of receivers:
- Reliable Receiver: a reliable receiver correctly sends acknowledgment to a reliable source once the data has been received and stored in Spark.
- Unreliable Receiver: an unreliable receiver does not send acknowledgment to a source. This can be used for sources that do not support acknowledgment, or even for reliable sources when one does not want or need the complexity of acknowledgment.
The details of how to write a reliable receiver are discussed in the Custom Receiver Guide.
Transformations on DStreams

Similar to RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDDs. Some of the common ones are as follows.

Transformation | Meaning
---|---
map(func) | Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) | Change the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and the other DStream.
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func. The function should be associative so that it can be computed in parallel.
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: by default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs.
cogroup(otherStream, [numTasks]) | When called on DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) | Return a new DStream by applying an RDD-to-RDD function func to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function func on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
UpdateStateByKey Operation

The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.
- Define the state - the state can be an arbitrary data type.
- Define the state update function - specify with a function how to update the state using the previous state and the new values from an input stream.
In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not. If the update function returns None then the key-value pair will be eliminated.

Let's illustrate this with an example. Say you want to maintain a running count of each word seen in a text data stream. Here, the running count is the state and it is an integer. We define the update function as:
```scala
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  val newCount = ... // add the new values with the previous running count to get the new count
  Some(newCount)
}
```
This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the earlier example).

```scala
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
```
The update function will be called for each word, with newValues having a sequence of 1's (from the (word, 1) pairs) and runningCount having the previous count. For the complete Scala code, take a look at the example StatefulNetworkWordCount.scala.
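For illustration, a minimal complete version of the update function (filling in the elided summation in the standard way) could be:

```scala
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  // Sum the new 1's for this key and add them to the previous count, if any
  val newCount = newValues.sum + runningCount.getOrElse(0)
  Some(newCount)
}
```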
Note that using updateStateByKey requires the checkpoint directory to be configured, which is discussed in detail in the Checkpointing section.
Transform Operation

The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.
```scala
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})
```
Note that the supplied function gets called in every batch interval. This allows you to do time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, etc. can be changed between batches.
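As a fully worked (hypothetical) variant of the spam-cleaning idea above, with the elided join and filter spelled out, assuming wordCounts is a DStream[(String, Int)] and the blacklist is a small pair RDD:

```scala
// Hypothetical blacklist: words flagged as spam
val spamInfoRDD = ssc.sparkContext.parallelize(Seq(("buy-now", true), ("free", true)))

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.leftOuterJoin(spamInfoRDD)
     .filter { case (_, (_, flagged)) => flagged.isEmpty } // keep words not in the blacklist
     .map { case (word, (count, _)) => (word, count) }
}
```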
Window Operations

Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. The following figure illustrates this sliding window. As shown in the figure, every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. In this specific case, the operation is applied over the last 3 time units of data, and slides by 2 time units. This shows that any window operation needs to specify two parameters.
- window length - the duration of the window (3 in the figure).
- sliding interval - the interval at which the window operation is performed (2 in the figure).
These two parameters must be multiples of the batch interval of the source DStream (1 in the figure).
Let's illustrate the window operations with an example. Say, you want to extend the earlier example by generating word counts over the last 30 seconds of data, every 10 seconds. To do this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done using the operation reduceByKeyAndWindow.
```scala
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(10))
```
Some of the common window operations are as follows. All of these operations take the said two parameters - windowLength and slideInterval.

Transformation | Meaning
---|---
window(windowLength, slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow(windowLength, slideInterval) | Return a sliding window count of elements in the stream.
reduceByWindow(func, windowLength, slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: by default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and "inverse reducing" the old data that leaves the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable only to "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
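A sketch of the incremental variant with an inverse function, building on the pairs DStream above (checkpointing must be enabled for this, as noted in the table):

```scala
// Incrementally reduce the last 30 seconds of data, every 10 seconds:
// add counts entering the window, subtract counts leaving it
val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // reduce function
  (a: Int, b: Int) => a - b, // inverse reduce function
  Seconds(30), Seconds(10))
```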
Join Operations

Finally, it's worth highlighting how easily you can perform different kinds of joins in Spark Streaming.

Stream-stream joins

Streams can be very easily joined with other streams.
```scala
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
```
Here, in each batch interval, the RDD generated by stream1 will be joined with the RDD generated by stream2. You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.
```scala
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
```
Stream-dataset joins

This has already been shown earlier while explaining the DStream.transform operation. Here is yet another example of joining a windowed stream with a dataset.
```scala
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
```
In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated every batch interval and therefore will use the current dataset that the dataset reference points to.

The complete list of DStream transformations is available in the API documentation. For the Scala API, see DStream and PairDStreamFunctions. For the Java API, see JavaDStream and JavaPairDStream.
Output Operations on DStreams

Output operations allow a DStream's data to be pushed out to external systems like a database or a file system. Since the output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). Currently, the following output operations are defined:

Output Operation | Meaning
---|---
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. Python API: this is called pprint() in the Python API.
saveAsTextFiles(prefix, [suffix]) | Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API: this is not available in the Python API.
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API: this is not available in the Python API.
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
Design Patterns for using foreachRDD

dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently. Some of the common mistakes to avoid are as follows.

Often writing data to an external system requires creating a connection object (e.g. a TCP connection to a remote server) and using it to send data to a remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, and then try to use it in a Spark worker to save records in the RDDs. For example (in Scala),
```scala
dstream.foreachRDD { rdd =>
  val connection = createNewConnection() // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}
```
This is incorrect as this requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferrable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.

However, this can lead to another common mistake - creating a new connection for every record. For example,
```scala
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
```
Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use rdd.foreachPartition - create a single connection object and send all the records in an RDD partition using that connection.
```scala
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
```
This amortizes the connection creation overheads over many records.

Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects that can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
```scala
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
```
Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.

Other points to remember:
- DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.
- By default, output operations are executed one-at-a-time. And they are executed in the order they are defined in the application.
DataFrame and SQL Operations

You can easily use DataFrames and SQL operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. The following example shows how.

```scala
/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SQLContext
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Register as table
  wordsDataFrame.registerTempTable("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}
```
See the full source code for this example.

You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of any asynchronous SQL queries, will delete off old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call streamingContext.remember(Minutes(5)) (in Scala, or equivalent in other languages).

See the DataFrames and SQL guide to learn more about DataFrames.
MLlib Operations

You can also easily use machine learning algorithms provided by MLlib. First of all, there are streaming machine learning algorithms (e.g. Streaming Linear Regression, Streaming KMeans, etc.) which can simultaneously learn from the streaming data as well as apply the model on the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can learn a model offline (i.e. using historical data) and then apply the model online on streaming data. See the MLlib guide for more details.
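As a brief sketch of a streaming algorithm, a streaming k-means model could be set up as follows (here trainingData is assumed to be a DStream of MLlib Vectors, and the parameter values are illustrative):

```scala
import org.apache.spark.mllib.clustering.StreamingKMeans

// Continuously update 3 cluster centers as batches arrive;
// setRandomCenters(dim, weight) initializes random centers for 2-dimensional data
val model = new StreamingKMeans()
  .setK(3)
  .setDecayFactor(1.0)
  .setRandomCenters(2, 0.0)

model.trainOn(trainingData) // update the model on each batch
```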
Caching / Persistence

Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using the persist() method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling persist().

For input streams that receive data over the network (such as Kafka, Flume, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance.

Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section.
Checkpointing
A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system such that it can recover from failures. There are two types of data that are checkpointed.
- Metadata checkpointing - saving of the information defining the streaming computation to fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application (discussed in detail later). Metadata includes:
  - Configuration - the configuration that was used to create the streaming application.
  - DStream operations - the set of DStream operations that define the streaming application.
  - Incomplete batches - batches whose jobs are queued but have not completed yet.
- Data checkpointing - saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to the dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

To summarize, metadata checkpointing is primarily needed for recovery from driver failures, whereas data or RDD checkpointing is necessary even for basic functioning if stateful transformations are used.
When to enable Checkpointing

Checkpointing must be enabled for applications with any of the following requirements:
- Usage of stateful transformations - if either updateStateByKey or reduceByKeyAndWindow (with inverse function) is used in the application, then the checkpoint directory must be provided to allow for periodic RDD checkpointing.
- Recovering from failures of the driver running the application - metadata checkpoints are used to recover with progress information.

Note that simple streaming applications without the aforementioned stateful transformations can be run without enabling checkpointing. The recovery from driver failures will also be partial in that case (some received but unprocessed data may be lost). This is often acceptable and many run Spark Streaming applications in this way. Support for non-Hadoop environments is expected to improve in the future.
How to configure Checkpointing

Checkpointing can be enabled by setting a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved. This is done by using streamingContext.checkpoint(checkpointDirectory). This will allow you to use the aforementioned stateful transformations. Additionally, if you want to make the application recover from driver failures, you should rewrite your streaming application to have the following behavior.
- When the program is being started for the first time, it will create a new StreamingContext, set up all the streams and then call start().
- When the program is being restarted after failure, it will re-create a StreamingContext from the checkpoint data in the checkpoint directory.

This behavior is made simple by using StreamingContext.getOrCreate. This is used as follows.
```scala
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()
```
If the checkpointDirectory exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., running for the first time), then the function functionToCreateContext will be called to create a new context and set up the DStreams. See the Scala example RecoverableNetworkWordCount.

In addition to using getOrCreate, one also needs to ensure that the driver process gets restarted automatically on failure. This can only be done by the deployment infrastructure that is used to run the application. This is further discussed in the Deploying Applications section.
Note that checkpointing of RDDs incurs the cost of saving to reliable storage. This may cause an increase in the processing time of those batches where RDDs get checkpointed. Hence, the interval of checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently causes the lineage and task sizes to grow, which may have detrimental effects. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5-10 sliding intervals of a DStream is a good setting to try.
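For example, a stateful stream on a 2-second batch interval might be checkpointed every 10 seconds (a sketch; the interval must be a multiple of the batch interval):

```scala
// Checkpoint the state stream every 10 seconds
stateDStream.checkpoint(Seconds(10))
```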
Deploying Applications

This section discusses the steps to deploy a Spark Streaming application.

Requirements

To run a Spark Streaming application, you need to have the following.
- Cluster with a cluster manager - this is the general requirement of any Spark application, and is discussed in detail in the deployment guide.
- Package the application JAR - you have to compile your streaming application into a JAR. If you are using spark-submit to start the application, then you will not need to provide Spark and Spark Streaming in the JAR. However, if your application uses advanced sources (e.g. Kafka, Flume, Twitter), then you will have to package the extra artifact they link to, along with their dependencies, in the application JAR. For example, an application using TwitterUtils will have to include spark-streaming-twitter_2.10 and all its transitive dependencies in the application JAR.
- Configuring sufficient memory for the executors - since the received data must be stored in memory, the executors must be configured with sufficient memory to hold the received data. Note that if you are doing 10 minute window operations, the system has to keep at least the last 10 minutes of data in memory. So the memory requirements for the application depend on the operations used in it.
- Configuring checkpointing - if the streaming application requires it, then a directory in Hadoop-API-compatible fault-tolerant storage (e.g. HDFS, S3, etc.) must be configured as the checkpoint directory, and the streaming application written in a way that checkpoint information can be used for failure recovery. See the Checkpointing section for more details.
- Configuring automatic restart of the application driver - to automatically recover from a driver failure, the deployment infrastructure that is used to run the streaming application must monitor the driver process and relaunch the driver if it fails. Different cluster managers have different tools to achieve this.
  - Spark Standalone - a Spark application driver can be submitted to run within the Spark Standalone cluster, that is, the application driver itself runs on one of the worker nodes. Furthermore, the Standalone cluster manager can be instructed to supervise the driver, and relaunch it if the driver fails.
  - YARN - YARN supports a similar mechanism for automatically restarting an application. Please refer to the YARN documentation for more details.
  - Mesos - Marathon has been used to achieve this with Mesos.
- Configuring write ahead logs - since Spark 1.2, we have introduced write ahead logs for achieving strong fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into a write ahead log in the configured checkpoint directory. This prevents data loss on driver recovery, thus ensuring zero data loss (discussed in detail in the Fault-tolerance Semantics section). This can be enabled by setting the configuration parameter spark.streaming.receiver.writeAheadLog.enable to true. However, these stronger semantics may come at the cost of the receiving throughput of individual receivers. Additionally, it is recommended that the replication of the received data within Spark be disabled when the write ahead log is enabled, as the log is already stored in a replicated storage system. This can be done by setting the storage level for the input stream to StorageLevel.MEMORY_AND_DISK_SER.
- Setting the max receiving rate - if the cluster resources are not large enough for the streaming application to process data as fast as it is being received, the receivers can be rate limited by setting a maximum rate limit in terms of records / sec. See the configuration parameters spark.streaming.receiver.maxRate for receivers and spark.streaming.kafka.maxRatePerPartition for the Direct Kafka approach. In Spark 1.5, we have introduced a feature called backpressure that eliminates the need to set this rate limit, as Spark Streaming automatically figures out the rate limits and dynamically adjusts them if the processing conditions change. This backpressure can be enabled by setting the configuration parameter spark.streaming.backpressure.enabled to true.
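A small configuration sketch for these settings (the rate value is illustrative):

```scala
val conf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "10000")    // at most 10,000 records/sec per receiver
  .set("spark.streaming.backpressure.enabled", "true") // or let Spark adapt the rate dynamically
```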
Upgrading Application Code

If a running Spark Streaming application needs to be upgraded with new application code, then there are two possible mechanisms.
- The upgraded Spark Streaming application is started and run in parallel to the existing application. Once the new one (receiving the same data as the old one) has been warmed up and is ready for prime time, the old one can be brought down. Note that this can be done for data sources that support sending the data to two destinations (i.e., the earlier and upgraded applications).
- The existing application is shut down gracefully (see StreamingContext.stop(...) or JavaStreamingContext.stop(...) for graceful shutdown options), which ensures that data that has been received is completely processed before shutdown. Then the upgraded application can be started, which will start processing from the same point where the earlier application left off. Note that this can be done only with input sources that support source-side buffering (like Kafka, and Flume) as data needs to be buffered while the previous application was down and the upgraded application is not yet up. And restarting from earlier checkpoint information of pre-upgrade code cannot be done. The checkpoint information essentially contains serialized Scala/Java/Python objects, and trying to deserialize objects with new, modified classes may lead to errors. In this case, either start the upgraded app with a different checkpoint directory, or delete the previous checkpoint directory.
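A graceful shutdown, as a one-line sketch in Scala:

```scala
// Wait for all received data to be processed before shutting down,
// and also stop the underlying SparkContext
ssc.stop(stopSparkContext = true, stopGracefully = true)
```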
Other Considerations

If the data is being received by the receivers faster than what can be processed, you can limit the rate by setting the configuration parameter spark.streaming.receiver.maxRate.
Monitoring Applications

Beyond Spark's monitoring capabilities, there are additional capabilities specific to Spark Streaming. When a StreamingContext is used, the Spark web UI shows an additional Streaming tab, which shows statistics about running receivers (whether receivers are active, number of records received, receiver error, etc.) and completed batches (batch processing times, queueing delays, etc.). This can be used to monitor the progress of the streaming application.

The following two metrics in the web UI are particularly important:
- Processing Time - the time to process each batch of data.
- Scheduling Delay - the time a batch waits in a queue for the processing of previous batches to finish.

If the batch processing time is consistently more than the batch interval and/or the queueing delay keeps increasing, then it indicates that the system is not able to process the batches as fast as they are being generated and is falling behind. In that case, consider reducing the batch processing time.

The progress of a Spark Streaming program can also be monitored using the StreamingListener interface, which allows you to get receiver status and processing times.
Performance Tuning

Getting the best performance out of a Spark Streaming application on a cluster requires a bit of tuning. This section explains a number of the parameters and configurations that can be tuned to improve the performance of your application. At a high level, you need to consider two things:
- Reducing the processing time of each batch of data by efficiently using cluster resources.
- Setting the right batch size such that the batches of data can be processed as fast as they are received (that is, data processing keeps up with the data ingestion).
Reducing the Batch Processing Times

There are a number of optimizations that can be done in Spark to minimize the processing time of each batch. These have been discussed in detail in the Tuning Guide. This section highlights some of the most important ones.

Level of Parallelism in Data Receiving

Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to be deserialized and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s). For example, a single Kafka input DStream receiving two topics of data can be split into two Kafka input streams, each receiving only one topic. This would run two receivers, allowing data to be received in parallel, thus increasing overall throughput. These multiple DStreams can be unioned together to create a single DStream. Then the transformations that were being applied on a single input DStream can be applied on the unified stream. This is done as follows.

```scala
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
```
Another parameter that should be considered is the receiver's block interval, which is determined by the configuration parameter spark.streaming.blockInterval. For most receivers, the received data is coalesced together into blocks of data before storing inside Spark's memory. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. The number of tasks per receiver per batch will be approximately (batch interval / block interval). For example, a block interval of 200 ms will create 10 tasks per 2 second batch. If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of the block interval is about 50 ms, below which the task launching overheads may be a problem.

An alternative to receiving data with multiple input streams / receivers is to explicitly repartition the input data stream (using inputStream.repartition(<number of partitions>)). This distributes the received batches of data across the specified number of machines in the cluster before further processing.
Level of Parallelism in Data Processing

Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like reduceByKey and reduceByKeyAndWindow, the default number of parallel tasks is controlled by the spark.default.parallelism configuration property. You can pass the level of parallelism as an argument (see the PairDStreamFunctions documentation), or set the spark.default.parallelism configuration property to change the default.
Data Serialization

The overheads of data serialization can be reduced by tuning the serialization formats. In the case of streaming, there are two types of data that are being serialized.
- Input data: by default, the input data received through receivers is stored in the executors' memory with StorageLevel.MEMORY_AND_DISK_SER_2. That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures.
- Persisted RDDs generated by streaming operations: RDDs generated by streaming computations may be persisted in memory. For example, window operations persist data in memory as they would be processed multiple times. However, unlike the Spark Core default of StorageLevel.MEMORY_ONLY, persisted RDDs generated by streaming computations are persisted serialized by default to minimize GC overheads.

In both cases, using Kryo serialization can reduce both CPU and memory overheads. See the Spark Tuning Guide for more details.
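Enabling Kryo is an ordinary Spark configuration change; a sketch (MyRecord is a hypothetical application class):

```scala
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Registering classes avoids writing full class names with each serialized object
conf.registerKryoClasses(Array(classOf[MyRecord])) // MyRecord: hypothetical
```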
In specific cases where the amount of data that needs to be retained for the streaming application is not large, it may be feasible to persist data (both types) as deserialized objects without incurring excessive GC overheads. For example, if you are using batch intervals of a few seconds and no window operations, then you can try disabling serialization in persisted data by explicitly setting the storage level accordingly. This would reduce the CPU overheads due to serialization, potentially improving performance without too much GC overhead.
Task Launching Overheads

If the number of tasks launched per second is high (say, 50 or more per second), then the overhead of sending out tasks to the slaves may be significant and will make it hard to achieve sub-second latencies. The overhead can be reduced by the following changes:
- Task Serialization: using Kryo serialization for serializing tasks can reduce the task sizes, and therefore reduce the time taken to send them to the slaves. This is controlled by the spark.closure.serializer property. However, at this time, Kryo serialization cannot be enabled for closure serialization. This may be resolved in a future release.
- Execution mode: running Spark in Standalone mode or coarse-grained Mesos mode leads to better task launch times than the fine-grained Mesos mode. Please refer to the Running on Mesos guide for more details.

These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch sizes to be viable.
Setting the Right Batch Interval

For a Spark Streaming application running on a cluster to be stable, the system should be able to process data as fast as it is being received. In other words, batches of data should be processed as fast as they are being generated. Whether this is true for an application can be found by monitoring the processing times in the streaming web UI, where the batch processing time should be less than the batch interval. Depending on the nature of the streaming computation, the batch interval used may have significant impact on the data rates that can be sustained by the application on a fixed set of cluster resources. For example, let us consider the earlier WordCountNetwork example. For a particular data rate, the system may be able to keep up with reporting word counts every 2 seconds (i.e., batch interval of 2 seconds), but not every 500 milliseconds. So the batch interval needs to be set such that the expected data rate in production can be sustained.

A good approach to figure out the right batch size for your application is to test it with a conservative batch interval (say, 5-10 seconds) and a low data rate. To verify whether the system is able to keep up with the data rate, you can check the value of the end-to-end delay experienced by each processed batch (either look for "Total delay" in Spark driver log4j logs, or use the StreamingListener interface). If the delay is maintained to be comparable to the batch size, then the system is stable; if the delay is continuously increasing, the system is unable to keep up.
Memory Tuning

Tuning the memory usage and GC behavior of Spark applications has been discussed in great detail in the Tuning Guide. In this section, we discuss a few tuning parameters specifically in the context of Spark Streaming applications.

The amount of cluster memory required by a Spark Streaming application depends heavily on the type of transformations used. For example, if you want to use a window operation on the last 10 minutes of data, then your cluster should have sufficient memory to hold 10 minutes worth of data in memory. Or if you want to use updateStateByKey with a large number of keys, then the necessary memory will be high. On the contrary, if you want to do a simple map-filter-store operation, then the necessary memory will be low.

In general, since the data received through receivers is stored with StorageLevel.MEMORY_AND_DISK_SER_2, the data that does not fit in memory will spill over to the disk. This may reduce the performance of the streaming application, and hence it is advised to provide sufficient memory as required by your streaming application. It's best to try and see the memory usage on a small scale and estimate accordingly.
Another aspect of memory tuning is garbage collection. For a streaming application that requires low latency, it is undesirable to have large pauses caused by JVM garbage collection.

There are a few parameters that can help you tune the memory usage and GC overheads:
- Persistence Level of DStreams: as mentioned earlier in the Data Serialization section, the input data and RDDs are by default persisted as serialized bytes. This reduces both the memory usage and GC overheads, compared to deserialized persistence. Further reduction in memory usage can be achieved with compression (see the Spark configuration spark.rdd.compress), at the cost of CPU time.
- Clearing old data: by default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using a window operation of 10 minutes, then Spark Streaming will keep around the last 10 minutes of data, and actively throw away older data. Data can be retained for a longer duration (e.g. for interactively querying older data) by setting streamingContext.remember.
- CMS Garbage Collector: use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times. Make sure you set the CMS GC on both the driver (using --driver-java-options in spark-submit) and the executors (using the Spark configuration spark.executor.extraJavaOptions).
- Other tips: to further reduce GC overheads, here are some more tips to try.
  - Use Tachyon for off-heap storage of persisted RDDs. See more detail in the Spark Programming Guide.
  - Use more executors with smaller heap sizes. This will reduce the GC pressure within each JVM heap.
Fault-tolerance Semantics

In this section, we will discuss the behavior of Spark Streaming applications in the event of failures.

Background

To understand the semantics provided by Spark Streaming, let us remember the basic fault-tolerance semantics of Spark's RDDs.
- An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD remembers the lineage of deterministic operations that were used on a fault-tolerant input dataset to create it.
- If any partition of an RDD is lost due to a worker node failure, then that partition can be re-computed from the original fault-tolerant dataset using the lineage of operations.
- Assuming that all of the RDD transformations are deterministic, the data in the final transformed RDD will always be the same irrespective of failures in the Spark cluster.

Spark operates on data in fault-tolerant file systems like HDFS or S3. Hence, all of the RDDs generated from the fault-tolerant data are also fault-tolerant. However, this is not the case for Spark Streaming, as the data in most cases is received over the network (except when fileStream is used). To achieve the same fault-tolerance properties for all of the generated RDDs, the received data is replicated among multiple Spark executors in worker nodes in the cluster (default replication factor is 2). This leads to two kinds of data in the system that need to be recovered in the event of failures:
- Data received and replicated - this data survives failure of a single worker node as a copy of it exists on one of the other nodes.
- Data received but buffered for replication - since this is not replicated, the only way to recover this data is to get it again from the source.

Furthermore, there are two kinds of failures that we should be concerned about:
- Failure of a Worker Node - any of the worker nodes running executors can fail, and all in-memory data on those nodes will be lost. If any receivers were running on failed nodes, then their buffered data will be lost.
- Failure of the Driver Node - if the driver node running the Spark Streaming application fails, then obviously the SparkContext is lost, and all executors with their in-memory data are lost.

With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.
Definitions

The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.).
- At most once: each record will be either processed once or not processed at all.
- At least once: each record will be processed one or more times. This is stronger than at-most once as it ensures that no data will be lost. But there may be duplicates.
- Exactly once: each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.
Basic Semantics

In any stream processing system, broadly speaking, there are three steps in processing the data.
- Receiving the data: the data is received from sources using receivers or otherwise.
- Transforming the data: the received data is transformed using DStream and RDD transformations.
- Pushing out the data: the final transformed data is pushed out to external systems like file systems, databases, dashboards, etc.

If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let's understand the semantics of these steps in the context of Spark Streaming.
- Receiving the data: different input sources provide different guarantees. This is discussed in detail in the next subsection.
- Transforming the data: all data that has been received will be processed exactly once, thanks to the guarantees that RDDs provide. Even if there are failures, as long as the received input data is accessible, the final transformed RDDs will always have the same contents.
- Pushing out the data: output operations by default ensure at-least once semantics, because it depends on the type of output operation (idempotent, or not) and the semantics of the downstream system (supports transactions or not). But users can implement their own transaction mechanisms to achieve exactly-once semantics. This is discussed in more detail later in the section.
Semantics of Received Data

Different input sources provide different guarantees, ranging from at-least once to exactly once. Read on for more details.

With Files

If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all of the data. This gives exactly-once semantics, meaning all of the data will be processed exactly once no matter what fails.

With Receiver-based Sources

For input sources based on receivers, the fault-tolerance semantics depend on both the failure scenario and the type of receiver. As we discussed earlier, there are two types of receivers:
- Reliable Receiver - these receivers acknowledge reliable sources only after ensuring that the received data has been replicated. If such a receiver fails, the source will not receive acknowledgment for the buffered (unreplicated) data. Therefore, if the receiver is restarted, the source will resend the data, and no data will be lost due to the failure.
- Unreliable Receiver - such receivers do not send acknowledgment and therefore can lose data when they fail due to worker or driver failures.

Depending on what type of receivers are used, we achieve the following semantics. If a worker node fails, then there is no data loss with reliable receivers. With unreliable receivers, data received but not replicated can get lost. If the driver node fails, then besides these losses, all of the past data that was received and replicated in memory will be lost. This will affect the results of the stateful transformations.

To avoid this loss of past received data, Spark 1.2 introduced write ahead logs, which save the received data to fault-tolerant storage. With the write ahead log enabled and reliable receivers, there is zero data loss.
The following table summarizes the semantics under failures:

Deployment Scenario | Worker Failure | Driver Failure
---|---|---
Spark 1.1 or earlier, OR Spark 1.2 or later without write ahead logs | Buffered data lost with unreliable receivers. Zero data loss with reliable receivers. At-least once semantics. | Buffered data lost with unreliable receivers. Past data lost with all receivers. Undefined semantics.
Spark 1.2 or later with write ahead logs | Zero data loss with reliable receivers. At-least once semantics. | Zero data loss with reliable receivers and files. At-least once semantics.
With Kafka Direct API

In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement an exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 1.6.0) is further discussed in the Kafka Integration Guide.

Semantics of output operations
Output operations (like foreachRDD) have at-least once semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to file systems using the saveAs***Files operations (as the file will simply get overwritten with the same data), additional effort may be necessary to achieve exactly-once semantics. There are two approaches.
- Idempotent updates: multiple attempts always write the same data. For example, saveAs***Files always writes the same data to the generated files.
- Transactional updates: all updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.
  - Use the batch time (available in foreachRDD) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
  - Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else, if this was already committed, skip the update.
```scala
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
```
Migration Guide from 0.9.1 or below to 1.x

Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure future API stability. This section elaborates the steps required to migrate your existing code to 1.0.

Input DStreams: all operations that create an input stream (e.g., StreamingContext.socketStream, FlumeUtils.createStream, etc.) now return InputDStream / ReceiverInputDStream (instead of DStream) for Scala, and JavaInputDStream / JavaReceiverInputDStream (instead of JavaDStream) for Java. This ensures that functionality specific to input streams can be added to these classes in the future without breaking binary compatibility.

Custom Network Receivers: since the release of Spark Streaming, custom network receivers could be defined in Scala using the class NetworkReceiver. However, the API was limited in terms of error handling and reporting, and could not be used from Java. Starting Spark 1.0, this class has been replaced by Receiver, which has the following advantages.
- Methods like stop and restart have been added for better control of the lifecycle of a receiver. See the Custom Receiver Guide for more details.
- Custom receivers can be implemented using both Scala and Java.

To migrate your existing custom receivers from the earlier NetworkReceiver to the new Receiver, you have to do the following.
- Make your custom receiver class extend org.apache.spark.streaming.receiver.Receiver instead of org.apache.spark.streaming.dstream.NetworkReceiver.
- Earlier, a BlockGenerator object had to be created by the custom receiver, to which received data was added for being stored in Spark. It had to be explicitly started and stopped from the onStart() and onStop() methods. The new Receiver class makes this unnecessary as it adds a set of methods named store(<data>) that can be called to store the data in Spark. So, to migrate your custom network receiver, remove any BlockGenerator object (it does not exist anymore in Spark 1.0 anyway), and use store(...) methods on received data.

Actor-based Receivers: data could have been received using any Akka actors by extending the actor class with the org.apache.spark.streaming.receivers.Receiver trait. This has been renamed to org.apache.spark.streaming.receiver.ActorHelper, and the pushBlock(...) methods to store received data have been renamed to store(...). Other helper classes in the org.apache.spark.streaming.receivers package were also moved to the org.apache.spark.streaming.receiver package and renamed for better clarity.
Where to Go from Here

Additional guides

API documentation:
- Scala docs
- Java docs
- Python docs

More examples in Scala, Java, and Python.