A Brief Introduction to Using Morphline
2016-07-07 13:42
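Morphline Commands are part of the Cloudera Search project and implement ETL of data from Flume, MapReduce, HBase, and Spark into Apache Solr. Most documentation available online covers Morphline in the Flume Interceptor and the Flume Sink. This article instead introduces the basic concepts of Morphline and shows how to call Morphline from a Java program to transform data, that is, Morphline's standalone mode.
Kite Morphline is a distribution of Morphline that applies Morphline to data processing beyond Search, and it ships a rich set of libraries, tools, examples, and documentation.
Kite Morphline supports
•Flume events,
•HDFS files,
•Spark RDDs,
•RDBMS tables
•Avro objects
It has already been applied to Crunch, HBase, Impala, Pig, Hive, Sqoop, and others.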
Key Morphline Concepts
Commands and Record are the two key concepts in how Morphline processes data. Their English definitions are as follows.
(1) Commands are plugins to a morphline that perform tasks such as loading, parsing, transforming, or otherwise processing a single record.
Typical commands include readLine, grok, convertTimestamp, sanitizeUnknownSolrFields, logInfo, and loadSolr.
(2) A Record is an in-memory data structure of name-value pairs with optional blob attachments or POJO attachments.
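To make the Record definition concrete, here is a minimal plain-Java sketch of a name-value structure in which one field can carry several values and an attachment is simply another field. The class SimpleRecord, its method names, and the field names are hypothetical stand-ins chosen for illustration; this is not the Kite Record class and makes no claim about how that class is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a morphline record; NOT the Kite Record class. */
public class SimpleRecord {
    // Each field name maps to a list of values, so a field may hold several values.
    private final Map<String, List<Object>> fields = new LinkedHashMap<>();

    /** Appends a value to the named field, keeping any earlier values. */
    public void put(String name, Object value) {
        fields.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
    }

    /** Returns all values of the field, or an empty list if the field is absent. */
    public List<Object> get(String name) {
        return fields.getOrDefault(name, new ArrayList<>());
    }

    /** Returns the first value of the field, or null if the field is absent. */
    public Object getFirstValue(String name) {
        List<Object> values = get(name);
        return values.isEmpty() ? null : values.get(0);
    }

    public static void main(String[] args) {
        SimpleRecord record = new SimpleRecord();
        record.put("hostname", "syslog01");
        record.put("tags", "auth");
        record.put("tags", "sshd");
        // An attachment is modeled as a field whose value is a blob (hypothetical field name).
        record.put("body", "raw bytes".getBytes());
        System.out.println(record.get("tags"));
        System.out.println(record.getFirstValue("hostname"));
    }
}
```

Keeping a list per field name lets a command append a value without overwriting what an earlier command stored under the same name.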
The Morphline Configuration File
Morphline implements almost all of its operations through a configuration file. The file strings multiple Commands into a chain in which each Command passes its result to the next, and each Command either performs a specific data transformation or writes to the application's data store.

morphlines : [
  {
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
      {
        # Parse input attachment and emit a record for each input line
        readLine {
          charset : UTF-8
        }
      }
      {
        grok {
          dictionaryFiles : [src/test/resources/grok-dictionaries]
          expressions : {
            message : """<%{POSINT:priority}>%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:msg}"""
          }
        }
      }
      # convert timestamp field to native Solr timestamp format
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : timestamp
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "MMM d HH:mm:ss"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        }
      }
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : {
            collection : collection1 # Name of solr collection
            zkHost : "127.0.0.1:2181/solr" # ZooKeeper ensemble
          }
        }
      }
      # log the record at INFO level to SLF4J
      { logInfo { format : "output record: {}", args : ["@{}"] } }
      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : {
            collection : collection1 # Name of solr collection
            zkHost : "127.0.0.1:2181/solr" # ZooKeeper ensemble
          }
        }
      }
    ]
  }
]
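The chaining idea, where each Command hands its output to the next, can be sketched in plain Java without any Morphline dependency. Everything in this sketch is a hypothetical illustration: the Command interface, the splitLines, upperCase, and collect helpers, and the field names are assumptions that only mimic the shape of a chain and are not Kite classes or commands.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a command chain; all names are hypothetical, not Kite classes. */
public class CommandChainSketch {

    /** A command processes one record and returns false to signal failure. */
    interface Command {
        boolean process(Map<String, Object> record);
    }

    /** Emits one record per input line and passes each to the child command. */
    static Command splitLines(Command child) {
        return record -> {
            String body = (String) record.get("body");
            for (String line : body.split("\n")) {
                Map<String, Object> out = new HashMap<>();
                out.put("message", line);
                if (!child.process(out)) {
                    return false; // stop as soon as a downstream command fails
                }
            }
            return true;
        };
    }

    /** Transforms one field in place, then forwards the record to the child. */
    static Command upperCase(String field, Command child) {
        return record -> {
            record.put(field, record.get(field).toString().toUpperCase());
            return child.process(record);
        };
    }

    /** Terminal command that stores every record it receives. */
    static Command collect(List<Map<String, Object>> sink) {
        return record -> {
            sink.add(record);
            return true;
        };
    }

    /** Builds the chain splitLines -> upperCase -> collect and runs it on one input. */
    static List<Map<String, Object>> run(String body) {
        List<Map<String, Object>> sink = new ArrayList<>();
        Command chain = splitLines(upperCase("message", collect(sink)));
        Map<String, Object> input = new HashMap<>();
        input.put("body", body);
        chain.process(input);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run("first line\nsecond line"));
    }
}
```

Because each step only knows its child, the order of steps can change without touching the code that feeds records in, which is the role the configuration file plays in Morphline.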
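Calling Morphline in Standalone Mode
The Java code below calls Morphline to transform data. Morphline is quite transparent here: every concrete transformation is defined in the configuration file, so no concrete data type appears at the program level, which effectively decouples data processing from the specific data types.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.base.Notifications;

/** Usage: java ... ... */
public static void main(String[] args) throws IOException {
  // args[0] is the morphline.conf file; compile it on the fly
  File configFile = new File(args[0]);
  MorphlineContext context = new MorphlineContext.Builder().build();
  Command morphline = new Compiler().compile(configFile, null, context, null);
  // process each input data file (args[1] onward) inside one transaction
  Notifications.notifyBeginTransaction(morphline);
  for (int i = 1; i < args.length; i++) {
    // try-with-resources closes the stream even if processing throws
    try (InputStream in = new FileInputStream(new File(args[i]))) {
      Record record = new Record();
      record.put(Fields.ATTACHMENT_BODY, in);
      morphline.process(record);
    }
  }
  Notifications.notifyCommitTransaction(morphline);
}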