您的位置:首页 > 数据库 > Mongodb

MongoDB与Hadoop技术栈的整合应用

2016-06-12 19:31 495 查看
一般场景处理

[MongoDB Connector for Hadoop](#MongoDB Connector for Hadoop)

部署安装

[Hive Usage](#Hive Usage)

MongoDB-based方式

BSON-based方式

数据序列化

如何选择BSON还是直连MongoDB?

最近在研究Mongo和Hadoop技术栈结合使用的场景,抽空整理一下.

文章内容比较适用于使用MongoDB作为后端业务数据库,使用hadoop平台作为数据平台的场景.

一般情况下当数据相关的业务越来越多的时候,我们都不会选择在mongo中进行一些数据分析的事情.

更好的选择是同步到数据仓库中统一处理供数据分析及数据挖掘,我们的之前选择的是用mongoexport工具从secondary节点导出数据.

后面再进行一系列的数据处理的工作. 整个的pipeline这样:

一般场景处理

[MongoDB secondary node]

–> [local filesystem] 使用mongoexport/js/MongoClient读取数据

–> [数据存储系统] 上传/数据集成

–> [数据仓库] 数据清洗

这样的情景下重复劳动会比较多,譬如从一个collection中各自抽取出不同的数据,

可能要写几个脚本,对于mongoexport来导出Array等也很麻烦。

下面以一段简单的sample,描述上上述的pipeline过程,sample数据如下

db.ldc_test.insert({
"id":"@Tony_老七"
,"fav_id":[3,33,333,3333,33333]
,"info":{
"github":"https://github.com/tonylee0329"
,"location":"SH/CN"
}
})


采用mongoexport导出的数据

mongoexport -k -h data_mongo:29017 -q "{}" -d test -c ldc_test --type csv -o /tmp/info -f id,fav_id,info.github,info.location

导出结果为:
╰─$ cat /tmp/info
id,fav_id,info.github,info.location
@Tony_老七,"[3.0,33.0,333.0,3333.0,33333.0]",https://github.com/tonylee0329,SH/CN


如果我们使用到了fav_id的信息的时候,在RDBMS中无法做很好的数组打开操作,这时我们可能需要写单独的脚本处理(python or js)

js脚本导出mongo数据

var rs = db.ldc_test.find();

var fields = "id,fav_id,info_github,info_location";

rs.forEach(function(l) {
for(var fid in l.fav_id) {
line = [l.id, l.fav_id[fid], l.info.github, l.info.location];
print(line.join(","))
};
})

mongo --quiet data_mongo:29017/test  ~/extract.js

导出结果为:
╰─$ mongo --quiet data_mongo:29017/test mongo.js
@Tony_老七,3,https://github.com/tonylee0329,SH/CN
@Tony_老七,33,https://github.com/tonylee0329,SH/CN
@Tony_老七,333,https://github.com/tonylee0329,SH/CN
@Tony_老七,3333,https://github.com/tonylee0329,SH/CN
@Tony_老七,33333,https://github.com/tonylee0329,SH/CN


所以如果所需要同步的数据中存在数组形式的,当我们要不同level数据,我们可能需要多个不同的脚本来维护。

MongoDB Connector for Hadoop

针对Hadoop作为存储的技术选型,研究了下github上MongoDB官方项目,觉得很是适合数据相关的应用来做数据同步。

github的项目地址: mongo-hadoop

项目是使用MongoDB存储的原生bson数据,在之上做的根据数据类型来做数据序列化和反序列化,

目前Hadoop上数据处理使用最多的应该是Hive了,我们以Hive来看看如何使用

部署安装

依赖jar文件到Hive/Hadoop的classpath环境下,涉及以下几个文件:

mongo-hadoop-core-1.5.2.jar

mongo-hadoop-hive-1.5.2.jar

mongo-java-driver-3.2.2.jar

Hadoop我们使用的是CDH的软件包,请注意放到
${HADOOP_HOME}/share/hadoop/common
,

Hadoop Client程序启动加载classpath脚本现在太脏了,之前放到
${HADOOP_HOME/share/hadoop/tools/lib}
不work,

Hadoop2.0项目拆分后,变成了hadoop-common、hadoop-hdfs、hadoop-yarn等几个主要的module,但是common是公共的library、RPC等,

推断Hadoop Client执行必定会加载。

Hive的比较简单,直接部署到每个节点的
${HIVE_HOME}/lib
即可.

MongoDB这块建议是部署一个mongo hidden节点,供读操作.我们目前的MongoDB是按照Replica Set模式部署,

客户端是通过
mongos
服务访问secondary节点.Hidden角色节点其实跟普通的secondary没有太大不同,

数据还是异步地从primary节点同步过来,只是它不参与leader选举的投票,而且它不可能被切换成leader.

详细介绍可以参考 Hidden角色部署

Hive Usage

有两种连接方式:

其一MongoDB-based 直接连接hidden节点,使用
com.mongodb.hadoop.hive.MongoStorageHandler
做数据Serde

其二BSON-based 将数据dump成bson文件,上传到HDFS系统,使用
com.mongodb.hadoop.hive.BSONSerDe


下面分别介绍两种方式的使用,

MongoDB-based方式

较简单,quick demo如下:

CREATE TABLE temp.ldc_test_mongo
(
id string,
fav_id array<int>,
info struct<github:string, location:string>
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"id","fav_id":"fav_id","info.github":"info.github","info.location":"info.location"}')
TBLPROPERTIES('mongo.uri'='mongodb://datatask01:29017/test.ldc_test')
;


BSON-based方式

BSON-based需要先将数据dump出来,但这个时候的dump与export不一样,不需要关心具体的数据内容,不需要指定fields list.

mongodump --host=datatask01:29017 --db=test --collection=ldc_test --out=/tmp
hdfs dfs -mkdir /dev_test/dli/bson_demo/
hdfs dfs -put /tmp/test/ldc_test.bson /dev_test/dli/bson_demo/
- 创建映射表
create external table temp.ldc_test_bson
(
id string,
fav_id array<int>,
info struct<github:string, location:string>
)
ROW FORMAT SERDE "com.mongodb.hadoop.hive.BSONSerDe"
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"id","fav_id":"fav_id","info.github":"info.github","info.location":"info.location"}')
STORED AS INPUTFORMAT "com.mongodb.hadoop.mapred.BSONFileInputFormat"
OUTPUTFORMAT "com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat"
location '/dev_test/dli/bson_demo/'
;


OK,我们先来看下query的结果

0: jdbc:hive2://hd-cosmos-01:10000/default> select * from temp.ldc_test_mongo;
+--------------------+------------------------+-----------------------------------------------------------------+--+
| ldc_test_mongo.id  | ldc_test_mongo.fav_id  |                       ldc_test_mongo.info                       |
+--------------------+------------------------+-----------------------------------------------------------------+--+
| @Tony_老七           | [3,33,333,3333,33333]  | {"github":"https://github.com/tonylee0329","location":"SH/CN"}  |
+--------------------+------------------------+-----------------------------------------------------------------+--+
1 row selected (0.345 seconds)


这样数据就存储在一个table,使用中如果需要打开数组,可以这样

SELECT id, fid
FROM temp.ldc_test_mongo LATERAL VIEW explode(fav_id) favids AS fid;
-- 访问struct结构数据
select id, info.github from temp.ldc_test_mongo


数据序列化

Hive Data TypeMongoDB Object
STRUCTdict
ARRAYdict
ARRAYarray
STRUCTObjectId(对应string也可以)
TimestampISODate()
booleanboolean
//根据不同的数据类型进行反序列操作,复杂类型在内容做element的循环,最终调用的都是对原子类型的操作.
public Object deserializeField(final Object value, final TypeInfo valueTypeInfo, final String ext) {
if (value != null) {
switch (valueTypeInfo.getCategory()) {
case LIST:
return deserializeList(value, (ListTypeInfo) valueTypeInfo, ext);
case MAP:
return deserializeMap(value, (MapTypeInfo) valueTypeInfo, ext);
case PRIMITIVE:
return deserializePrimitive(value, (PrimitiveTypeInfo) valueTypeInfo);
case STRUCT:
// Supports both struct and map, but should use struct
return deserializeStruct(value, (StructTypeInfo) valueTypeInfo, ext);
case UNION:
// Mongo also has no union
LOG.warn("BSONSerDe does not support unions.");
return null;
default:
// Must be an unknown (a Mongo specific type)
return deserializeMongoType(value);
}
}
return null;
}

// 转为java的原子类型存储.
private Object deserializePrimitive(final Object value, final PrimitiveTypeInfo valueTypeInfo) {
switch (valueTypeInfo.getPrimitiveCategory()) {
case BINARY:
return value;
case BOOLEAN:
return value;
case DOUBLE:
return ((Number) value).doubleValue();
case FLOAT:
return ((Number) value).floatValue();
case INT:
return ((Number) value).intValue();
case LONG:
return ((Number) value).longValue();
case SHORT:
return ((Number) value).shortValue();
case STRING:
return value.toString();
case TIMESTAMP:
if (value instanceof Date) {
return new Timestamp(((Date) value).getTime());
} else if (value instanceof BSONTimestamp) {
return new Timestamp(((BSONTimestamp) value).getTime() * 1000L);
} else if (value instanceof String) {
return Timestamp.valueOf((String) value);
} else {
return value;
}
default:
return deserializeMongoType(value);
}
}


如何选择BSON还是直连MongoDB?

官方是不建议使用直连方式的

Reasons for not using a StorageHandler for BSON

Using a StorageHandler will mark the table as non-native. Data cannot be loaded into the table directory from a “LOAD DATA” command.

Instead, the data can only come from the result of a query.

Files are only stored as a single output file, not one per reduce call. This will cause much lock leasing problems.

INSERT INTO a StorageHandler table behaves as INSERT OVERWRITE.

个人想法:

BSON访问方式比较适合offline计算

直连方式适用于小数据集的
实时同步
,至于Hive内部表drop的时候,可以通过权限控制使得hidden节点无法操作.

参考链接

Hive Usage

MongoDB Connector for hadoop FAQ

Integration of Hadoop and MongoDB
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mongodb hadoop