
Spark: Best practice for retrieving big data from RDD to local machine

I've got a big RDD (1 GB) in a YARN cluster. On the local machine which uses this cluster I have only 512 MB. I'd like to iterate over the values in the RDD on my local machine. I can't use collect(), because it would create an array locally that is bigger than my heap. I need some iterative way. There is the method iterator(), but it requires some additional information that I can't provide.

UPD: committed the toLocalIterator method

apache-spark
edited Apr 10 '14 at 13:56
asked Feb 11 '14 at 9:55 – epahomov

toLocalIterator is not ideal if you want to iterate locally over a partition at a time – Landon Kuhn Oct 29 '14 at 2:25
@LandonKuhn why not? – Tom Yubing Dong Aug 4 '15 at 23:02

5 Answers


26 votes (accepted)
Update: RDD.toLocalIterator, a method that appeared after this answer was originally written, is a more efficient way to do the job. It uses runJob to evaluate only a single partition on each step.
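For example, a minimal sketch of using it (this assumes any single partition still fits in the driver's local memory):

// toLocalIterator fetches one partition at a time to the driver,
// so only a single partition has to fit in local memory at once.
rdd.toLocalIterator.foreach { value =>
  // Process each value locally: write it to a file, feed it to another process, etc.
  println(value)
}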

TL;DR: And the original answer might give a rough idea how it works:

First of all, get the array of partition indexes:

val parts = rdd.partitions

Then create smaller RDDs filtering out everything but a single partition. Collect the data from the smaller RDDs and iterate over the values of a single partition:

for (p <- parts) {
  val idx = p.index
  val partRdd = rdd.mapPartitionsWithIndex(
    (index, it) => if (index == idx) it else Iterator(), true)
  // The second argument is true to avoid RDD reshuffling
  val data = partRdd.collect // data contains all values from a single partition
                             // in the form of an array
  // Now you can do whatever you want with the data: iterate, save to a file, etc.
}

I didn't try this code, but it should work. Please write a comment if it won't compile. Of course, it will work only if the partitions are small enough. If they aren't, you can always increase the number of partitions with rdd.coalesce(numParts, true).
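For instance, a minimal sketch of that step (64 is an arbitrary example value; choose it so a single partition comfortably fits in the local heap):

// Split the data into more, smaller partitions before the per-partition loop,
// so that collect() on one partition stays within the local 512 MB.
val finerRdd = rdd.coalesce(64, shuffle = true)
val parts = finerRdd.partitions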

edited Nov 18 '15 at 8:36
answered Feb 15 '14 at 18:33 – Wildfire

Does this code cause each partition to be computed in serial when it loops through and calls mapPartitionsWithIndex? What's the best way to remedy this? – foboi1122 Nov 18 '15 at 0:42
@foboi1122 Please see the updated answer – Wildfire Nov 18 '15 at 8:36
@Wildfire Will this approach resolve this? Or else how to resolve it, using any approach or maybe this one? – ChikuMiku 2 days ago



10 votes
Wildfire's answer seems semantically correct, but I'm sure you should be able to be vastly more efficient by using the API of Spark. If you want to process each partition in turn, I don't see why you can't use the map / filter / reduce / reduceByKey / mapPartitions operations. The only time you'd want to have everything in one place in one array is when you're going to perform a non-monoidal operation - but that doesn't seem to be what you want. You should be able to do something like:

rdd.mapPartitions(recordsIterator => your code that processes a single chunk)

Or this:

rdd.foreachPartition(partition => {
  partition.toArray
  // Your code
})
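
As a more concrete illustration of this style, here is a hedged sketch (the per-record processing and the use of aggregate as the combining step are made-up examples, not the asker's actual workload):

rdd.foreachPartition { partition =>
  // partition is an Iterator over one partition's records, running on an executor
  partition.foreach { record =>
    // e.g. write to an external sink, push to a queue, update an accumulator, ...
    println(record)
  }
}

// Monoidal alternative: reduce each partition to a small summary and merge the summaries.
val totalLength = rdd.aggregate(0L)(
  (acc, record) => acc + record.toString.length, // seqOp: fold records within a partition
  (a, b) => a + b                                // combOp: merge per-partition results
)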


edited Apr 3 '14 at 15:55
answered Mar 30 '14 at 11:05 – samthebest

Don't these operators execute on the cluster? – epahomov Apr 3 '14 at 7:05
Yes they will, but why are you avoiding that? If you can process each chunk in turn, you should be able to write the code in such a way that it can distribute - like using aggregate. – samthebest Apr 3 '14 at 15:54
Isn't the iterator returned by foreachPartition the data iterator for a single partition - and not an iterator of all partitions? – javadba May 20 at 8:23
5 votes
Here is the same approach as suggested by @Wildfire but written in pyspark.

The nice thing about this approach is that it lets the user access records in the RDD in order. I'm using this code to feed data from the RDD into the STDIN of a machine learning tool's process.

rdd = sc.parallelize(range(100), 10)

def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter

for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    data_from_part_rdd = part_rdd.collect()
    print "partition id: %s elements: %s" % (part_id, data_from_part_rdd)

Produces output:

partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]


edited Jun 5 '15 at 20:14
answered Jun 5 '15 at 20:07 – vvladymyrov

1 vote
Map/filter/reduce using Spark and download the results later? I think the usual Hadoop approach will work.

The API says that there are map-filter-saveAsFile commands: https://spark.incubator.apache.org/docs/0.8.1/scala-programming-guide.html#transformations
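
For example, a hedged sketch of that approach (the transformation, filter, and output path are placeholders):

// Transform and filter on the cluster, then save the (much smaller) result to HDFS;
// the local machine can read those files later instead of holding the whole RDD in memory.
rdd
  .map(record => record.toString)           // placeholder transformation
  .filter(line => line.nonEmpty)            // placeholder filter
  .saveAsTextFile("hdfs:///tmp/rdd-output") // placeholder output path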

answered Feb 11 '14 at 10:09 – ya_pulser

Bad option. I don't want to do serialization/deserialization. So I want to retrieve this data from Spark directly. – epahomov Feb 11 '14 at 10:37
How do you intend to get 1 GB without serde (i.e. storing on the disk) on a node with 512 MB? – scrapcodes Feb 12 '14 at 9:13
By iterating over the RDD. You should be able to get each partition in sequence, to send each data item in sequence to the master, which can then pull them off the network and work on them. – interfect Feb 12 '14 at 18:07
1 vote
For Spark 1.3.1, the format is as follows:

val parts = rdd.partitions
for (p <- parts) {
  val idx = p.index
  val partRdd = rdd.mapPartitionsWithIndex {
    case (index: Int, value: Iterator[(String, String, Float)]) =>
      if (index == idx) value else Iterator()
  }
  val dataPartitioned = partRdd.collect
  // Apply further processing on the data
}


