您的位置:首页 > 运维架构

Hadoop Yarn 框架 Demo 变化

2014-03-11 22:08 134 查看
场景介绍:Weblogic 应用服务器日志分析

了解了 hadoop 新的 Yarn 框架的架构和思路后,我们用一个 Demo 示例来检验新 Yarn 框架下 Map-Reduce 程序的开发部署。

我们考虑如下应用场景:用户的生产系统由多台 Weblogic 应用服务器组成,每天需要每台对应用服务器的日志内容进行检查,统计其日志级别和日志模块的总数。

WebLogic 的日志范例如下图所示:

图 3.Weblogic 日志示例



如上图所示,<Info> 为 weblogic 的日志级别,<Security>,<Management> 为 Weblogic 的日志模块,我们主要分析 loglevel 和 logmodule 这两个维度分别在 WebLogic 日志中出现的次数,每天需要统计出 loglevel 和 logmodule 分别出现的次数总数。

Demo 测试环境 Yarn 框架搭建

由于 Weblogic 应用服务器分布于不同的主机,且日志数据量巨大,我们采用 hadoop 框架将 WebLogic 各个应用服务器主机上建立分布式目录,每天将 WebLogic 日志装载进 hadoop 分布式文件系统,并且编写基于 Yarn 框架的 MapReduce 程序对日志进行处理,分别统计出 LogLevel 和 Logmodule 在日志中出现的次数并计算总量,然后输出到分布式文件系统中,输出目录命名精确到小时为后缀以便区分每次 Demo 程序运行的处理结果。

我们搭建一个 Demo 测试环境以验证 Yarn 框架下分布式程序处理该案例的功能,以两台虚拟机作为该 Demo 的运行平台,两机均为 Linux 操作系统,机器 hostname 为 OEL 和 Stephen,OEL 作为 NameNode 和 ResouceManager 节点主机,64 位,Stephen 作为 DataNode 和 NodeManager 节点主机,32 位(Hadoop 支持异构性), 具体如下:

表 3.Demo 测试环境表

主机名角色备注
OEL(192.168.137.8)NameNode 节点主机

ResourceManager 主机
linux 操作系统

32bit
Stephen(192.168.l37.2)DataNode 节点主机

NodeManager 主机
linux 操作系统

64bit
我们把 hadoop 安装在两台测试机的 /hadoop 文件系统目录下,安装后的 hadoop 根目录为:/hadoop/hadoop-0.23.0,规划分布式文件系统存放于 /hadoop/dfs 的本地目录,对应分布式系统中的目录为 /user/oracle/dfs

我们根据 Yarn 框架要求,分别在 core-site.xml 中配置分布式文件系统的 URL,详细如下:

清单 1.core-site.xml 配置

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://192.168.137.8:9100</value>
</property>
</configuration>

在 hdfs-site.xml 中配置 nameNode,dataNode 的本地目录信息,详细如下:

清单 2.hdfs-site.xml 配置

<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>/hadoop/dfs/name</value>
<description>  </description>
</property>

<property>
<name>dfs.datanode.data.dir</name>
<value>/hadoop/dfs/data</value>
<description> </description>
</property>

<property>
<name>dfs.replication</name>
<value>2</value>
</property>

</configuration>

在 mapred-site.xml 中配置其使用 Yarn 框架执行 map-reduce 处理程序,详细如下:

清单 3.mapred-site.xml 配置

<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>Yarn</value>
</property>
</configuration>

最后在 Yarn-site.xml 中配置 ResourceManager,NodeManager 的通信端口,web 监控端口等,详细如下:

清单 4.Yarn-site.xml 配置

<?xml version="1.0"?>
<configuration>

<!-- Site specific YARN configuration properties -->
<property>
<name>Yarn.nodemanager.aux-services</name>
<value>mapreduce.shuffle</value>
</property>
<property>
<description>The address of the applications manager interface in the RM.</description>
<name>Yarn.resourcemanager.address</name>
<value>192.168.137.8:18040</value>
</property>

<property>
<description>The address of the scheduler interface.</description>
<name>Yarn.resourcemanager.scheduler.address</name>
<value>192.168.137.8:18030</value>
</property>

<property>
<description>The address of the RM web application.</description>
<name>Yarn.resourcemanager.webapp.address</name>
<value>192.168.137.8:18088</value>
</property>

