实战hadoop海量数据处理系列 01:数据导入篇
2017-06-07 08:59
399 查看
实战hadoop海量数据处理系列 01:数据导入篇
本文假设读者已经按照范老师的书搭建好了eclipse环境,并且已经导入myBi文件夹下面的子工程。在阅读本文前,强烈建议阅读原书“实现数据导入导出模块”章节。
本文的代码同步于https://github.com/titer1/Play_HadoopFelix
1 项目结构图
借用范老师的图,这是全篇的重要点,本文重要import, export的内容可以触类旁通。原图
![](http://7xjs3n.com1.z0.glb.clouddn.com/hadoop_import_work_flow2.gif)
2 项目核心代码
这里是import.py的代码,作者已经详细注释,请认真阅读# -*- coding:UTF-8 -*-s ''' Created on 2014-8-26 @author: Administrator ''' from com.util.pro_env import * from xml.etree import ElementTree from com.util.sqoop import SqoopUtil import sys #其中dt为昨天的日期,将由调度模块传入 def resolve_conf(dt): #获得配置文件名 conf_file = PROJECT_CONF_DIR + "Import.xml" #解析配置文件 xml_tree = ElementTree.parse(conf_file) #获得pras元素 tasks = xml_tree.findall('./task') for task in tasks: #获得导入类型,增量导入或者全量导入 import_type = task.attrib["type"] #获得表名集合 tables = task.findall('./table') #用来保存待执行的Sqoop命令的集合 cmds = [] #迭代表名集合,解析表配置文件 for i in range(len(tables)): #表名 table_name = tables[i].text #表配置文件名 table_conf_file = PROJECT_CONF_DIR + table_name + ".xml" #解析表配置文件 xmlTree = ElementTree.parse(table_conf_file) #获取sqoop-shell节点 sqoopNodes = xmlTree.findall("./sqoop-shell") #获取sqoop命令类型 sqoop_cmd_type = sqoopNodes[0].attrib["type"] #获取 praNodes = sqoopNodes[0].findall("./param") #用来保存param的信息的字典 cmap = {} for i in range(len(praNodes)): #获得key属性的值 key = praNodes[i].attrib["key"] #获得param标签中间的值 value = praNodes[i].text #保存到字典中 cmap[key] = value #首先组装成sqoop命令头 command = "sqoop " + "--" + sqoop_cmd_type #如果为全量导入 if (import_type == "all"): import_condition = "< " + dt #如果为增量导入 elif (import_type == "add"): import_condition = "= " + dt else: raise Exception ##迭代字典将param的信息拼装成字符串 for key in cmap.keys(): value = cmap[key] #如果不是键值对形式的命令选项 if(value == None or value == "" or value == " "): value = "" #将query的CONDITIONS替换为查询条件 if(key == "query"): value = value.replace("\$CONDITIONS", import_condition) #将导入分区替换为传入的时间 if(key == "hive-partition-value"): value = value.replace("$dt", dt) #拼装为命令 command += " --" + key + " " + value + "\n" #将命令加入至待执行命令集合 cmds.append(command) return cmds if __name__ == '__main__': #调度模块将昨天的时间传入 dt = "2017-06-06" #解析配置文件,获得sqoop命令集合 cmds = resolve_conf(dt) #迭代集合,执行命令 for i in range(len(cmds)): cmd = cmds[i] print cmd #执行导入过程 #SqoopUtil.execute_shell(cmd) ##66 在linux 上打开这段进行执行
3 运行动态图:windows模拟端
有图有真相![](http://7xjs3n.com1.z0.glb.clouddn.com/hadoop_import_v1.gif)
4 运行动态图:Centos
待更新其他 原书代码工程解读
拿到作者工程第一步,先要对工程的结构进行分析。首先,工程领域压缩包里面有两个分类,一个是基础篇,第二个是实战篇,本系列的着力点在实战篇。myBi是实战篇的基础,建议小伙伴们首先看这个工程。如果大家有后续问题,可以帖子后留言,我会后续跟进.
相关文章推荐
- 实战hadoop海量数据处理系列05 : 实现点击流日志的数据清洗模块
- 实战hadoop海量数据处理系列03 :数据仓库的设计
- 实战hadoop海量数据处理系列04预热篇:窗函数row_number 从理论到实践
- 实战hadoop海量数据处理系列02 番外篇: 在linux上使用hql执行工具 | hive排错记录
- 实战hadoop海量数据处理系列:序
- 实战hadoop海量数据处理系列02: hql执行工具
- 云星数据---Scala实战系列(精品版)】:Scala入门教程033-Scala实战源码-Scala apply方法01 调用
- WCF+Ef实战系列二:EF实体的构建及数据业务层的处理
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink实战基础002--flink特性:流处理特性介绍
- 【Web API系列教程】3.4 — 实战:处理数据(处理实体关系)
- Hadoop大数据零基础高端实战培训系列配文本挖掘项目
- 大数据处理技术 - 基于Hadoop的实战
- Hadoop系列之一:大数据存储及处理平台产生的背景
- 大数据处理技术 - 基于Hadoop的实战培训
- 大数据处理技术 - 基于Hadoop的实战
- Hadoop大数据零基础高端实战培训系列配文本挖掘项目
- 大数据处理技术-基于Hadoop/Yarn的大数据技术实战
- 1 大数据实战系列-spark+hadoop集成环境搭建
- 云星数据---Scala实战系列(精品版)】:Scala入门教程022-Scala实战源码-Scala 多态01
- 云星数据---Scala实战系列(精品版)】:Scala入门教程050-Scala实战源码-Scala implicit 操作01