Converting Image Formats with Spark Streaming
2014-05-01 22:42
Overview
Qimage is my custom data type. It is used as the value, so it does not implement WritableComparable.
QimageInputFormat<Text, Qimage> is the custom input format.
QimageRecordReader<Text, Qimage> is the custom RecordReader.
QimageOutputFormat<Text, Qimage> is the custom output format.
QimageRecordWriter<Text, Qimage> is the custom RecordWriter.
These classes already run correctly on Hadoop; now I am porting them to Spark for stream processing, taking PNG images as input and producing BMP images as output. While writing the code I ran into two questions:
1) How do you use a custom data type in Spark?
2) How do you save a custom data type to HDFS?
Spark Streaming's file-based input works by monitoring an HDFS directory for new files.
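The two questions above can be answered together: `fileStream` reads the custom type through the custom InputFormat, and `saveAsNewAPIHadoopFiles` writes it back out through the custom OutputFormat. A minimal sketch, assuming `QimageInputFormat` and `QimageOutputFormat` are new-API Hadoop formats keyed as `<Text, Qimage>`, and `toBmp` stands in for the real PNG-to-BMP conversion (all of these are the article's own classes, not library code):

```scala
import org.apache.hadoop.io.Text
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits (pre-1.3)

object QimageStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("QimagePngToBmp")
    val ssc = new StreamingContext(conf, Seconds(10))

    // 1) Reading a custom data type: fileStream takes the key type, the value
    //    type, and the (new-API) InputFormat as type parameters.
    val images = ssc.fileStream[Text, Qimage, QimageInputFormat](args(0))

    // Convert each PNG to a BMP; toBmp is a placeholder for the real conversion.
    val bmps = images.map { case (name, png) => (name, toBmp(png)) }

    // 2) Saving a custom data type to HDFS goes through the custom OutputFormat.
    bmps.saveAsNewAPIHadoopFiles(args(1), "bmp",
      classOf[Text], classOf[Qimage], classOf[QimageOutputFormat])

    ssc.start()
    ssc.awaitTermination()
  }
}
```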
Data Serialization
Serialization plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or consume a large number of bytes, will greatly slow down the computation. Often, this will be the first thing you should tune to optimize a Spark application. Spark aims to strike a balance between convenience (allowing you to work with any Java type in your operations) and performance. It provides two serialization libraries:
- Java serialization: By default, Spark serializes objects using Java's ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes.
- Kryo serialization: Spark can also use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you'll use in the program in advance for best performance.
You can switch to using Kryo by initializing your job with a SparkConf and calling conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"). This setting configures the serializer used for not only shuffling data between worker nodes but also when serializing RDDs to disk. The only reason Kryo is not the default is because of the custom registration requirement, but we recommend trying it in any network-intensive application.
Finally, to register your classes with Kryo, create a public class that extends org.apache.spark.serializer.KryoRegistrator and set the spark.kryo.registrator config property to point to it, as follows:
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext(conf)
The Kryo documentation describes more advanced registration options, such as adding custom serialization code.
If your objects are large, you may also need to increase the spark.kryoserializer.buffer.mb config property. The default is 2, but this value needs to be large enough to hold the largest object you will serialize.
Finally, if you don't register your classes, Kryo will still work, but it will have to store the full class name with each object, which is wasteful.
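The default Java-serialization path described above can be exercised without a cluster: any class implementing java.io.Serializable can be round-tripped through ObjectOutputStream/ObjectInputStream. A sketch in plain Scala, where this Qimage case class is only a stand-in for the article's real image type:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for the Qimage record described above; any class implementing
// java.io.Serializable works the same way under Spark's default serializer.
case class Qimage(width: Int, height: Int, data: Array[Byte]) extends Serializable

object SerializationDemo {
  // Serialize with Java's ObjectOutputStream (Spark's default path),
  // then read the object back to verify the round trip.
  def roundTrip(img: Qimage): Qimage = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(img)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val copy = in.readObject().asInstanceOf[Qimage]
    in.close()
    copy
  }
}
```

The serialized bytes here are noticeably larger than the raw payload, which is exactly the overhead the Kryo section is about.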
Input and output functions for custom data types in Spark
From the official Spark source: textFileStream
/**
 * Create an input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them as text files (using key as LongWritable, value
 * as Text and input format as TextInputFormat). Files must be written to the
 * monitored directory by "moving" them from another location within the same
 * file system. File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new file
 */
def textFileStream(directory: String): DStream[String] = {
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
fileStream
/**
 * Create an input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them using the given key-value types and input format.
 * Files must be written to the monitored directory by "moving" them from another
 * location within the same file system. File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new file
 * @tparam K Key type for reading HDFS file
 * @tparam V Value type for reading HDFS file
 * @tparam F Input format for reading HDFS file
 */
def fileStream[
  K: ClassTag,
  V: ClassTag,
  F <: NewInputFormat[K, V]: ClassTag
](directory: String): DStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory)
}
newAPIHadoopFile
/**
 * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
 * and extra configuration options to pass to the input format.
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.
 */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
    path: String,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V],
    conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
  val job = new NewHadoopJob(conf)
  NewFileInputFormat.addInputPath(job, new Path(path))
  val updatedConf = job.getConfiguration
  new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
}
/**
 * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
 * and extra configuration options to pass to the input format.
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.
 */
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
    conf: Configuration = hadoopConfiguration,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V]): RDD[(K, V)] = {
  new NewHadoopRDD(this, fClass, kClass, vClass, conf)
}
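For batch (non-streaming) jobs, the same custom formats plug into newAPIHadoopFile directly. A sketch under the same assumptions as before (QimageInputFormat and Qimage are the article's classes; the path is illustrative):

```scala
import org.apache.hadoop.io.Text
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("QimageBatch"))

// Class arguments mirror the streaming case: input format, key Text, value Qimage.
val images = sc.newAPIHadoopFile("hdfs:///images/in",
  classOf[QimageInputFormat], classOf[Text], classOf[Qimage])

// Per the Note in the source above, the RecordReader re-uses one Writable per
// record, so copy keys/values in a map before caching the RDD.
```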