<property>
<description>The address of the resource tracker interface.</description>
<name>Yarn.resourcemanager.resource-tracker.address</name>
<value>192.168.137.8:8025</value>
</property>
</configuration>

具体配置项的含义,在 hadoop 官方网站有详细的说明,读者可以参见 hadoop 0.23.0 官方配置模板

Demo 代码开发及详解

以下我们详细介绍一下新的 Yarn 框架下针对该应用场景的 Demo 代码的开发, 在 Demo 程序的每个类都有详细的注释和说明,Yarn 开发为了兼容老版本,API 变化不大,可以参考 官方 Hadoop Yarn 框架 API

在 Map 程序中,我们以行号为 key,行文本为 value 读取每一行 WebLogic 日志输入,将 loglevel 和 logmodule 的值读出作为 Map 处理后的新的 key 值,由于一行中 loglevel 和 logmodule 的出现次数应该唯一,所以经 Map 程序处理后的新的 record 记录的 value 应该都为 1:

清单 5. Map 业务逻辑

public static class MapClass extends Mapper<Object, Text, Text, IntWritable>
{
private Text record = new Text();
private static final IntWritable recbytes = new IntWritable(1);
public void map(Object key, Text value,Context context)
throws IOException,InterruptedException {
String line = value.toString();
// 没有配置 RecordReader,所以默认采用 line 的实现,
//key 就是行号,value 就是行内容,
// 按行 key-value 存放每行 loglevel 和 logmodule 内容
if (line == null || line.equals(""))
return;
String[] words = line.split("> <");
if (words == null || words.length < 2)
return;
String logLevel = words[1];
String moduleName = words[2];

record.clear();
record.set(new StringBuffer("logLevel::").append(logLevel).toString());
context.write(record, recbytes);
// 输出日志级别统计结果,通过 logLevel:: 作为前缀来标示。

record.clear();
record.set(new StringBuffer("moduleName::").append(moduleName).toString());
context.write(record, recbytes);
// 输出模块名的统计结果,通过 moduleName:: 作为前缀来标示
}
}

由于有 loglevel 和 logmodule 两部分的分析工作,我们设定两个 Reduce 来分别处理这两部分,loglevel 的交给 reduce1,logmodule 交给 reduce2。因此我们编写 Patitioner 类,根据 Map 传过来的 Key 中包含的 logLevel 和 moduleName 的前缀,来分配到不同的 Reduce:

清单 6.Partition 业务逻辑

public static class PartitionerClass extends Partitioner<Text, IntWritable>

{

public int getPartition(Text key, IntWritable value, int numPartitions)

{

if (numPartitions >= 2)//Reduce 个数,判断 loglevel 还是 logmodule 的统计,分配到不同的 Reduce

if (key.toString().startsWith("logLevel::"))

return 0;

else if(key.toString().startsWith("moduleName::"))

return 1;

else return 0;

else

return 0;

}

}
在 Reduce 程序中,累加并合并 loglevel 和 logmodule 的出现次数

清单 7. Reduce 业务逻辑

public static class ReduceClass extends  Reducer<Text, IntWritable,Text, IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context)throws IOException,
InterruptedException {

int tmp = 0;
for (IntWritable val : values) {
tmp = tmp + val.get();
}
result.set(tmp);
context.write(key, result);// 输出最后的汇总结果
}
}

以上完成了 MapReduce 的主要处理逻辑,对于程序入口,我们使用 Hadoop 提供的 Tools 工具包方便的进行 May-Reduce 程序的启动和 Map/Reduce 对应处理 class 的配置。

