您的位置:首页 > 编程语言

oozie 编程方式的工作流

2015-05-06 11:33 211 查看
以一个例子简要说明搭建简单的oozie workflow 工作流

新闻门户网站统计出top k的热词汇

一、需求分析

1.首先实现一个mapreduce,统计出所有新闻搜索条目各自出现的频次(与大数据领域的wordcount mr实现一致)

主类com.baidu.topk.itemsfreq.Main

2.实现一个mr 统计出top k,并输出相关词汇(mr的每个map都取出top k,reduce端再排序取出top k)

主类com.baidu.topk.itemscontent.Main

一、构建workflow。

<!--
#######################################################################
# workflow 2.0
#######################################################################
-->

<workflow-app xmlns="uri:oozie:workflow:0.4" name="${sys_name}-Task-${task_id}">
    <!--
      Two sequential java actions: the frequency count runs first and, on
      success, hands over to the top-k selection. A failure in either action
      goes to the "fail" kill node. The parameters jobTracker, namenode,
      queueName, sys_name, task_id, task_input1 and task_input2 must all be
      supplied by the job configuration.
    -->
    <start to="caclute-frequency"/>

    <!-- Step 1: count how often each search item occurs. -->
    <action name="caclute-frequency" retry-max="3" retry-interval="2">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <!-- Fixed: was "ccom.baidu...", which is not the documented main class. -->
            <main-class>com.baidu.topk.itemsfreq.Main</main-class>
            <arg>-Ddfs.client.socket-timeout=30000</arg>
            <arg>-Dmapreduce.job.queuename=${queueName}</arg>
            <arg>-Dmapreduce.map.memory.mb=2048</arg>
            <arg>-Dmapreduce.map.java.opts=-Xmx1024M</arg>
            <arg>-Dmapreduce.reduce.memory.mb=2048</arg>
            <arg>-Dmapreduce.reduce.java.opts=-Xmx1024M</arg>
            <arg>-Dmapreduce.reduce.shuffle.memory.limit.percent=0.03</arg>
            <arg>-Dmapreduce.reduce.shuffle.parallelcopies=32</arg>
            <arg>-t</arg>
            <arg>${task_input1}</arg>
            <arg>-o</arg>
            <arg>/topk/frequency/${task_id}</arg>
        </java>
        <ok to="calculate-topk"/>
        <!-- Fixed: previously pointed at "notify-failed", a node that is not defined in this workflow. -->
        <error to="fail"/>
    </action>

    <!-- Step 2: select the top-k items and output the related words. -->
    <action name="calculate-topk" retry-max="3" retry-interval="2">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>com.baidu.topk.itemscontent.Main</main-class>
            <arg>-Ddfs.client.socket-timeout=30000</arg>
            <arg>-Dmapreduce.job.queuename=${queueName}</arg>
            <arg>-Dmapreduce.map.memory.mb=2048</arg>
            <arg>-Dmapreduce.map.java.opts=-Xmx1024M</arg>
            <arg>-Dmapreduce.reduce.memory.mb=2048</arg>
            <arg>-Dmapreduce.reduce.java.opts=-Xmx1024M</arg>
            <arg>-Dmapreduce.reduce.shuffle.memory.limit.percent=0.03</arg>
            <arg>-Dmapreduce.reduce.shuffle.parallelcopies=32</arg>
            <arg>-t</arg>
            <arg>${task_input2}</arg>
            <arg>-o</arg>
            <arg>/topk/content/${task_id}</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <!-- Terminal failure node: reports the failing node and its error message. -->
    <kill name="fail">
        <message>============ ERROR: ${wf:lastErrorNode()}, [${wf:errorMessage(wf:lastErrorNode())}] ============</message>
    </kill>
    <end name="end"/>
</workflow-app>


二、构建oozie工作目录

/topk/oozie/workflow.xml

/topk/oozie/lib

(lib中放各种jar包和库文件)

三、构建job.properties

