Importing data from MongoDB into Hive with Spark
2016-11-15 13:38
1. First, add the mongo-spark connector dependency. Official documentation: https://docs.mongodb.com/spark-connector/
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
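If the project builds with sbt instead of Maven, the same artifact can be declared as below (a sketch; it assumes scalaVersion is set to a 2.10.x release so that %% resolves to the _2.10 artifact):

libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "1.1.0"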
2. Code
import com.alibaba.fastjson.JSON
import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{DataType, StructType}
import org.bson.Document

object Mongo2Hive {
  def MongodbToHive(args: Array[String], sc: SparkContext): Unit = {
    val hiveContext = new HiveContext(sc)
    val Array(schemaFilePath, queryCondition) = args

    // The config file holds five lines: schema JSON, required fields,
    // temp table name, Hive SQL, MongoDB config (see section 3)
    val schemaFile = sc.textFile(schemaFilePath).collect()
    val Array(schemaStruct, requireFields, tempTableName, sql, mongodbConfig) = schemaFile

    // MongoDB connection settings
    val json = JSON.parseObject(mongodbConfig)
    val hostList = json.getString("host")
    val dataBase = json.getString("database")
    val collection = json.getString("collection")

    // Load the collection as an RDD of Documents, partitioned on updateTime
    // (alternatively: MongoSpark.load(hiveContext, ReadConfig(...)) yields a DataFrame directly)
    val rdd = sc.loadFromMongoDB(ReadConfig(Map(
      "uri" -> s"mongodb://$hostList/$dataBase.$collection",
      "partitioner" -> "MongoSplitVectorPartitioner",
      "partitionKey" -> "updateTime")))

    // The match condition could also be read from the config file
    val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { readNum : { $eq: 35 } } }")))

    // Required columns
    val requireFieldArrays = requireFields.split(",")
    val schema = DataType.fromJson(schemaStruct).asInstanceOf[StructType]
    val rowRdd = aggregatedRdd.toDF(schema).rdd

    // Register the data as a temp table for HiveContext
    hiveContext.createDataFrame(rowRdd, schema).registerTempTable(tempTableName)

    // Enable dynamic partitioning, then run the insert from the temp table into Hive
    hiveContext.sql("set hive.exec.dynamic.partition=true")
    hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    hiveContext.sql(sql)

    sc.stop()
  }
}
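The method above expects a SparkContext from the caller. A minimal entry point for spark-submit could look like the sketch below (the object name Mongo2HiveApp and the app name are illustrative, not part of the original code):

import org.apache.spark.{SparkConf, SparkContext}

object Mongo2HiveApp {
  def main(args: Array[String]): Unit = {
    // args(0): path to the five-line config file; args(1): query condition
    val conf = new SparkConf().setAppName("Mongo2Hive")
    val sc = new SparkContext(conf)
    Mongo2Hive.MongodbToHive(args, sc)
  }
}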
3. Format of the data-load config file (one item per line, in the order the code reads them: schema JSON, required fields, temp table name, Hive SQL, MongoDB config):
{"type":"struct","fields":[{"name":"_id","type":"string","nullable":true,"metadata":{}},{"name":"addTime","type":"long","nullable":true,"metadata":{}},{"name":"articleUrl","type":"string","nullable":true,"metadata":{}},{"name":"author","type":"string","nullable":true,"metadata":{}},{"name":"position","type":"long","nullable":true,"metadata":{}},{"name":"content","type":"string","nullable":true,"metadata":{}},{"name":"coverPlan","type":"string","nullable":true,"metadata":{}},{"name":"dynamics","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"addTime","type":"long","nullable":true,"metadata":{}},{"name":"readNum","type":"long","nullable":true,"metadata":{}},{"name":"likeNum","type":"long","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"readNum","type":"long","nullable":true,"metadata":{}},{"name":"isOriginal","type":"long","nullable":true,"metadata":{}},{"name":"likeNum","type":"long","nullable":true,"metadata":{}},{"name":"pmid","type":"string","nullable":true,"metadata":{}},{"name":"postTime","type":"long","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}},{"name":"hotToken","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"score","type":"string","nullable":true,"metadata":{}},{"name":"token","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"updateTime","type":"long","nullable":true,"metadata":{}}]}
pmid,articleUrl,readNum,likeNum,isOriginal,position,postTime,updateTime,hotToken,title,coverPlan,content
hiveTempTable
insert overwrite table gaia.wxArticleTest partition(time_date) select pmid,articleUrl,readNum,likeNum,isOriginal,position,postTime,updateTime,hotToken,title,coverPlan,content,from_unixtime(substring(updateTime,0,10),'yyyyMMddHH') time_date from hiveTempTable where readNum is not null and isOriginal is not null
{"host":"127.0.0.1:27017","database":"test","collection":"wxArticle"}
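Since a malformed config line only fails once the job is already running on the cluster, it can help to sanity-check the file locally first. A minimal sketch, assuming the file is named mongo2hive.conf (the file name and the ConfigCheck object are hypothetical):

import com.alibaba.fastjson.JSON
import org.apache.spark.sql.types.{DataType, StructType}
import scala.io.Source

object ConfigCheck {
  def main(args: Array[String]): Unit = {
    // Expect exactly five lines: schema JSON, required fields, temp table name, Hive SQL, MongoDB config
    val lines = Source.fromFile("mongo2hive.conf").getLines().toArray
    require(lines.length == 5, s"expected 5 config lines, got ${lines.length}")
    val Array(schemaStruct, requireFields, tempTableName, _, mongodbConfig) = lines

    // The schema line must parse into a Spark StructType
    val schema = DataType.fromJson(schemaStruct).asInstanceOf[StructType]

    // Every required field must exist in the schema
    val missing = requireFields.split(",").filterNot(schema.fieldNames.contains)
    require(missing.isEmpty, s"fields missing from schema: ${missing.mkString(",")}")

    // The MongoDB config must carry host, database and collection
    val mongo = JSON.parseObject(mongodbConfig)
    Seq("host", "database", "collection").foreach { k =>
      require(mongo.getString(k) != null, s"missing MongoDB config key: $k")
    }
    println(s"config OK: temp table $tempTableName, ${schema.fields.length} schema fields")
  }
}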