清单 8. Main 执行类

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class LogAnalysiser extends Configured implements Tool {
public static void main(String[] args)
{
try
{
int res;
res = ToolRunner.run(new Configuration(),new LogAnalysiser(), args);
System.exit(res);
} catch (Exception e)
{
e.printStackTrace();
}
}
public int run(String[] args) throws Exception
{
if (args == null || args.length <2)
{
System.out.println("need inputpath and outputpath");
return 1;
}
String inputpath = args[0];
String outputpath = args[1];
String shortin = args[0];
String shortout = args[1];
if (shortin.indexOf(File.separator) >= 0)
shortin = shortin.substring(shortin.lastIndexOf(File.separator));
if (shortout.indexOf(File.separator) >= 0)
shortout = shortout.substring(shortout.lastIndexOf(File.separator));
SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd.HH.mm");
shortout = new StringBuffer(shortout).append("-")
.append(formater.format(new Date())).toString();

if (!shortin.startsWith("/"))
shortin = "/" + shortin;
if (!shortout.startsWith("/"))
shortout = "/" + shortout;
shortin = "/user/oracle/dfs/" + shortin;
shortout = "/user/oracle/dfs/" + shortout;
File inputdir = new File(inputpath);
File outputdir = new File(outputpath);

if (!inputdir.exists() || !inputdir.isDirectory())
{
System.out.println("inputpath not exist or isn't dir!");
return 0;
}
if (!outputdir.exists())
{
new File(outputpath).mkdirs();
}
// 以下注释的是 hadoop 0.20.X 老版本的 Job 代码,在 hadoop0.23.X 新框架中已经大大简化
//   Configuration conf = getConf();
//   JobConf job = new JobConf(conf, LogAnalysiser.class);
//    JobConf conf = new JobConf(getConf(),LogAnalysiser.class);// 构建 Config
//    conf.setJarByClass(MapClass.class);
//    conf.setJarByClass(ReduceClass.class);
//    conf.setJarByClass(PartitionerClass.class);
//    conf.setJar("hadoopTest.jar");
//    job.setJar("hadoopTest.jar");

// 以下是新的 hadoop 0.23.X Yarn 的 Job 代码

job job = new Job(new Configuration());
job.setJarByClass(LogAnalysiser.class);
job.setJobName("analysisjob");
job.setOutputKeyClass(Text.class);// 输出的 key 类型,在 OutputFormat 会检查
job.setOutputValueClass(IntWritable.class); // 输出的 value 类型,在 OutputFormat 会检查
job.setJarByClass(LogAnalysiser.class);
job.setMapperClass(MapClass.class);
job.setCombinerClass(ReduceClass.class);
job.setReducerClass(ReduceClass.class);
job.setPartitionerClass(PartitionerClass.class);
job.setNumReduceTasks(2);// 强制需要有两个 Reduce 来分别处理流量和次数的统计
FileInputFormat.setInputPaths(job, new Path(shortin));//hdfs 中的输入路径
FileOutputFormat.setOutputPath(job,new Path(shortout));//hdfs 中输出路径

Date startTime = new Date();
System.out.println("Job started: " + startTime);
job.waitForCompletion(true);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
// 删除输入和输出的临时文件
//    fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
//    fileSys.delete(new Path(shortin),true);
//    fileSys.delete(new Path(shortout),true);
return 0;
}
}

Demo 部署及运行

Demo 输入输出的控制

本 demo 中我们将从 Weblogic 日志目录中拷贝原始待处理日志文件作为 Yarn 程序的输入,使用 hadoop dfs 命令将其放入分布式目录的 input 目录,处理完后将生成以时间戳为文件目录后缀的输出目录

Weblogic 日志存放的原始目录位于:/u01/app/Oracle/Middleware/user_projects/domains/test_domain/AdminServer/logs

分布式文件系统中的输入目录:/user/oracle/dfs/input

分布式文件系统中的输出目录:/user/oracle/dfs/output_%YYYY-MM-DD-hh-mm%

Demo 打包和部署

可以使用 JDeveloper 或者 Eclipse 等 IDE 工具将开发的 Hadoop Demo 代码打包为 jar,并指定 Main 类为 LoyAnalyze,本文中我们采用 JDeveloper 打包 Demo 代码,如下图示例:

图 4.Yarn Demo 程序打包示例



Demo 执行与跟踪

我们在 OEL 主机(NameNode&ResourceManager 主机,192.168.137.8)上启动 dfs 分布式文件系统:

图 5. 启动 Demo dfs 文件系统



从上图可以看出 dfs 分布式文件系统已经在 OEL 和 Stephen 主机上成功启动,我们通过默认的分布式文件系统 Web 监控 端口http://192.168.137.8:50070(也可以在上文中 core-site.xml
中配置 dfs.namenode.http-address 项指定其他端口 ) 来验证其文件系统情况:

图 6.hadoop 文件系统 web 监控页面



