您的位置:首页 > 其它

DistributedCache小记

2015-06-11 11:34 225 查看
一、DistributedCache简介

DistributedCache是hadoop框架提供的一种机制,可以将job指定的文件,在job执行前,先行分发到task执行的机器上,并有相关机制对cache文件进行管理.

常见的应用场景有:
分发第三方库(jar,so等);分发算法需要的词典文件;分发程序运行需要的配置;分发多表数据join时小表数据简便处理等

主要的注意事项有:
1.DistributedCache只能应用于分布式的情况,包括伪分布式,完全分布式.有些api在这2种情况下有移植性问题.
2.需要分发的文件,必须提前放到hdfs上.默认的路径前缀是hdfs://的,不是file://
3.需要分发的文件,最好在运行期间是只读的.
4.不建议分发较大的文件,比如压缩文件,可能会影响task的启动速度.

二、DistributedCache的相关配置

MRv1
属性名默认值备注
mapred.local.dir
${hadoop.tmp.dir}/mapred/local
ThelocaldirectorywhereMapReducestoresintermediatedatafiles.Maybeacomma-separatedlistofdirectoriesondifferentdevicesinordertospreaddiski/o.Directoriesthatdonotexistareignored.
local.cache.size
10737418240(10G)ThenumberofbytestoallocateineachlocalTaskTrackerdirectoryforholdingDistributedCachedata.
mapreduce.tasktracker.cache.local.numberdirectories
10000Themaximumnumberofsubdirectoriesthatshouldbecreatedinanyparticulardistributedcachestore.Afterthismanydirectorieshavebeencreated,cacheitemswillbeexpungedregardlessofwhetherthetotalsize
thresholdhasbeenexceeded.
mapreduce.tasktracker.cache.local.keep.pct
0.95(作用于上面2个参数)Itisthetargetpercentageofthelocaldistributedcachethatshouldbekeptinbetweengarbagecollectionruns.InpracticeitwilldeleteunuseddistributedcacheentriesinLRUorderuntilthesizeofthecache
islessthanmapreduce.tasktracker.cache.local.keep.pctofthemaximumcachesize.Thisisafloatingpointvaluebetween0.0and1.0.Thedefaultis0.95.
MRv2

新的yarn架构的代码还没有看过,不过从配置里可以看出相关的如下配置,本文主要基于MRv1.
yarn.nodemanager.local-dirs
yarn.nodemanager.delete.debug-delay-sec
yarn.nodemanager.local-cache.max-files-per-directory
yarn.nodemanager.localizer.cache.cleanup.interval-ms
yarn.nodemanager.localizer.cache.target-size-mb

三、DistributedCache的使用方式

1.通过配置
可以配置这三个属性值:
mapred.cache.files,
mapred.cache.archives,
mapred.create.symlink(值设为yes如果要建link的话)
如果要分发的文件有多个的话,要以逗号分隔(貌似在建link的时候,逗号分隔前后还不能有空格,否则会报错)

2.使用命令行
在pipes和streaming里面可能会用到
-filesSpecifycomma-separatedfilestobecopiedtotheMap/Reducecluster
-libjarsSpecifycomma-separatedjarfilestoincludeintheclasspath
-archivesSpecifycomma-separatedarchivestobeunarchivedonthecomputemachines

例如:
-fileshdfs://host:fs_port/user/testfile.txt
-fileshdfs://host:fs_port/user/testfile.txt#testfile
-fileshdfs://host:fs_port/user/testfile1.txt,hdfs://host:fs_port/user/testfile2.txt
-archiveshdfs://host:fs_port/user/testfile.jar
-archiveshdfs://host:fs_port/user/testfile.tgz#tgzdir

3.代码调用
DistributedCache.addCacheFile(URI,conf)/DistributedCache.addCacheArchive(URI,conf)
DistributedCache.setCacheFiles(URIs,conf)/DistributedCache.setCacheArchives(URIs,conf)
如果要建link,需要增加DistributedCache.createSymlink(Configuration)

获取cache文件可以使用
getLocalCacheFiles(Configurationconf)

getLocalCacheArchives(Configurationconf)

代码调用常常会有各样的问题,一般我比较倾向于通过createSymlink的方式来使用,就把cache当做当前目录的文件来操作,简单很多.
常见的通过代码来读取cache文件的问题如下:
a.getLocalCacheFiles在伪分布式情况下,常常返回null.
b.getLocalCacheFiles其实是把DistributedCache中的所有文件都返回.需要自己筛选出所需的文件.archives也有类似的问题.
c.getLocalCacheFiles返回的是tt机器本地文件系统的路径,使用的时候要注意,因为很多地方默认的都是hdfs://,可以自己加上file://来避免这个问题

4.symlink
给分发的文件,在task运行的当前工作目录建立软连接,在使用起来的时候会更方便.没有上面的各种麻烦
mapred.create.symlink需要设置为yes,不是true或Y之类哦

5.实际文件存放情况
下图显示的为tt机器上实际文件的状况(只有yarn集群的截图)




四、DistributedCache的内部基本流程

1.每个tasktracker启动时,都会产生一个TrackerDistributedCacheManager对象,用来管理该tt机器上所有的task的cache文件.
2.在客户端提交job时,在JobClient内,对即将cache的文件,进行校验
以确定文件是否存在,文件的大小,文件的修改时间,以及文件的权限是否是privateorpublic.
3.当task在tt初始化job时,会由TrackerDistributedCacheManager产生一个TaskDistributedCacheManager对象,来管理本task的cache文件.
4.和本task相关联的TaskDistributedCacheManager,获取并解压相关cache文件到本地相应目录
如果本tt机器上已经有了本job的其他task,并已经完成了相应cache文件的获取和解压工作,则不会重复进行.
如果本地已经有了cache文件,则比较修改时间和hdfs上的文件是否一致,如果一致则可以使用.
5.当task结束时,会对该cache进行ref减一操作.
6.TrackerDistributedCacheManager有一个clearup线程,每隔1min会去处理那些无人使用的,目录大小大于local.cache.size或者子目录个数大于mapreduce.tasktracker.cache.local.numberdirectories的cache目录.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: