Hadoop-Python实现Hadoop Streaming分组和二次排序
2018-01-10 18:26
633 查看
分组(partition)
Hadoop streaming框架默认情况下会以’/t’作为分隔符,将每行第一个’/t’之前的部分作为key,其余内容作为value,如果没有’/t’分隔符,则整行作为key;这个key/tvalue对又作为该map对应的reduce的输入。
-D stream.map.output.field.separator 指定分割key分隔符,默认是/t
-D stream.num.map.output.key.fields 选择key的范围
-D map.output.key.field.separator 指定key内部的分隔符
-D num.key.fields.for.partition 指定对key分出来的前几部分做partition而不是整个key
准备数据
鲁V73930,鲁,549黑ML1711,黑,235
鲁V75066,鲁,657
桂J73031,桂,900
晋M42387,晋,432
桂J73138,桂,456
晋M41665,晋,879
晋M42529,晋,790
step_run.sh
#!/bin/bash EXEC_PATH=$(dirname "$0") HPHOME=/opt/cloudera/parcels/CDH JAR_PACKAGE=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar IN_PATH=/user/h_chenliling/test.txt.lzo OUT_PATH=/user/h_chenliling/testout.txt MAP_FILE=${EXEC_PATH}/step_map.py RED_FILE=${EXEC_PATH}/step_red.py $HPHOME/bin/hadoop fs -rm -r $OUT_PATH $HPHOME/bin/hadoop jar $JAR_PACKAGE \ -D mapred.job.queue.name=bdev \ -D stream.map.input.ignoreKey=true \ -D map.output.key.field.separator=, \ #内部key分隔符 -D num.key.fields.for.partition=1 \ #key分组范围 -numReduceTasks 2 \ -input $IN_PATH \ -output $OUT_PATH \ -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \ -mapper $MAP_FILE \ -reducer $RED_FILE \ -file $MAP_FILE \ -file $RED_FILE \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner #指定分区类 $HPHOME/bin/hadoop fs -ls $OUT_PATH
step_map.py
#!/usr/bin/env python #coding=utf-8 import sys for line in sys.stdin: line = line.strip() seq = line.split(",") if len(seq) >=3: plate = seq[0] #车牌号 province = seq[1] #注册地 mile = seq[2] #里程 print province+ "," + plate + "\t" + mile
step_red.py
#!/usr/bin/env python #coding=utf-8 import sys prov = "" sum_mile = 0 for line in sys.stdin: line = line.strip() seq = line.split("\t") mile = int(seq[1]) if prov == "": prov = seq[0].split(",")[0] sum_mile = mile else: if prov == seq[0].split(",")[0]: # 相同组 sum_mile = sum_mile + mile else: # 不同组,输出上一组数据 print "%s\t%d" % (prov, sum_mile) sum_mile = mile prov = seq[0].split(",")[0] print "%s\t%d" % (prov, sum_mile)
输出结果:
hadoop fs -text /user/h_chenliling/testout.txt/part-00000晋 2101
鲁 1775
hadoop fs -text /user/h_chenliling/testout.txt/part-00001
桂 1356
黑 235
补充
事实上KeyFieldBasePartitioner还有一个高级参数 mapred.text.key.partitioner.options,这个参数可以认为是 num.key.fields.for.partition的升级版,它可以指定不仅限于key中的前几个字段用做partition,而是可以单独指定 key中某个字段或者某几个字段一起做partition。比如上面的需求用mapred.text.key.partitioner.options表示为 mapred.text.key.partitioner.options=-k1,1
二次排序(Secondary Sort)
mapper的输出被partition到各个reducer之后,会有一步排序。默认是按照key做二次排序,如果key是多列组成,先按照第一列排序,第一列相同的,按照第二列排序如果需要自定义排序。这里要控制的就是key内部的哪些元素用来做排序依据,是排字典序还是数字序,倒序还是正序。用来控制的参数是mapred.text.key.comparator.options。
通过org.apache.hadoop.mapred.lib.KeyFieldBasedComparator来自定义使用key中的部分字段做比较。
准备数据
鲁V73930,鲁,2,549黑ML1711,黑,1,235
鲁V75066,鲁,1,657
桂J73031,桂,1,900
晋M42387,晋,3,432
桂J73138,桂,2,456
晋M41665,晋,2,879
晋M42529,晋,1,790
鲁V75530,鲁,3,569
step_run.sh
#!/bin/bash EXEC_PATH=$(dirname "$0") HPHOME=/opt/cloudera/parcels/CDH JAR_PACKAGE=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar IN_PATH=/user/h_chenliling/test1.txt.lzo OUT_PATH=/user/h_chenliling/testout1.txt MAP_FILE=${EXEC_PATH}/step_map.py RED_FILE=${EXEC_PATH}/step_red.py $HPHOME/bin/hadoop fs -rm -r $OUT_PATH $HPHOME/bin/hadoop jar $JAR_PACKAGE \ -D mapred.job.queue.name=bdev \ -D stream.map.input.ignoreKey=true \ -D stream.map.output.field.separator=, \ #分割key/value -D stream.num.map.output.key.fields=3 \ #取key范围 -D map.output.key.field.separator=, \ #内部key分割符 -D num.key.fields.for.partition=1 \ #取分区范围 -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ #排序类 -D mapred.text.key.comparator.options=-k3,3nr \ #第三个元素倒序 -numReduceTasks 5 \ -input $IN_PATH \ -output $OUT_PATH \ -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \ -mapper $MAP_FILE \ -reducer $RED_FILE \ -file $MAP_FILE \ -file $RED_FILE \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner #分区类 $HPHOME/bin/hadoop fs -ls $OUT_PATH
step_map.py
#!/usr/bin/env python #coding=utf-8 import sys for line in sys.stdin: line = line.strip() seq = line.split(",") if len(seq) >=3: plate = seq[0] #车牌号 province = seq[1] #注册地 order = seq[2] mile = seq[3] #里程 print province + "," +plate+","+ order + "," + mile
step_red.py
#!/usr/bin/env python #coding=utf-8 import sys for line in sys.stdin: line = line.strip() print line
输出结果
hadoop fs -text /user/h_chenliling/testout1.txt/part-00000鲁,鲁V73930,2 549
鲁,鲁V75066,1 657
黑,黑ML1711,1 235
hadoop fs -text /user/h_chenliling/testout1.txt/part-00001
桂,桂J73138,2 456
桂,桂J73031,1 900
hadoop fs -text /user/h_chenliling/testout1.txt/part-00002
晋,晋M42387,3 432
晋,晋M41665,2 879
晋,晋M42529,1 790
相关文章推荐
- python 实现Hadoop的partitioner和二次排序
- python 实现Hadoop的partitioner和二次排序
- python 实现Hadoop的partitioner和二次排序
- python 实现Hadoop的partitioner和二次排序
- Python+Hadoop Streaming实现MapReduce(word count)
- Python+Hadoop Streaming实现MapReduce(如何给map和reduce的脚本传递参数)
- hadoop streaming 输出数据分割与二次排序
- Hadoop1.x MapReduce 实现二次排序 实现WritableComparable接口
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Python+Hadoop Streaming实现MapReduce(如何给map和reduce的脚本传递参数)
- python基于Hadoop Streaming实现简单的WordCount
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- Python+Hadoop Streaming实现MapReduce(如何给map和reduce的脚本传递参数)
- hadoop二次排序实现join
- Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解
- hadoop 二次排序join的实现
- (Hadoop学习-2)mapreduce实现二次排序
- Hadoop和Spark分别实现二次排序
- 使用Hadoop和Spark实现二次排序
- Python+Hadoop Streaming实现MapReduce(word count)