Hadoop Streaming高级编程
2013-02-25 18:40
465 查看
1.概要
本文主要介绍了Hadoop
Streaming的一些高级编程技巧,包括,怎样在mapredue作业中定制输出输出格式?怎样向mapreduce作业中传递参数?怎么在mapreduce作业中加载词典?怎样利用Hadoop
Streamng处理二进制格式的数据等。
关于HadoopStreaming的基本编程方法,可参考:Hadoop
Streaming编程。
2.在mapreduce作业中定制输入输出格式
Hadoop0.21.0之前的版本中的HadoopStreaming工具只支持文本格式的数据,而从Hadoop
0.21.0开始,也支持二进制格式的数据。这里介绍文本文件的输入输出格式定制,关于二进制数据的格式,可参考第5节。
HadoopStreaming提交作业的格式为:
其中,-D选项中的一些配置属性可定义输入输出格式,具体如下(注意,对于文本而言,每一行中存在一个key/value对,这里只能定制key和value之间的分割符,而行与行之间的分隔符不可定制,只能是\n):
(1)stream.map.input.field.separator/stream.map.output.field.separator:
maptask输入/输出数据的分隔符,默认均为\t。
(2)stream.num.map.output.key.fields:指定map
task输出记录中key所占的域数目,如
每一行形式为,Key1\tkey2\tkey3\tvalue,采用默认的分隔符,且stream.num.map.output.key.fields设为2,则Key1\tkey2表示key,key3\tvalue表示value。
(3)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce
task输入/输出数据的分隔符,默认均为\t。
(4)stream.num.reduce.output.key.fields:指定reduce
task输出记录中key所占的域数目
3.向mapreduce作业传递参数
提交作业时,使用-cmdenv选项以环境变量的形式将你的参数传递给mapper/reducer,如:
然后编写mapper或reducer时,使用main函数的第三个参数捕获你传入的环境变量,如:
4.在mapreduce作业中加载词典
提交作业时,使用-file选项,如:
然后编写mapper或reducer时,像本地文件一样打开并使用dic.txt文件,如:
如果要加载非常大的词典或配置文件,Hadoop
Streaming还提供了另外一个选项-files,该选项后面跟的是HDFS上的一个文件(将你的配置文件放到HDFS上,再大也可以!!!),你可以在程序中像打开本地文件一样打开该文件,此外,你也可以使用#符号在本地建一个系统链接,如:
在代码中这样做:
如:
5.处理二进制格式的数据
从Hadoop0.21.0开始,streaming支持二进制文件(具体可参考:HADOOP-1722),用户提交作业时,使用-io选项指明二进制文件格式。0.21.0版本中增加了两种二进制文件格式,分别为:
(1)rawbytes:key和value均用【4个字节的长度+原始字节】表示
(2)typedbytes:key和value均用【1字节类型+4字节长度+原始字节】表示
用户提交作业时,如果用-io指定二进制格式为typedbytes,则map的输入输出,reduce的输入输出均为typedbytes,如果想细粒度的控制这几个输入输出,可采用以下几个选项:
你如果采用的python语言,下面是从HADOOP-1722
中得到的一个例子(里面用到了解析typedbytes的python库,见:http://github.com/klbostee/typedbytes
):
mapper脚本如下:
reducer脚本:
6.自定义counter并增加counter的值
用户采用某种语言编写的mapper或者reducer可采用标准错误输出(stderr)自定义和改变counter值,格式为:reporter:counter:<group>,<counter>,<amount>,如,在C语言编写的mapper/reducer中:
注:用户定义的自定义counter的最终结果会在桌面或者web界面上显示出来。
如果你想在mapreduce作业执行过程中,打印一些状态信息,同样可使用标准错误输出,格式为:reporter:status:<message>,如,在C语言编写的mapper/reducer中:
7.在mapreduce使用LinuxPipes
迄今为止(0.21.0版本之前,包括0.21.0),HadoopStreaming是不支持Linux
Pipes,如:-mapper“cut-f1|seds/foo/bar/g”会报”java.io.IOException:
Brokenpipe”错误。
8.在mapreduce中获取JobConf的属性值
在0.21.0版本中,streaming作业执行过程中,JobConf中以mapreduce开头的属性(如mapreduce.job.id)会传递给mapper和reducer,关于这些参数,可参考:http://hadoop.apache.org/mapreduce/docs/r0.21.0/mapred_tutorial.html#Configured+Parameters
其中,属性名字中的“.”会变成“_”,如mapreduce.job.id会变为mapreduce_job_id,用户可在mapper/reducer中获取这些属性值直接使用(可能是传递给环境变量参数,即main函数的第三个参数,本文作业还未进行验证)。
9.一些HadoopStreaming的开源软件包
(1)针对Hadoop
Streaming常用操作的C++封装包(如自定义和更新counter,输出状态信息等):https://github.com/dgleich/hadoopcxx
(2)C++实现的typedbytes代码库:https://github.com/dgleich/libtypedbytes
(3)python实现的typedbytes代码库:http://github.com/klbostee/typedbytes
(4)Java实现的typedbytes代码库(Hadoop0.21.0代码中自带)
10.总结
Hadoop
Streaming使得程序员采用各种语言编写mapreduce程序变得可能,它具备程序员所需的大部分功能接口,同时由于这种方法编写mapreduce作业简单快速,越来越多的程序员开始尝试使用Hadoop
Steraming。
11.参考资料
http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html
https://issues.apache.org/jira/browse/HADOOP-1722
原创文章,转载请注明:转载自董的博客
本文链接地址:http://dongxicheng.org/mapreduce/hadoop-streaming-advanced-programming/
本文主要介绍了Hadoop
Streaming的一些高级编程技巧,包括,怎样在mapredue作业中定制输出输出格式?怎样向mapreduce作业中传递参数?怎么在mapreduce作业中加载词典?怎样利用Hadoop
Streamng处理二进制格式的数据等。
关于HadoopStreaming的基本编程方法,可参考:Hadoop
Streaming编程。
2.在mapreduce作业中定制输入输出格式
Hadoop0.21.0之前的版本中的HadoopStreaming工具只支持文本格式的数据,而从Hadoop
0.21.0开始,也支持二进制格式的数据。这里介绍文本文件的输入输出格式定制,关于二进制数据的格式,可参考第5节。
HadoopStreaming提交作业的格式为:
1 2 3 4 | Usage:$HADOOP_HOME/bin/hadoopjar\ $HADOOP_HOME/hadoop-streaming.jar [options] |
(1)stream.map.input.field.separator/stream.map.output.field.separator:
maptask输入/输出数据的分隔符,默认均为\t。
(2)stream.num.map.output.key.fields:指定map
task输出记录中key所占的域数目,如
每一行形式为,Key1\tkey2\tkey3\tvalue,采用默认的分隔符,且stream.num.map.output.key.fields设为2,则Key1\tkey2表示key,key3\tvalue表示value。
(3)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce
task输入/输出数据的分隔符,默认均为\t。
(4)stream.num.reduce.output.key.fields:指定reduce
task输出记录中key所占的域数目
3.向mapreduce作业传递参数
提交作业时,使用-cmdenv选项以环境变量的形式将你的参数传递给mapper/reducer,如:
1 2 3 4 5 6 7 8 9 10 11 12 | $HADOOP_HOME/bin/hadoopjar\ contrib/streaming/hadoop-0.20.2-streaming.jar \ -inputinput\ -ouputoutput\ -cmdenvgrade=1\ ……. |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | int main( int argc, char *argv[], char *env[]){ int i, grade; for (i=0; if ( strncmp (env[i], grade= atoi (env[i]+6); …… } |
提交作业时,使用-file选项,如:
1 2 3 4 5 6 7 8 9 10 11 12 | $HADOOP_HOME/bin/hadoopjar\ contrib/streaming/hadoop-0.20.2-streaming.jar \ -inputinput\ -ouputoutput\ -file ……. |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | int main( int argc, char *argv[], char *env[]){ FILE *fp; char buffer[1024]; fp = fopen ( "dict.txt" , "r" ); if (!fp) return 1; while ( fgets (buffer, …… } …… } |
Streaming还提供了另外一个选项-files,该选项后面跟的是HDFS上的一个文件(将你的配置文件放到HDFS上,再大也可以!!!),你可以在程序中像打开本地文件一样打开该文件,此外,你也可以使用#符号在本地建一个系统链接,如:
1 2 3 4 5 6 7 8 | $HADOOP_HOME/bin/hadoopjar\ contrib/streaming/hadoop-0.20.2-streaming.jar \ -filehdfs: //host:fs_port/user/dict.txt#dict_link \ ……. |
如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | int main( int argc, char *argv[], char *env[]){ FILE *fp; char buffer[1024]; fp = fopen ( "dict_link " , "r" ); //orfp=fopen("dict.txt if (!fp) return 1; while ( fgets (buffer, …… } …… } |
从Hadoop0.21.0开始,streaming支持二进制文件(具体可参考:
(1)rawbytes:key和value均用【4个字节的长度+原始字节】表示
(2)typedbytes:key和value均用【1字节类型+4字节长度+原始字节】表示
用户提交作业时,如果用-io指定二进制格式为typedbytes,则map的输入输出,reduce的输入输出均为typedbytes,如果想细粒度的控制这几个输入输出,可采用以下几个选项:
1 2 3 4 5 6 7 8 | -D stream.map.input=[identifier] -D stream.map.output=[identifier] -D stream.reduce.input=[identifier] -D stream.reduce.output=[identifier] |
中得到的一个例子(里面用到了解析typedbytes的python库,见:http://github.com/klbostee/typedbytes
):
mapper脚本如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | import sys import typedbytes input = typedbytes.PairedInput(sys.stdin) output = typedbytes.PairedOutput(sys.stdout) for (key, value) in input : for word in value.split(): output.write((word, 1 )) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | import sys import typedbytes from itertools import groupby from operator import itemgetter input = typedbytes.PairedInput(sys.stdin) output = typedbytes.PairedOutput(sys.stdout) for (key, group) in groupby( input , itemgetter( 0 )): values = map (itemgetter( 1 ), group) output.write((key, sum (values))) |
用户采用某种语言编写的mapper或者reducer可采用标准错误输出(stderr)自定义和改变counter值,格式为:reporter:counter:<group>,<counter>,<amount>,如,在C语言编写的mapper/reducer中:
1 | fprintf (stderr, “reporter:counter:group,counter1,1”); //将组group中的counter1增加1 |
如果你想在mapreduce作业执行过程中,打印一些状态信息,同样可使用标准错误输出,格式为:reporter:status:<message>,如,在C语言编写的mapper/reducer中:
1 | fprintf (stderr, //在shell桌面上打印“mapreducejobis |
迄今为止(0.21.0版本之前,包括0.21.0),HadoopStreaming是不支持Linux
Pipes,如:-mapper“cut-f1|seds/foo/bar/g”会报”java.io.IOException:
Brokenpipe”错误。
8.在mapreduce中获取JobConf的属性值
在0.21.0版本中,streaming作业执行过程中,JobConf中以mapreduce开头的属性(如mapreduce.job.id)会传递给mapper和reducer,关于这些参数,可参考:
其中,属性名字中的“.”会变成“_”,如mapreduce.job.id会变为mapreduce_job_id,用户可在mapper/reducer中获取这些属性值直接使用(可能是传递给环境变量参数,即main函数的第三个参数,本文作业还未进行验证)。
9.一些HadoopStreaming的开源软件包
(1)针对Hadoop
Streaming常用操作的C++封装包(如自定义和更新counter,输出状态信息等):
(2)C++实现的typedbytes代码库:
(3)python实现的typedbytes代码库:
(4)Java实现的typedbytes代码库(Hadoop0.21.0代码中自带)
10.总结
Hadoop
Streaming使得程序员采用各种语言编写mapreduce程序变得可能,它具备程序员所需的大部分功能接口,同时由于这种方法编写mapreduce作业简单快速,越来越多的程序员开始尝试使用Hadoop
Steraming。
11.参考资料
原创文章,转载请注明:转载自
本文链接地址:
相关文章推荐
- Hadoop&nbsp;Streaming&nbsp;编程
- 转载:Hadoop Streaming高级编程
- Hadoop&nbsp;pipes编程
- hadoop&nbsp;streaming的单词统计C++版
- hadoop&nbsp;Streaming之aggregate
- Hadoop Streaming高级编程
- Hadoop&nbsp;Streaming机制
- Hadoop&nbsp;Streaming和Pipes
- Hadoop Streaming高级编程
- hadoop&nbsp;pipes编程示例
- 大数据hadoop学习之---Yarn 体系架…
- Hadoop Streaming编程实例
- Java网络编程(1) Socket缓…
- unix环境高级编程——笔记3(POSIX & XSI)
- hadoop&nbsp;datanode启动不起来
- Hadoop--07--MapReduce高级编程
- 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
- 从TIOBE&nbsp;2008&nbsp;年11&nbsp;月编程语言排名…
- Hadoop Streaming 编程
- 配置hadoop-1.2.1&nbsp;eclipse开发环境