您的位置:首页 > 运维架构

hadoop识别多个文件读入

2015-08-14 14:20 375 查看
原网址:

http://www.crazyant.net/1112.html

具体思路是给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。

1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;

2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区

3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出

hadoop使用python实现join的map和reduce代码

mapper.py的代码:

# -*- coding: utf-8 -*-
#Mapper.py
#来自疯狂的蚂蚁www.crazyant.net
import os
import sys

#mapper脚本
def mapper():
#获取当前正在处理的文件的名字,这里我们有两个输入文件
#所以要加以区分
filepath = os.environ["map_input_file"]
filename = os.path.split(filepath)[-1]
for line in sys.stdin:
if line.strip()=="":
continue
fields = line[:-1].split("\t")
sno = fields[0]
#以下判断filename的目的是不同的文件有不同的字段,并且需加上不同的标记
if filename == 'data_info':
name = fields[1]
#下面的数字'0'就是为数据源1加上的统一标记
print '\t'.join((sno,'0',name))
elif filename == 'data_grade':
courseno = fields[1]
grade = fields[2]
#下面的数字'1'就是为数据源1加上的统一标记
print '\t'.join((sno,'1',courseno,grade))

if __name__=='__main__':
mapper()


reducer的代码:

# -*- coding: utf-8 -*-
#reducer.py
#来自疯狂的蚂蚁www.crazyant.net
import sys

def reducer():
#为了记录和上一个记录的区别,用lastsno记录上个sno
lastsno = ""

for line in sys.stdin:
if line.strip()=="":
continue
fields = line[:-1].split("\t")
sno = fields[0]
'''
处理思路:
遇见当前key与上一条key不同并且label=0,就记录下来name值,
当前key与上一条key相同并且label==1,则将本条数据的courseno、
grade联通上一条记录的name一起输出成最终结果
'''
if sno != lastsno:
name=""
#这里没有判断label==1的情况,
#因为sno!=lastno,并且label=1表示该条key没有数据源1的数据
if fields[1]=="0":
name=fields[2]
elif sno==lastno:
#这里没有判断label==0的情况,
#因为sno==lastno并且label==0表示该条key没有数据源2的数据
if fields[2]=="1":
courseno=fields[2]
grade=fields[3]
if name:
print '\t'.join((lastsno,name,courseno,grade))
lastsno = sno

if __name__=='__main__':
reducer()


#先删除输出目录
~/hadoop-client/hadoop/bin/hadoop fs -rmr /hdfs/jointest/output
#来自疯狂的蚂蚁www.crazyant.net
#注意,下面配置中的环境值每个人机器不一样
~/hadoop-client/hadoop/bin/hadoop streaming \
-D mapred.map.tasks=10 \
-D mapred.reduce.tasks=5 \
-D mapred.job.map.capacity=10 \
-D mapred.job.reduce.capacity=5 \
-D mapred.job.name="join--sno_name-sno_courseno_grade" \
-D num.key.fields.for.partition=1 \
-D stream.num.map.output.key.fields=2 \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input "/hdfs/jointest/input/*" \
-output "/hdfs/jointest/output" \
-mapper "python26/bin/python26.sh mapper.py" \
-reducer "python26/bin/python26.sh reducer.py" \
-file "mapper.py" \
-file "reducer.py" \
-cacheArchive "/share/python26.tar.gz#python26"

#看看运行成功没,若输出0则表示成功了
echo $?


可以自己手工构造输入输出数据进行测试,本程序是验证过的。



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: