oozie 编程方式的工作流
2015-05-06 11:33
211 查看
以一个例子简要说明搭建简单的oozie workflow 工作流
新闻门户网站统计出top k的热词汇
一、需求分析
1.首先实现一个mapredue统计出所有新闻搜索条目各自出现的屏次(与大数据领域的wordcount mr实现一致)
主类com.baidu.topk.itemsfreq.Main
2.实现一个mr 统计出top k ,并输出相关词汇(mr的每个map都取出top k,redue端在排序取出top k)
主类com.baidu.topk.itemscontent.Main
一、构建workflow。
二、构建oozie工作目录
/topk/oozie/workflow.xml
/topk/oozie/
db41
lib
(lib中放各种jar包和库文件)
三、构建job.properties
四、oozie java客户端
五、测试执行:
使用job.propert传入住函数。执行。
新闻门户网站统计出top k的热词汇
一、需求分析
1.首先实现一个mapredue统计出所有新闻搜索条目各自出现的屏次(与大数据领域的wordcount mr实现一致)
主类com.baidu.topk.itemsfreq.Main
2.实现一个mr 统计出top k ,并输出相关词汇(mr的每个map都取出top k,redue端在排序取出top k)
主类com.baidu.topk.itemscontent.Main
一、构建workflow。
<!-- ####################################################################### # workflow 2.0 ####################################################################### --> <workflow-app xmlns="uri:oozie:workflow:0.4" name="${sys_name}-Task-${task_id}"> <start to="caclute-frequency"/> <action name="caclute-frequency" retry-max="3" retry-interval="2"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${namenode}</name-node> <configuration> <property> <name>mapreduce.job.queuename</name> <value>${queueName}</value> </property> </configuration> <main-class>ccom.baidu.topk.itemsfreq.Main</main-class> <arg>-Ddfs.client.socket-timeout=30000</arg> <arg>-Dmapreduce.job.queuename=${queueName}</arg> <arg>-Dmapreduce.map.memory.mb=2048</arg> <arg>-Dmapreduce.map.java.opts=-Xmx1024M</arg> <arg>-Dmapreduce.reduce.memory.mb=2048</arg> <arg>-Dmapreduce.reduce.java.opts=-Xmx1024M</arg> <arg>-Dmapreduce.reduce.shuffle.memory.limit.percent=0.03</arg> <arg>-Dmapreduce.reduce.shuffle.parallelcopies=32</arg> <arg>-t</arg> <arg>${task_input1}</arg> <arg>-o</arg> <arg>/topk/frequency/${task_id}</arg> </java> <ok to="calculate-topk"/> <error to="notify-failed"/> </action> <action name="calculate-topk" retry-max="3" retry-interval="2"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${namenode}</name-node> <configuration> <property> <name>mapreduce.job.queuename</name> <value>${queueName}</value> </property> </configuration> <main-class>com.baidu.topk.itemscontent.Main</main-class> <arg>-Ddfs.client.socket-timeout=30000</arg> <arg>-Dmapreduce.job.queuename=${queueName}</arg> <arg>-Dmapreduce.map.memory.mb=2048</arg> <arg>-Dmapreduce.map.java.opts=-Xmx1024M</arg> <arg>-Dmapreduce.reduce.memory.mb=2048</arg> <arg>-Dmapreduce.reduce.java.opts=-Xmx1024M</arg> <arg>-Dmapreduce.reduce.shuffle.memory.limit.percent=0.03</arg> <arg>-Dmapreduce.reduce.shuffle.parallelcopies=32</arg> <arg>-t</arg> <arg>${task_input2}</arg> <arg>-o</arg> <arg>/topk/content/${task_id}</arg> </java> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>============ ERROR: ${wf:lastErrorNode()}, [${wf:errorMessage(wf:lastErrorNode())}] ============</message> </kill> <end name="end"/> </workflow-app>
二、构建oozie工作目录
/topk/oozie/workflow.xml
/topk/oozie/
db41
lib
(lib中放各种jar包和库文件)
三、构建job.properties
####################################################################### # sys set namenode=namenode jobTracker=jobtracker oozie_url=oozie_url queueName=queue_name email_list=greahuang@163.com # other set sys_name=topk ####################################################################### # control oozie.wf.validate.ForkJoin=false ssh_user=supertool ssh_host=ssh_host ####################################################################### task_id=3737 oozie.wf.application.path=/topk/oozie/workflow.xml
四、oozie java客户端
import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.WorkflowJob; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.Enumeration; import java.util.Properties; /** * @author greahuang */ public class WorkflowClient { public static final String OOZIE_URL = "oozie_url"; private OozieClient oozieClient = new OozieClient(OOZIE_URL); public WorkflowClient() { } /** * @param workflowDefinition * @param workflowParameters * @return - jobId * @throws org.apache.oozie.client.OozieClientException */ public String startJob(Properties properties) throws OozieClientException { // create a workflow job configuration and set the workflow application path Properties configuration = oozieClient.createConfiguration(); Enumeration<?> enumeration = properties.propertyNames(); while (enumeration.hasMoreElements()) { Object element = enumeration.nextElement(); configuration.setProperty(element.toString(), properties.getProperty(element.toString()).toString()); } return oozieClient.run(configuration); } public static void main(String[] args) throws OozieClientException, InterruptedException, IOException { String jobPropertyFile = args[0]; // Create client WorkflowClient client = new WorkflowClient(); // Create parameters Properties properties = new Properties(); properties.load(new FileInputStream(new File(jobPropertyFile))); // Start Oozing String jobId = client.startJob(properties); SysOutLogger.info("jobId: " + jobId); WorkflowJob jobInfo = client.oozieClient.getJobInfo(jobId); SysOutLogger.info("job url: " + jobInfo.getConsoleUrl()); while (true) { Thread.sleep(1000L); WorkflowJob.Status status = client.oozieClient.getJobInfo(jobId).getStatus(); if (status == WorkflowJob.Status.SUCCEEDED || status == WorkflowJob.Status.FAILED || status == WorkflowJob.Status.KILLED) { SysOutLogger.info("Job finish with status: " + status); break; } else { SysOutLogger.info("job running: " + jobInfo.getStatus()); } } } }
五、测试执行:
使用job.propert传入住函数。执行。
相关文章推荐
- 编程方式部署jBPM工作流
- Oozie工作流属性配置的方式与策略
- 编程方式部署jBPM工作流
- 编程方式部署jBPM工作流
- Oozie工作流属性配置的方式与策略
- 使用编程的方式来启动SharePoint的工作流
- Oozie工作流属性配置的方式与策略
- 使用编程的方式来启动SharePoint的工作流
- 使用编程的方式来启动SharePoint的工作流 并传入参数
- 使用编程的方式来启动SharePoint的工作流
- 编程方式获取计算机主板序列号等的实验
- JavaScript---网络编程(9-1)--DHTML技术演示(2-1)-表格创建的几种方式
- EF三种编程方式详细图文教程(C#+EF)之Database First
- [摘自MSDN] ASP.Net2.0学习 [2] 主题 6 :如何:以编程方式应用 ASP.NET 主题
- PHP编程:介绍常见的文件操作方式
- Android网络编程之Http请求服务器数据(POST方式)
- Winsock 同步方式 编程要点
- Android 编程下两种方式注册广播的区别
- Android网络编程之使用post方式提交数据
- 使用网络编程的较好方式