从上图中我们可以看到 /user/oracle/dfs 分布式文件系统已成功建立。

接下来我们在 NameNode 主机(OEL,192.168.137.8)上启动 Yarn 框架:

图 7. 启动 Demo Yarn 框架



从上图我们可以看到 ResouceManager 在 OEL 主机上成功启动,NodeManager 进程在 Stephen 节点主机上也已经启动,至此整个新的 Hadoop Yarn 框架已经成功启动。

我们将打好的 testHadoop.jar 包上传至 NameNode 主机(OEL)的 /hadoop/hadoop-0.23.0/ 根目录下,我们使用 Hadoop 自带的 hadoop 命令行工具执行 Demo 的 jar 包,具体步骤为,先使用 hadoop dfs 命令将输入文件(weblogic 原始日志)拷贝至 dfs 分布式目录的 input 输入目录,清理 dfs 分布式目录下的 output 输出子目录。然后使用 hadoop jar 命令执行 testHadoop 的 jar 包。

执行 Demo 的 shell 脚本示例如下:
./bin/hadoop dfs -rmr /user/oracle/dfs/output*
./bin/hadoop dfs -rmr /user/oracle/dfs/input
./bin/hadoop dfs -mkdir /user/oracle/dfs/input
./bin/hadoop dfs -copyFromLocal ./input/*.log /user/oracle/dfs/input/
./bin/hadoop jar ./hadoopTest.jar /hadoop/hadoop-0.23.0/input
/hadoop/hadoop-0.23.0/output

清单 9.Demo 执行脚本

然后我们使用上文中的脚本启动 demo 并执行:

图 8.Demo 程序运行



查看大图

从上图的 console 输出中我们可以看到 Demo 程序的结果和各项统计信息输出,下面我们通过 Web 监控界面详细中观察程序执行的执行流程和步骤细节。

Job 启动后我们可以通过 ResourceManager 的 Web 端口(在上文中 Yarn-site.xml 配置文件中 Yarn.resourcemanager.webapp.address 配置项) http://192.168.137.8:18088 来监控其 job 的资源调度。

图 9. 接收请求和生成 job application



查看大图

上图中我们可以看到 Yarn 框架接受到客户端请求 , 如上图所示 ID 为 application_1346564668712_0003 的 job 已经是 accepted 状态

我们点击该 ID 的链接进入到该 application 的 Map-Reduce 处理监控页面,该界面中有动态分配的 ApplicationMaster 的 Web 跟踪端口可以监视 MapReduce 程序的步骤细节

图 10.hadoop MapReduce Application Web 监控页面 (1)



点击上图中 ApplicationMaster 的 URL 可以进入该 ApplicationMaster 负责管理的 Job 的具体 Map-Reduce 运行状态:

图 11.hadoop MasterApplication Web 监控页面(2)



上图中我们可以看到 ID 为 application_1346564668712_0003 的 Job 正在执行,有 2 个 Map 进程,已经处理完毕,有 2 个 Reduce 正在处理,这跟我们程序设计预期的是一样的。

当状态变为 successful 后,进入 dfs 文件系统可以看到,输出的 dfs 文件系统已经生成,位置位于 /user/oracle/dfs 下,目录名为 output-2012.09.02.13.52,可以看到格式和命名方式与 Demo 设计是一致的,如下图所示:

图 12.Demo 输出目录(1)



我们进入具体的输出目录,可以清楚的看到程序处理的输出结果,正如我们 Demo 中设计的,两个 Reduce 分别生成了两个输出文件,分别是 part-r-00000 和 part-r-00001,对应 Module 和 Log Level 的处理输出信息:

图 13.Demo 输出目录(2)



点击 part-r-00000 的输出文件链接,可以看到程序处理后的 log level 的统计信息:

图 14.Demo 输出结果(1)



点击 part-r-00001 的输出文件链接,可以看到程序处理后 Module 的统计信息:

图 15.Demo 输出结果(2)



至此我们基于新的 Yarn 框架的 Demo 完全成功运行,实现功能与预期设计完全一致,运行状态和 NameNode/DataNode 部署,Job/MapReduece 程序的调度均和设计一致。读者可参考该 Demo 的配置及代码进行修改,做为实际生产环境部署和实施的基础。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: