A Brief Introduction to Using Morphline

2016-07-07 13:42
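Morphline Commands are part of the Cloudera Search project and implement ETL of data from Flume, MapReduce, HBase, and Spark into Apache Solr. Most documentation currently available online covers using Morphline in a Flume Interceptor or a Flume Sink. This article introduces the basic concepts of Morphline and shows how to call Morphline from a Java program to transform data, that is, Morphline's standalone mode.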

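Kite Morphline is a distribution of Morphline that applies it to data processing beyond Search, and it ships a rich set of libraries, tools, examples, and documentation.
Kite Morphline supports:

• Flume events
• HDFS files
• Spark RDDs
• RDBMS tables
• Avro objects
It has already been used with Crunch, HBase, Impala, Pig, Hive, Sqoop, and others.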

Key Morphline Concepts

Commands and Records are the two key concepts in how Morphline processes data. The definitions below are quoted in English.
(1)Commands are plugins to a morphline that perform tasks such as loading, parsing, transforming, or otherwise processing a single record.

Typical Commands include readLine, grok, convertTimestamp, sanitizeUnknownSolrFields, logInfo, and loadSolr.

(2) A Record is an in-memory data structure of name-value pairs with optional blob attachments or POJO attachments.
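
To make the two definitions more concrete, here is a minimal sketch in plain Java. It is a simplified model and not Kite's actual classes: `SimpleRecord`, `SimpleCommand`, `RecordSketch`, and `TRIM_MESSAGE` are hypothetical names introduced only for illustration. The assumption is that a record maps each field name to a list of values, and that a command processes one record and reports success or failure.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a morphline record: each name maps to a list of values.
final class SimpleRecord {
  private final Map<String, List<Object>> fields = new LinkedHashMap<>();

  // Append a value to a field; values already stored under that name are kept.
  void put(String name, Object value) {
    fields.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
  }

  // Return all values of a field, or an empty list if the field is absent.
  List<Object> get(String name) {
    return fields.getOrDefault(name, new ArrayList<>());
  }

  // Return the first value of a field, or null if the field is absent.
  Object getFirstValue(String name) {
    List<Object> values = get(name);
    return values.isEmpty() ? null : values.get(0);
  }

  // Drop all values of a field and store a single new value.
  void replaceValues(String name, Object value) {
    List<Object> values = new ArrayList<>();
    values.add(value);
    fields.put(name, values);
  }
}

// Hypothetical stand-in for a command: handles one record, returns false on failure.
interface SimpleCommand {
  boolean process(SimpleRecord record);
}

final class RecordSketch {
  // Example command: trims the first "message" value; fails if the field is missing.
  static final SimpleCommand TRIM_MESSAGE = record -> {
    Object message = record.getFirstValue("message");
    if (message == null) {
      return false;
    }
    record.replaceValues("message", message.toString().trim());
    return true;
  };

  public static void main(String[] args) {
    SimpleRecord record = new SimpleRecord();
    record.put("message", "  hello morphline  ");
    record.put("tag", "a");
    record.put("tag", "b"); // a field may hold several values
    boolean ok = TRIM_MESSAGE.process(record);
    // prints: true [hello morphline] [a, b]
    System.out.println(ok + " " + record.get("message") + " " + record.get("tag"));
  }
}
```

The real Record type lives in `org.kitesdk.morphline.api`; the sketch only mirrors the idea of multi-valued named fields being handed to a command.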

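The Morphline Configuration File

Almost everything Morphline does is driven by a configuration file. The file chains multiple Commands together: each Command passes its result to the next one, and each Command either performs a specific data transformation or writes the record to the application's data store.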

morphlines : [
  {
    # Name used to identify this morphline
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
      {
        # Parse input attachment and emit a record for each input line
        readLine {
          charset : UTF-8
        }
      }

      {
        grok {
          dictionaryFiles : [src/test/resources/grok-dictionaries]
          expressions : {
            message : """<%{POSINT:priority}>%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:msg}"""
          }
        }
      }

      # convert timestamp field to native Solr timestamp format
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : timestamp
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "MMM d HH:mm:ss"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        }
      }

      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : {
            collection : collection1       # Name of solr collection
            zkHost : "127.0.0.1:2181/solr" # ZooKeeper ensemble
          }
        }
      }

      # log the record at INFO level to SLF4J
      { logInfo { format : "output record: {}", args : ["@{}"] } }

      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : {
            collection : collection1       # Name of solr collection
            zkHost : "127.0.0.1:2181/solr" # ZooKeeper ensemble
          }
        }
      }
    ]
  }
]
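
To show what the grok and convertTimestamp steps of this chain do to a single line, here is a rough sketch that uses only the Java standard library. It does not call Morphline. The class name `SyslogChainSketch`, its methods, and the sample syslog line are assumptions made for illustration, and the regular expression is a hand-expanded approximation of the grok expression rather than the real dictionary patterns.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SyslogChainSketch {
  // Approximation of the grok expression; the real dictionary patterns are stricter.
  private static final Pattern SYSLOG = Pattern.compile(
      "<(?<priority>[1-9][0-9]*)>"
      + "(?<timestamp>[A-Z][a-z]{2} +[0-9]{1,2} [0-9]{2}:[0-9]{2}:[0-9]{2}) "
      + "(?<hostname>[A-Za-z0-9_.-]+) "
      + "(?<program>.*?)(?:\\[(?<pid>[1-9][0-9]*)\\])?: "
      + "(?<msg>.*)");

  private static final String[] FIELD_NAMES =
      {"priority", "timestamp", "hostname", "program", "pid", "msg"};

  // Same formats as the convertTimestamp command in the configuration.
  static final String[] INPUT_FORMATS = {"yyyy-MM-dd'T'HH:mm:ss'Z'", "MMM d HH:mm:ss"};
  static final String OUTPUT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

  // Step 1 (like grok): split one line into named fields; null if the line does not match.
  static Map<String, String> grok(String line) {
    Matcher m = SYSLOG.matcher(line);
    if (!m.matches()) {
      return null;
    }
    Map<String, String> record = new LinkedHashMap<>();
    for (String name : FIELD_NAMES) {
      String value = m.group(name); // null when an optional group (pid) is absent
      if (value != null) {
        record.put(name, value);
      }
    }
    return record;
  }

  // Step 2 (like convertTimestamp): try each input format in order, then
  // re-render the parsed instant in the output time zone. Returns null if none fits.
  static String convertTimestamp(String value, String[] inputFormats, String inputZone,
                                 String outputFormat, String outputZone) {
    SimpleDateFormat out = new SimpleDateFormat(outputFormat, Locale.ENGLISH);
    out.setTimeZone(TimeZone.getTimeZone(outputZone));
    for (String format : inputFormats) {
      SimpleDateFormat in = new SimpleDateFormat(format, Locale.ENGLISH);
      in.setTimeZone(TimeZone.getTimeZone(inputZone));
      in.setLenient(false);
      try {
        return out.format(in.parse(value));
      } catch (ParseException e) {
        // this format did not match; try the next one
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Sample input line (an assumption, not taken from the article).
    String line = "<164>Feb  4 10:46:14 syslog sshd[607]: listening on 0.0.0.0 port 22.";
    Map<String, String> record = grok(line);
    if (record == null) {
      System.out.println("line did not match");
      return;
    }
    // The result of step 1 is the input of step 2, as in the command chain.
    record.put("timestamp", convertTimestamp(record.get("timestamp"), INPUT_FORMATS,
        "America/Los_Angeles", OUTPUT_FORMAT, "UTC"));
    System.out.println(record);
  }
}
```

Two details are worth noting in this sketch. The syslog timestamp carries no year, so `SimpleDateFormat` falls back to 1970. And because the `Z` in the first input format is a quoted literal, a value such as `2012-09-06T07:14:34Z` is read as local time in the input time zone, so the sketch converts it to `2012-09-06T14:14:34.000Z` rather than leaving the hour unchanged.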


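Calling Morphline in Standalone Mode

The Java code below calls Morphline to transform data. Morphline keeps the program transparent: every concrete transformation step lives in the configuration file, so no concrete data type appears at the program level. This effectively decouples the data processing from the specific data types.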
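import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.base.Notifications;

public class MorphlineDemo {
  /** Usage: java ... ... */
  public static void main(String[] args) throws IOException {
    // args[0] is the morphline config file; args[1..] are the input data files
    // compile morphline.conf file on the fly
    File configFile = new File(args[0]);
    MorphlineContext context = new MorphlineContext.Builder().build();
    Command morphline = new Compiler().compile(configFile, null, context, null);

    // process each input data file
    Notifications.notifyBeginTransaction(morphline);
    for (int i = 1; i < args.length; i++) {
      // try-with-resources closes the stream even if process() throws
      try (InputStream in = new FileInputStream(new File(args[i]))) {
        Record record = new Record();
        record.put(Fields.ATTACHMENT_BODY, in);
        if (!morphline.process(record)) {
          System.err.println("Morphline failed to process " + args[i]);
        }
      }
    }
    Notifications.notifyCommitTransaction(morphline);
  }
}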