Spark: Best practice for retrieving big data from RDD to local machine
2016-06-30 20:03
621 views
I've got a big RDD (1 GB) in a YARN cluster. On the local machine which uses this cluster I have only 512 MB. I'd like to iterate over the values in the RDD on my local machine. I can't use collect(), because it would create too big an array locally, more than my heap. I need some iterative way. There is the method iterator(), but it requires some additional information I can't provide.

UPD: committed to the toLocalIterator method
5 Answers
26 upvotes, accepted

Update: `RDD.toLocalIterator`, a method that appeared after the original answer was written, is a more efficient way to do the job. It uses `runJob` to evaluate only a single partition on each step.

TL;DR, and the original answer might give a rough idea how it works:

First of all, get the array of partition indexes:

```scala
val parts = rdd.partitions
```

Then create smaller RDDs filtering out everything but a single partition. Collect the data from the smaller RDDs and iterate over the values of a single partition:

```scala
for (p <- parts) {
  val idx = p.index
  val partRdd = rdd.mapPartitionsWithIndex(
    (index, it) => if (index == idx) it else Iterator(), true)
  // The second argument is true to avoid RDD reshuffling
  val data = partRdd.collect // data contains all values from a single partition
                             // in the form of an array
  // Now you can do with the data whatever you want: iterate, save to a file, etc.
}
```

I didn't try this code, but it should work. Please write a comment if it won't compile. Of course, it will work only if the partitions are small enough. If they aren't, you can always increase the number of partitions with `rdd.coalesce(numParts, true)`.
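To see why the loop above keeps driver memory bounded, the same idea can be sketched in plain Python without a cluster: model the RDD as a list of partitions and pull one partition at a time, which is roughly what `toLocalIterator`'s one-job-per-partition evaluation achieves. This is a minimal simulation; the names `local_iterator` and `fake_rdd_partitions` are illustrative, not Spark API.

```python
def local_iterator(partitions):
    """Yield elements partition by partition, mimicking the idea behind
    RDD.toLocalIterator: only one partition's worth of data is
    materialized at a time."""
    for part in partitions:        # one "job" per partition
        collected = list(part)     # like collecting a single-partition RDD
        for element in collected:
            yield element

# Ten fake partitions of ten elements each, standing in for a 10-partition RDD
fake_rdd_partitions = [range(i * 10, (i + 1) * 10) for i in range(10)]

total = sum(local_iterator(fake_rdd_partitions))
print(total)  # 0 + 1 + ... + 99 = 4950
```

Peak memory here is one partition (ten elements), not the whole dataset, which is exactly the property the asker needs with a 512 MB driver heap.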
10 upvotes

Wildfire's answer seems semantically correct, but I'm sure you should be able to be vastly more efficient by using the API of Spark. If you want to process each partition in turn, I don't see why you can't use `map`/`filter`/`reduce`/`reduceByKey`/`mapPartitions` operations. The only time you'd want to have everything in one place in one array is when you're going to perform a non-monoidal operation, but that doesn't seem to be what you want. You should be able to do something like:

```scala
rdd.mapPartitions(recordsIterator => /* your code that processes a single chunk */)
```

Or this:

```scala
rdd.foreachPartition(partition => {
  partition.toArray
  // Your code
})
```
5 upvotes

Here is the same approach as above, but in pyspark. The nice thing about this approach is that it lets the user access records in the RDD in order. I'm using this code to feed data from an RDD into STDIN of a machine learning tool's process.

```python
rdd = sc.parallelize(range(100), 10)

def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter

for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    data_from_part_rdd = part_rdd.collect()
    print "partition id: %s elements: %s" % (part_id, data_from_part_rdd)
```

Produces output:

```
partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
```
1 upvote

Map/filter/reduce using Spark and download the results later? I think the usual Hadoop approach will work. The API says that there are map-filter-saveAsFile commands:
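The save-then-download route this answer suggests can be sketched with only the standard library: `saveAsTextFile` writes one part-file per partition, and once downloaded, those part-files can be streamed one at a time so the full dataset never sits in the local heap at once. The directory layout below is a hand-built stand-in, not actual Spark output.

```python
import os
import tempfile

out_dir = tempfile.mkdtemp()

# Pretend Spark saved two partitions as part-00000, part-00001
for i, part in enumerate([[0, 1, 2], [3, 4, 5]]):
    with open(os.path.join(out_dir, "part-%05d" % i), "w") as f:
        for value in part:
            f.write("%d\n" % value)

# Stream the results back file by file, line by line
values = []
for name in sorted(os.listdir(out_dir)):
    with open(os.path.join(out_dir, name)) as f:
        for line in f:
            values.append(int(line))

print(values)  # [0, 1, 2, 3, 4, 5]
```

Reading line by line keeps memory proportional to one record; collecting into `values` here is only to show the recovered data.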
1 upvote

For Spark 1.3.1, the format is as follows:

```scala
val parts = rdd.partitions
for (p <- parts) {
  val idx = p.index
  val partRdd = data.mapPartitionsWithIndex {
    case (index: Int, value: Iterator[(String, String, Float)]) =>
      if (index == idx) value else Iterator()
  }
  val dataPartitioned = partRdd.collect
  // Apply further processing on data
}
```