
Importing data from MongoDB into Hive with Spark

2016-11-15 13:38
1. First, add the mongo-spark dependency. Official documentation: https://docs.mongodb.com/spark-connector/
<dependency>
  <groupId>org.mongodb.spark</groupId>
  <artifactId>mongo-spark-connector_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
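
If the project is built with sbt instead of Maven, the equivalent dependency would be the following sketch (it assumes a Scala 2.10 build, to match the _2.10 artifact above):

libraryDependencies += "org.mongodb.spark" % "mongo-spark-connector_2.10" % "1.1.0"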

2. Code

import com.alibaba.fastjson.JSON // assumed JSON library, matching the JSON.parseObject/getString usage below
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{DataType, StructType}
import org.bson.Document

object Mongo2Hive {

  def MongodbToHive(args: Array[String], sc: SparkContext): Unit = {

    val hiveContext = new HiveContext(sc)

    val Array(schemaFilePath, queryCondition): Array[String] = args

    // Read the config file; each line is one setting (see section 3 for the format)
    val schemaFile = sc.textFile(schemaFilePath).collect()

    val Array(schemaStruct, requireFields, tempTableName, sql, mongodbConfig) = schemaFile

    // MongoDB config (JSON on the last line of the config file)
    val json = JSON.parseObject(mongodbConfig)

    // MongoDB host, database and collection
    val hostList = json.getString("host")
    val dataBase = json.getString("database")
    val collection = json.getString("collection")

    import com.mongodb.spark._
    import com.mongodb.spark.config._

    //    val df = MongoSpark.load(hiveContext, ReadConfig(Map("uri" -> s"mongodb://$hostList/$dataBase.$collection", "partitioner" -> "MongoSplitVectorPartitioner", "partitionKey" -> "updateTime")))
    // Load the collection as an RDD, using the ReadConfig
    val rdd = sc.loadFromMongoDB(ReadConfig(Map(
      "uri" -> s"mongodb://$hostList/$dataBase.$collection",
      "partitioner" -> "MongoSplitVectorPartitioner",
      "partitionKey" -> "updateTime")))

    // The query condition could also be read from the config file
    val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { readNum : { $eq: 35 } } }")))

    // Required columns (the projection itself is done by the Hive SQL below)
    val requireFieldArrays = requireFields.split(",")

    // Build the DataFrame schema from the JSON schema string
    val schema = DataType.fromJson(schemaStruct).asInstanceOf[StructType]
    val rowRdd = aggregatedRdd.toDF(schema).rdd

    // Register the DataFrame as a temp table
    hiveContext.createDataFrame(rowRdd, schema).registerTempTable(tempTableName)

    // Use HiveContext to load the temp table into Hive with the Hive SQL from the config file
    hiveContext.sql("set hive.exec.dynamic.partition=true")
    hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    hiveContext.sql(sql)
    sc.stop()
  }
}
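
For completeness, a minimal driver for submitting this job might look like the sketch below. The object name and the two arguments are hypothetical (the original post does not show how MongodbToHive is invoked); the first argument is the config file path read by sc.textFile (typically an HDFS path), the second a query-condition placeholder.

import org.apache.spark.{SparkConf, SparkContext}

object Mongo2HiveJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Mongo2Hive")
    val sc = new SparkContext(conf)
    // Hypothetical arguments: path to the config file and a query condition placeholder
    Mongo2Hive.MongodbToHive(Array("/path/to/mongo2hive.conf", "readNum=35"), sc)
  }
}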

3. Data-loading configuration file format

The config file has five lines, read in order by the code above: the schema JSON (schemaStruct), the required columns (requireFields), the temp table name (tempTableName), the Hive SQL (sql), and the MongoDB connection JSON (mongodbConfig). An example:

{"type":"struct","fields":[{"name":"_id","type":"string","nullable":true,"metadata":{}},{"name":"addTime","type":"long","nullable":true,"metadata":{}},{"name":"articleUrl","type":"string","nullable":true,"metadata":{}},{"name":"author","type":"string","nullable":true,"metadata":{}},{"name":"position","type":"long","nullable":true,"metadata":{}},{"name":"content","type":"string","nullable":true,"metadata":{}},{"name":"coverPlan","type":"string","nullable":true,"metadata":{}},{"name":"dynamics","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"addTime","type":"long","nullable":true,"metadata":{}},{"name":"readNum","type":"long","nullable":true,"metadata":{}},{"name":"likeNum","type":"long","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"readNum","type":"long","nullable":true,"metadata":{}},{"name":"isOriginal","type":"long","nullable":true,"metadata":{}},{"name":"likeNum","type":"long","nullable":true,"metadata":{}},{"name":"pmid","type":"string","nullable":true,"metadata":{}},{"name":"postTime","type":"long","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}},{"name":"hotToken","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"score","type":"string","nullable":true,"metadata":{}},{"name":"token","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"updateTime","type":"long","nullable":true,"metadata":{}}]}
pmid,articleUrl,readNum,likeNum,isOriginal,position,postTime,updateTime,hotToken,title,coverPlan,content
hiveTempTable
insert overwrite table gaia.wxArticleTest partition(time_date) select pmid,articleUrl,readNum,likeNum,isOriginal,position,postTime,updateTime,hotToken,title,coverPlan,content,from_unixtime(substring(updateTime,0,10),'yyyyMMddHH') time_date from hiveTempTable where readNum is not null and isOriginal is not null
{"host":"127.0.0.1:27017","database":"test","collection":"wxArticle"}





                                            