#######################################################################
# Job configuration loaded by the Java client and submitted to Oozie.
# NOTE(review): the workflow shown in this article also references
# task_input1 and task_input2, which are not defined in this file; they
# presumably have to be added here (or supplied some other way) before
# submission - TODO confirm.
#
# System settings.
# NOTE(review): the values for namenode, jobTracker, oozie_url, queueName
# and ssh_host look like placeholders to be replaced with real
# cluster-specific values - confirm.
namenode=namenode
jobTracker=jobtracker
oozie_url=oozie_url
queueName=queue_name
email_list=greahuang@163.com

# Other settings. sys_name is used as the prefix of the workflow name.
sys_name=topk
#######################################################################

# Control settings (the workflow shown here contains no fork/join nodes).
oozie.wf.validate.ForkJoin=false

# NOTE(review): oozie_url, email_list, ssh_user and ssh_host are not
# referenced by the workflow shown in this article.
ssh_user=supertool
ssh_host=ssh_host

#######################################################################
# Per-run values: task_id appears in the workflow name and output paths;
# oozie.wf.application.path points at the deployed workflow definition.
task_id=3737
oozie.wf.application.path=/topk/oozie/workflow.xml


四、oozie java客户端

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Properties;

/**
* @author greahuang
*/
/**
 * Minimal command-line client that submits a workflow job to Oozie and polls
 * it until it reaches a terminal status.
 */
public class WorkflowClient {

    /**
     * URL handed to the {@link OozieClient} constructor.
     * NOTE(review): the value looks like a placeholder for the real Oozie
     * server URL - confirm before use.
     */
    public static final String OOZIE_URL = "oozie_url";

    /** Delay between two consecutive status polls, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    private final OozieClient oozieClient = new OozieClient(OOZIE_URL);

    public WorkflowClient() {
    }

    /**
     * Copies the given properties into a fresh Oozie job configuration and
     * submits and starts the job.
     *
     * @param properties job properties (e.g. loaded from job.properties);
     *                   every string-valued entry is copied into the configuration
     * @return the id of the started job
     * @throws OozieClientException if the Oozie client fails to run the job
     */
    public String startJob(Properties properties) throws OozieClientException {
        Properties configuration = oozieClient.createConfiguration();

        // stringPropertyNames() also covers defaults and only yields String
        // keys with String values, so getProperty never returns null here.
        for (String key : properties.stringPropertyNames()) {
            configuration.setProperty(key, properties.getProperty(key));
        }

        return oozieClient.run(configuration);
    }

    /** Returns true when the job has finished, whether successfully or not. */
    private static boolean isFinished(WorkflowJob.Status status) {
        return status == WorkflowJob.Status.SUCCEEDED
                || status == WorkflowJob.Status.FAILED
                || status == WorkflowJob.Status.KILLED;
    }

    /**
     * Entry point: loads the job properties file, starts the job and logs its
     * status once per poll interval until it has succeeded, failed or been killed.
     *
     * @param args {@code args[0]} is the path of the job properties file
     * @throws IllegalArgumentException if no properties file path is given
     * @throws IOException              if the properties file cannot be read
     * @throws OozieClientException     if a call to the Oozie server fails
     * @throws InterruptedException     if the thread is interrupted while waiting
     */
    public static void main(String[] args) throws OozieClientException, InterruptedException, IOException {
        if (args.length < 1) {
            throw new IllegalArgumentException("usage: WorkflowClient <path to job.properties>");
        }
        String jobPropertyFile = args[0];

        WorkflowClient client = new WorkflowClient();

        // Close the stream as soon as the properties are loaded.
        Properties properties = new Properties();
        try (FileInputStream in = new FileInputStream(new File(jobPropertyFile))) {
            properties.load(in);
        }

        String jobId = client.startJob(properties);
        SysOutLogger.info("jobId: " + jobId);

        WorkflowJob jobInfo = client.oozieClient.getJobInfo(jobId);
        SysOutLogger.info("job url: " + jobInfo.getConsoleUrl());

        while (true) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            WorkflowJob.Status status = client.oozieClient.getJobInfo(jobId).getStatus();
            if (isFinished(status)) {
                SysOutLogger.info("Job finish with status: " + status);
                break;
            }
            // Log the freshly polled status, not the snapshot taken right after submission.
            SysOutLogger.info("job running: " + status);
        }
    }
}


五、测试执行:

将 job.properties 文件的路径作为参数传入主函数,然后执行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息