
Implementing data interaction between Hive and ES

1. Environment

Hadoop cluster: hadoop-2.6.0, 3 nodes

HBase cluster: hbase-1.1.2, 3 nodes

Hive: hive-1.2.1, 1 test node

Elasticsearch: elasticsearch-1.7.1, test instance

2. Download the plugin that enables data interaction between Hive and ES.

Note: if you run Elasticsearch 2.1.0, you must use elasticsearch-hadoop-2.2.0; for ES versions below 2.1.0, elasticsearch-hadoop-2.1.2 is sufficient. Since this experiment uses ES 1.7.1, I chose elasticsearch-hadoop-2.1.2.
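
The jar can be pulled from Maven Central (coordinates org.elasticsearch:elasticsearch-hadoop:2.1.2); the URL below is a sketch, so verify it against the repository, and the target directory /home/hadoop/xuguokun/ simply matches the path used in the next step:

wget https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.1.2/elasticsearch-hadoop-2.1.2.jar
cp elasticsearch-hadoop-2.1.2.jar /home/hadoop/xuguokun/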

3. Register elasticsearch-hadoop-2.1.2 in Hive:

hive> add jar file:///home/hadoop/xuguokun/elasticsearch-hadoop-2.1.2.jar;
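
Note that add jar only registers the jar for the current session. A minimal sketch of making it visible to every Hive session, assuming the jar stays under /home/hadoop/xuguokun/ (either option works; adjust the path to your installation):

# Option 1: pass the jar when starting the CLI
hive --auxpath /home/hadoop/xuguokun/elasticsearch-hadoop-2.1.2.jar

# Option 2: export HIVE_AUX_JARS_PATH (for example in hive-env.sh) before starting Hive
export HIVE_AUX_JARS_PATH=/home/hadoop/xuguokun/elasticsearch-hadoop-2.1.2.jar
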
4. Create the index on the ES side. The following program reads the local test data and indexes it into ES:

package com.test.file2es;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ReadFile2Es1 {

    public static void main(String[] args) {

        // Replace this with the root directory of the test data on the 205 server.
        String filePath = "E:\\大数据相关资料\\TestData\\ESData";

        @SuppressWarnings("resource")
        Client client = new TransportClient()
                .addTransportAddresses(new InetSocketTransportAddress("192.168.174.130", 9300));

        try {
            bianli(filePath, client);
            System.out.println("Traversal finished");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("error");
        }
    }

    // Recursively walk the directory and index every non-empty line of every file.
    public static void bianli(String path, Client client) {

        File file = new File(path);

        if (file.exists()) {

            File[] files = file.listFiles();

            if (files.length == 0) {
                System.out.println("The directory is empty!");
                return;
            } else {

                for (File file2 : files) {

                    if (file2.isDirectory()) {
                        System.out.println("Directory: " + file2.getAbsolutePath());
                        bianli(file2.getAbsolutePath(), client);
                    } else {

                        FileReader fr;

                        try {
                            fr = new FileReader(file2.getAbsolutePath());
                            BufferedReader br = new BufferedReader(fr);
                            String line = "";

                            IndexResponse indexResponse = null;

                            while ((line = br.readLine()) != null) {

                                if (line != null && !line.equals("")) {

                                    String[] str = line.split(",");

                                    // Index one document per line into index "hive2es", type "info".
                                    indexResponse = client.prepareIndex("hive2es", "info")
                                            .setSource(jsonBuilder()
                                                    .startObject()
                                                    .field("area", str[0])
                                                    .field("media_view_tags", str[1])
                                                    .field("interest", str[2])
                                                    .endObject())
                                            .execute()
                                            .actionGet();
                                }
                            }

                            br.close();
                            fr.close();

                        } catch (FileNotFoundException e) {
                            e.printStackTrace();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        // System.out.println("File: " + file2.getAbsolutePath());
                    }
                }
            }
        } else {
            System.out.println("The path does not exist!");
        }
    }

    // Helper for 38-field records (not used in the flow above): strips the first and
    // last character of each field (e.g. surrounding quotes) and rebuilds a
    // comma-separated record; empty or "null" fields become the literal "null".
    public static String getRealData(String[] str) {

        if (str.length == 38) {

            String realData = "";

            for (int i = 0; i < 38; i++) {

                if ((str[i].substring(1, str[i].length() - 1).equals("null"))
                        || (str[i].substring(1, str[i].length() - 1)).equals("")) {
                    realData = realData + "null";
                } else {
                    realData = realData + str[i].substring(1, str[i].length() - 1);
                }

                if (i != 37) {
                    realData = realData + ",";
                }
            }

            return realData;

        } else {
            return "Invalid format";
        }
    }
}


Note: the local test data indexed into ES in this step is shown below:

beijing,diannaokeji,kejiwang
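
After running the program, one quick way to confirm that the document landed in ES is to query the index directly over the REST API (assuming the HTTP port 9200 on the same node used above); the response should contain a single hit with the area, media_view_tags and interest fields:

curl -XGET 'http://192.168.174.130:9200/hive2es/info/_search?pretty'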

5. Create the table hive_es on the Hive side

CREATE EXTERNAL TABLE hive_es (
  cookieid string,
  area string,
  media_view_tags string,
  interest string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
  'es.nodes' = '192.168.174.130:9200',
  'es.index.auto.create' = 'false',
  'es.resource' = 'hive2es/info',
  'es.read.metadata' = 'true',
  'es.mapping.names' = 'cookieid:_metadata._id, area:area, media_view_tags:media_view_tags, interest:interest'
);
6. Query the data from the Hive side

hive> select * from hive_es;
OK
AVOwmuAVAOB0VDYE1GoM	beijing	diannaokeji	kejiwang
Time taken: 0.126 seconds, Fetched: 1 row(s)
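
The external table also accepts ordinary HiveQL projections and filters; for example, a query restricted to the sample record indexed earlier:

hive> select area, interest from hive_es where area = 'beijing';
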
7. The steps above let Hive read data from ES; next, write data into ES from the Hive side.

The test data for this step is:

1,shanghai,diannaokeji,kejiwang

8. Create a local Hive table

CREATE EXTERNAL TABLE hive_es_native (
  cookieid string,
  area string,
  media_view_tags string,
  interest string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
9. Load the test data into the local Hive table hive_es_native:

hive> load data local inpath '/home/hadoop/xuguokun/test.txt' overwrite into table hive_es_native;
Loading data to table mydatabase.hive_es_native
Table mydatabase.hive_es_native stats: [numFiles=1, numRows=0, totalSize=32, rawDataSize=0]
OK
Time taken: 0.51 seconds
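
Before writing anything to ES, the load can be verified by querying the local table, which should return the single row from test.txt:

hive> select * from hive_es_native;
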
10. Write data into ES through Hive, then check the final contents of hive_es

hive> insert overwrite table hive_es select * from hive_es_native;
Query ID = hadoop_20160325192049_caab7883-e8ee-4fc7-9d01-cf34d2ee6473
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1458955914553_0002, Tracking URL = http://Master:8088/proxy/application_1458955914553_0002/
Kill Command = /usr/local/hadoop/bin/hadoop job  -kill job_1458955914553_0002
Hadoop job information for Stage-0: number of mappers: 1; number of reducers: 0
2016-03-25 19:21:09,038 Stage-0 map = 0%,  reduce = 0%
2016-03-25 19:21:22,042 Stage-0 map = 100%,  reduce = 0%, Cumulative CPU 2.63 sec
MapReduce Total cumulative CPU time: 2 seconds 630 msec
Ended Job = job_1458955914553_0002
MapReduce Jobs Launched:
Stage-Stage-0: Map: 1   Cumulative CPU: 2.63 sec   HDFS Read: 4160 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 630 msec
OK
Time taken: 36.247 seconds
hive> select * from hive_es;
OK
AVOwucV0AOB0VDYE1GoX	shanghai	diannaokeji	kejiwang
AVOwmuAVAOB0VDYE1GoM	beijing	diannaokeji	kejiwang
Time taken: 0.126 seconds, Fetched: 2 row(s)
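
As a final cross-check, the same data can be counted from the ES side (same host and port as before); at this point the index should report two documents:

curl -XGET 'http://192.168.174.130:9200/hive2es/info/_count?pretty'
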
11. This concludes the experiment.