您的位置:首页 > 运维架构

Hadoop 实战之Streaming(三)

2012-10-20 08:17 337 查看
Google曾经有一道非常经典的面试题:
给你一个长度为N的链表。N很大,但你不知道N有多大。你的任务是从这N个元素中随机取出k个元素。你只能遍历这个链表一次。你的算法必须保证取出的元素恰好有k个,且它们是完全随机的(出现概率均等)?

这道题的解法非常多,网上讨论也非常热烈。本文要讨论的是,这个问题是从何而来,有什么实用价值?

自从有了Hadoop之后,该问题便有了新的应用载体。随着数据量的增多,很多数据挖掘算法被转移到MapReduce上实现,而数据挖掘中有个基本的问题是怎样对数据进行抽样。在Hadoop中,每个job会被分解成多个task并行计算,而数据的总量事先是不知道的(知道job运行结束才能获取数总数,而数据量非常大时,扫描一遍数据的代价非常高),用户知道的只是要获取的样本量,那怎样在类似于Hadoop的分布式平台上进行数据抽样?

回过头来看google的这道面试题,是不是正好时Hadoop平台上海量数据抽样问题?

我们可以在Hadoop应用中应该同样的脚本来得到一个数据集样本,采样的数据集通常用于程序开发,因为你可以基于它在单机或者伪分布式下递归地快速调试Hadoop程序

环境:Vmware 8.0 和ubuntu11.04

第一步: 首先在/home/tanglg1987目录下新建一个start.sh脚本文件,每次启动虚拟机都要删除/tmp目录下的全部文件,重新格式化namenode,代码如下:
sudo rm -rf /tmp/*
rm -rf /home/tanglg1987/hadoop-0.20.2/logs
hadoop namenode -format
hadoop datanode -format
start-all.sh
hadoop fs -mkdir input
hadoop dfsadmin -safemode leave

第二步:给start.sh增加执行权限并启动hadoop伪分布式集群,代码如下:
chmod 777 /home/tanglg1987/start.sh
./start.sh

运行过程如下:



第三步:上传本地文件到hdfs

在专利局http://data.nber.org/patents/网站下载专利数据
http://data.nber.org/patents/apat63_99.zip
hadoop fs -put /home/tanglg1987/apat63_99.txt input

第四步:新建一个RandomSample.py的Python文件
#!/usr/bin/env python
import sys, random
for line in sys.stdin:
if (random.randint(1,100) <= int(sys.argv[1])):
print line.strip()

第五步:新建一个test.py的Python文件
解决Linux下运行Python脚本显示“: 没有那个文件或目录”的问题

我猜不少人都遇到过类似的问题:

在Windows下写好了一个python脚本,运行没问题

但放到Linux系统下就必须在命令行前加上一个python解释器才能运行

脚本开头的注释行已经指明了解释器的路径,也用chmod给了执行权限,但就是不能直接运行脚本。

比如这个脚本:

#!/usr/bin/env python

#-*- coding=utf-8 -*-

def main():

print('This is just a test!\r\n')

if __name__ == '__main__':

main()

按理说没错的,但为什么不能直接运行呢?

后来发现问题出在换行表示上……

Windows下,文本的换行是\r\n一同实现的,而*nix下则只用\n

所以我的第一行代码在Linux下就被识别为了:

#!/usr/bin/env python\r

很显然,系统不知道这个"python\r"是个什么东西……

知道了这个,解决方案就很显而易见了,写了一个自动替换换行标志的脚本:
#!/usr/bin/env python
#-*- coding=utf-8 -*-
import sys, os
def replace_linesep(file_name):
if type(file_name) != str:
raise ValueError
new_lines = []

#以读模式打开文件
try:
fobj_original = open(file_name, 'r')
except IOError:
print('Cannot read file %s!' % file_name)
return False
#逐行读取原始脚本
print('Reading file %s' % file_name)
line = fobj_original.readline()
while line:
if line[-2:] == '\r\n':
new_lines.append(line[:-2] + '\n')
else:
new_lines.append(line)
line = fobj_original.readline()
fobj_original.close()

#以写模式打开文件
try:
fobj_new = open(file_name, 'w')
except IOError:
print('Cannot write file %s!' % file_name)
return False
#逐行写入新脚本
print('Writing file %s' % file_name)
for new_line in new_lines:
fobj_new.write(new_line)
fobj_new.close()
return True

def main():
args = sys.argv
if len(args) < 2:
print('Please enter the file names as parameters follow this script.')
os._exit(0)
else:
file_names = args[1:]
for file_name in file_names:
if replace_linesep(file_name):
print('Replace for %s successfully!' % file_name)
else:
print('Replace for %s failed!' % file_name)
os._exit(1)

if __name__ == '__main__':
main()

第六步:新建一个replace.sh的shell文件
/home/tanglg1987/test/streaming/test.py *.py

运行过程如下:



第七步:编写一个名为:list-4-3.sh的shell脚本

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-0.20.2-streaming.jar -input input/apat63_99.txt -output output -file	/home/tanglg1987/test/streaming/RandomSample.py	-mapper 'RandomSample.py 10' D mapred.reduce.tasks=1


第八步:给list-4-3.sh增加执行权限并启动脚本,代码如下:
chmod 777 /home/tanglg1987/list-4-3.sh
./list-4-3.sh

第九步:运行过程如下:



第十步:查看结果集,运行结果如下:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: