
Converting Image Formats with Spark Streaming

2014-05-01 22:42


Notes

Qimage is my own custom data type. It is used only as the value, so it does not implement WritableComparable.

QimageInputFormat<Text, Qimage> is the custom input format.

QimageRecordReader<Text, Qimage> is the custom RecordReader.

QimageOutputFormat<Text, Qimage> is the custom output format.

QimageRecordWriter<Text, Qimage> is the custom RecordWriter.

These classes already run correctly on Hadoop. I am now porting the job to Spark for streaming processing: it takes PNG images as input and writes BMP images as output. While writing the code I ran into a few questions:

1) How do I use a custom data type in Spark?

2) How do I save a custom data type to HDFS?

Spark Streaming picks up its input by monitoring an HDFS directory for new files.
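
Putting the pieces together, here is a minimal sketch of the whole streaming job. The paths, the batch interval, the package of the Qimage classes, and the assumption that the PNG-to-BMP conversion happens inside QimageRecordWriter are all placeholders, so treat it as an outline rather than the exact program:

import org.apache.hadoop.io.Text
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
// Qimage, QimageInputFormat and QimageOutputFormat are the classes described above;
// "mypackage" is a placeholder for wherever they actually live.
import mypackage.{Qimage, QimageInputFormat, QimageOutputFormat}

object QimagePngToBmp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("QimagePngToBmp")
    // Kryo setup; registration is covered in the Data Serialization section below.
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "mypackage.QimageRegistrator")

    val ssc = new StreamingContext(conf, Seconds(10))

    // Watch an HDFS directory; new files must be "moved" into it from the same filesystem.
    val images = ssc.fileStream[Text, Qimage, QimageInputFormat[Text, Qimage]]("hdfs:///input/png")

    // Write every batch back to HDFS through the custom output format,
    // which is where the BMP files are produced.
    images.saveAsNewAPIHadoopFiles(
      "hdfs:///output/bmp", "",
      classOf[Text], classOf[Qimage],
      classOf[QimageOutputFormat[Text, Qimage]])

    ssc.start()
    ssc.awaitTermination()
  }
}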

Data Serialization

Serialization plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or consume a large number of bytes, will greatly slow down the computation. Often, this will be the first thing you should tune to optimize a Spark application. Spark aims to strike a balance between convenience (allowing you to work with any Java type in your operations) and performance. It provides two serialization libraries:

· Java serialization: By default, Spark serializes objects using Java's ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes.

· Kryo serialization: Spark can also use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you'll use in the program in advance for best performance.

You can switch to using Kryo by initializing your job with a SparkConf and calling conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"). This setting configures the serializer used for not only shuffling data between worker nodes but also when serializing RDDs to disk. The only reason Kryo is not the default is because of the custom registration requirement, but we recommend trying it in any network-intensive application.

Finally, to register your classes with Kryo, create a public class that extends org.apache.spark.serializer.KryoRegistrator and set the spark.kryo.registrator config property to point to it, as follows:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext(conf)
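
For the image job in this post, the same pattern registers the custom value type. A minimal sketch, again assuming Qimage is importable from its actual package:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import mypackage.Qimage  // placeholder package

// Registering Qimage lets Kryo avoid storing its full class name with every serialized object.
class QimageRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Qimage])
  }
}

The spark.kryo.registrator property then points at mypackage.QimageRegistrator, exactly as in the snippet above.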

The Kryo documentation describes more advanced registration options, such as adding custom serialization code.

If your objects are large, you may also need to increase the spark.kryoserializer.buffer.mb config property. The default is 2, but this value needs to be large enough to hold the largest object you will serialize.

Finally, if you don't register your classes, Kryo will still work, but it will have to store the full class name with each object, which is wasteful.
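
Since each Qimage value carries a whole image, the 2 MB default is probably too small for this job; the figure below is only an illustrative guess, not a measured requirement:

// Make the Kryo buffer large enough to hold the biggest serialized image (64 MB is an assumed value).
conf.set("spark.kryoserializer.buffer.mb", "64")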

Input and output functions in Spark for custom data types

Taken from the official Spark source code.

textFileStream

/**
 * Create a input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them as text files (using key as LongWritable, value
 * as Text and input format as TextInputFormat). Files must be written to the
 * monitored directory by "moving" them from another location within the same
 * file system. File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new file
 */
def textFileStream(directory: String): DStream[String] = {
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}

fileStream

/**
 * Create a input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them using the given key-value types and input format.
 * Files must be written to the monitored directory by "moving" them from another
 * location within the same file system. File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new file
 * @tparam K Key type for reading HDFS file
 * @tparam V Value type for reading HDFS file
 * @tparam F Input format for reading HDFS file
 */
def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ](directory: String): DStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory)
}
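
Applied to the formats declared at the top of this post, the call looks roughly like this (the generic signature of QimageInputFormat is assumed from its declaration):

// Each batch becomes a DStream of (key, image) pairs produced by QimageRecordReader.
val images = ssc.fileStream[Text, Qimage, QimageInputFormat[Text, Qimage]]("hdfs:///input/png")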

newAPIHadoopFile

/**
 * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
 * and extra configuration options to pass to the input format.
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.
 */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
    path: String,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V],
    conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
  val job = new NewHadoopJob(conf)
  NewFileInputFormat.addInputPath(job, new Path(path))
  val updatedConf = job.getConfiguration
  new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
}
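
Outside of streaming, the same formats plug into newAPIHadoopFile for a plain batch job, and the resulting pair RDD can be written back with saveAsNewAPIHadoopFile. A rough sketch with placeholder paths, assuming the Qimage formats extend the new-API FileInputFormat/FileOutputFormat:

import org.apache.hadoop.io.Text
import org.apache.spark.SparkContext._  // pair RDD functions
import mypackage.{Qimage, QimageInputFormat, QimageOutputFormat}  // placeholder package

// Read PNG images as (Text, Qimage) pairs through the custom input format.
val images = sc.newAPIHadoopFile(
  "hdfs:///input/png",
  classOf[QimageInputFormat[Text, Qimage]],
  classOf[Text],
  classOf[Qimage])

// Write them back through the custom output format, which emits BMP files.
images.saveAsNewAPIHadoopFile(
  "hdfs:///output/bmp",
  classOf[Text],
  classOf[Qimage],
  classOf[QimageOutputFormat[Text, Qimage]])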

newAPIHadoopRDD

/**
 * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
 * and extra configuration options to pass to the input format.
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.
 */
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
    conf: Configuration = hadoopConfiguration,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V]): RDD[(K, V)] = {
  new NewHadoopRDD(this, fClass, kClass, vClass, conf)
}
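
newAPIHadoopRDD is the same idea, except the input path travels inside the Configuration instead of being passed as an argument. A sketch that mirrors what newAPIHadoopFile does internally, under the same assumptions as above:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// Put the input path into a Configuration via a Job, then hand that Configuration to newAPIHadoopRDD.
val job = Job.getInstance(sc.hadoopConfiguration)
FileInputFormat.addInputPath(job, new Path("hdfs:///input/png"))

val images = sc.newAPIHadoopRDD(
  job.getConfiguration,
  classOf[QimageInputFormat[Text, Qimage]],
  classOf[Text],
  classOf[Qimage])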