您的位置:首页 > 大数据 > 云计算

云计算(一)——使用 Hadoop Mapreduce 进行数据处理

2016-04-14 19:12 519 查看
[b]使用 Hadoop Mapreduce 进行数据处理[/b]

1. 综述

  使用HDP(下载: http://zh.hortonworks.com/products/releases/hdp-2-3/#install)搭建环境,进行分布式数据处理。

  项目文件下载,解压文件后将看到项目文件夹。该程序将读取 cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件, 文件中的每一行文本都是来自于 wikipedia 的一个标题, 读取每个标题,并使用 cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 中指定的特殊符号分割标题成独立单词,然后将单词转换为全小写,然后将出现在
cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 中的单词全部删除,最后统计剩余单词的出现次数,并输出。

  程序的编译过程需要在“~/.bashrc”文件内定义自己的环境变量“$hadoop_CLASSPATH”,在“~/.bashrc”文件中添加一行:  

export hadoop_CLASSPATH="/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/lib/*:/usr/hdp/2.3.2.0-2950/hadoop/.//*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/./:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/.//*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/.//*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/.//*:::/usr/share/java/mysql-connector-java-5.1.17.jar:/usr/share/java/mysql-connector-java-5.1.31-bin.jar:/usr/share/java/mysql-connector-java.jar:/usr/hdp/2.3.2.0-2950/tez/*:/usr/hdp/2.3.2.0-2950/tez/lib/*:/usr/hdp/2.3.2.0-2950/tez/conf:/usr/hdp/current/hadoop-yarn-client/.//*:/usr/hdp/current/hadoop-yarn-client/lib/*"


2. 运行过程

Step (1): 将项目文件夹放入HDP虚拟机,进入cloudMR文件夹,运行下列命令启动:

./start.sh


要求输入账号,随意输入10位数字即可。再运行下列命令检查 hadoop 是否正常运行:

hadoop version


Step (2): 编写 TitleCount.java 文件,完成相应功能。完成后的 TitleCount.java 如下:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.*;
import java.util.*;
/**
* Classic "Word Count"
*/
public class TitleCount extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new TitleCount(), args);
System.exit(res);
}

@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(this.getConf(), "Title Count");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setMapperClass(TitleCountMap.class);
job.setReducerClass(TitleCountReduce.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setJarByClass(TitleCount.class);
return job.waitForCompletion(true) ? 0 : 1;
}

public static String readHDFSFile(String path, Configuration conf) throws IOException{
Path pt=new Path(path);
FileSystem fs = FileSystem.get(pt.toUri(), conf);
FSDataInputStream file = fs.open(pt);
BufferedReader buffIn=new BufferedReader(new InputStreamReader(file));

StringBuilder everything = new StringBuilder();
String line;
while( (line = buffIn.readLine()) != null) {
everything.append(line);
everything.append("\n");
}
return everything.toString();
}

public static class TitleCountMap extends Mapper<Object, Text, Text, IntWritable> {
Set<String> stopWords = new HashSet<String>();
String delimiters;

@Override
protected void setup(Context context) throws IOException,InterruptedException {

Configuration conf = context.getConfiguration();

String delimitersPath = conf.get("delimiters");
delimiters = readHDFSFile(delimitersPath, conf);

String stopWordsPath = conf.get("stopwords");
List<String> stopWordsList = Arrays.asList(readHDFSFile(stopWordsPath, conf).split("\n"));
for(String e : stopWordsList){
stopWords.add(e);
}
}

@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer stk = new StringTokenizer(value.toString(),delimiters);
while(stk.hasMoreTokens()){
String e = stk.nextToken().trim().toLowerCase();
if(stopWords.contains(e) == false){
context.write(new Text(e),new IntWritable(1));
}
}
}
}

public static class TitleCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable e : values){
sum += e.get();
}
context.write(key, new IntWritable(sum));
}
}
}


Step (3): 编译java源文件。为了方便,在cloudMR文件夹中新建output文件夹,用来存放编译生成的.class文件。使用以下命令(在cloudMR文件夹下执行):

mkdir output


javac -classpath $Hadoop_CLASSPATH -d output TitleCount.java


进入output文件夹会看到3个.class文件。

Step (4): 将编译生成的类文件打包。

首先在 cloudMR 文件夹下新建文本文件 manifest.mf,使用以下命令(在 cloudMR 文件夹下执行):

touch manifest.mf


编辑内容为

Main-Class: TitleCount.class


manifest.mf 内是一些关于这个包的信息,这里定义了主类。

再使用下面命令打包(在cloudMR文件夹下执行):

jar cvfM TitleCount.jar manifest.mf -C output/ .


这条命令的含义是;

jar        打包命令

cvfM       

TitleCount.jar   打成的包的名字

manifest.mf   将这个文件打进包里

-C        -C之后的文件夹内的所有文件打进包里

output/      将output文件夹内的文件全部打进包里

.         打成的包TitleCount.jar放在当前文件夹

注意:打包过程很重要而且易错,请务必按照上文所述步骤进行。

Step (5): 将 TitleCount.jar 发布。

在发布(yarn)之前,还要完成准备工作。

将相关文件:cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件、cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 、
cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 上传到hdfs.

在 hdfs 的 /user/root/ 文件夹内新建 data 文件夹,将 delimiters.txt、stopwords.txt 放入 data 文件夹,再在 data 文件夹中新建 titles 文件夹,将cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件放入 titles 文件夹。

介绍相关的命令:

hadoop fs -ls       列出hdfs目录,由于没有参数,列出的是当前用户的主目录

hadoop fs -ls /      列出hdfs根目录

hadoop fs -mkdir data 在默认目录下新建data目录

hadoop fs -mkdir data/titles 在data目录中新建 titles目录

hadoop fs -copyFromLocal ./abc.txt data 上传当前目录(本地)中的 abc.txt 到 hdfs 上的 data 目录

  之后便可以发布了,使用命令:  

yarn jar TitleCount.jar TitleCount -D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt" data/titles output


这条命令的含义是;

yarn 发布内容

jar 要发布的内容为jar包

TitleCount.jar 发布的内容

TitleCount    TitleCount.jar的入口

-D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt"       -D后跟参数,这里定义了两个参数

data/titles 输入文件夹,其内的文件作为Map 的输入

output 输出文件存放的位置

  yarn 命令执行完毕后,即可查看运行结果。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: