
Batch Import into HBase with MapReduce

2014-04-30
Simulate an import by placing the following mobile-traffic records under the input directory on HDFS. In the actual file the fields are tab-separated; the first field is a millisecond timestamp and the second is the phone number (msisdn).

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200
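
Before the job can run, this file has to be in HDFS. A minimal sketch using the Hadoop FileSystem API (the local file name wlan.log is hypothetical; the hdfs://hadoop:9000/input path matches the job configuration below):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadInput {
    public static void main(String[] args) throws Exception {
        // connect to the NameNode used by the import job below
        FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop:9000/"), new Configuration());
        // "wlan.log" is a hypothetical local file holding the records above
        fs.copyFromLocalFile(new Path("wlan.log"), new Path("/input/wlan.log"));
        fs.close();
    }
}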

package Hbase;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BatchImport {

    static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");
        Text v2 = new Text();

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            try {
                // field 0 is a millisecond timestamp, field 1 the phone number (msisdn)
                final Date date = new Date(Long.parseLong(splited[0].trim()));
                final String dateFormat = dateformat1.format(date);
                // rowkey design: msisdn:yyyyMMddHHmmss
                String rowKey = splited[1] + ":" + dateFormat;
                // prepend the rowkey to the original line for the reducer
                v2.set(rowKey + "\t" + value.toString());
                context.write(key, v2);
            } catch (NumberFormatException e) {
                // count malformed lines instead of failing the job
                final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
                counter.increment(1L);
                System.out.println("Bad line: " + splited[0] + " " + e.getMessage());
            }
        }
    }

    static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                final String[] splited = text.toString().split("\t");
                // splited[0] is the rowkey built in the mapper
                final Put put = new Put(Bytes.toBytes(splited[0]));
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"), Bytes.toBytes(splited[2]));
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("apmac"), Bytes.toBytes(splited[3]));
                // the remaining fields are omitted; add them with further put.add(...) calls
                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        final Configuration configuration = new Configuration();
        // set the ZooKeeper quorum
        configuration.set("hbase.zookeeper.quorum", "hadoop");
        // set the target HBase table name
        configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
        // raise the timeout so HBase does not give up on slow writes
        configuration.set("dfs.socket.timeout", "180000");

        final Job job = new Job(configuration, "HBaseBatchImport");
        // ship the jar containing these classes when running on a cluster
        job.setJarByClass(BatchImport.class);

        job.setMapperClass(BatchImportMapper.class);
        job.setReducerClass(BatchImportReducer.class);
        // set the map output types; the reduce output types are left unset
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        // no output path is set; the output format writes straight to HBase
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.setInputPaths(job, "hdfs://hadoop:9000/input");

        job.waitForCompletion(true);
    }
}

Start Hadoop, then ZooKeeper, then HBase, in that order (on a typical setup: start-all.sh, then zkServer.sh start, then start-hbase.sh; screenshots omitted).
Create the matching table in HBase: the table name is wlan_log with a single column family cf, as used by the job configuration above (screenshot omitted).

RowKey design: msisdn:datetime string (yyyyMMddHHmmss)
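
As a minimal sketch using the 0.9x-era HBase client API that the import code targets (the table name wlan_log, column family cf, and quorum host "hadoop" are all taken from the job above), the table can also be created programmatically:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop");
        HBaseAdmin admin = new HBaseAdmin(conf);
        // a single column family "cf", matching the Put calls in the reducer
        HTableDescriptor desc = new HTableDescriptor("wlan_log");
        desc.addFamily(new HColumnDescriptor("cf"));
        admin.createTable(desc);
        admin.close();
    }
}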
After running the Java program, the imported records show up in the wlan_log table (result screenshots omitted).
Query all traffic records for phone 13450456688:

public static void scan(String tableName) throws IOException {
    HTable table = new HTable(getConfiguration(), tableName);
    Scan scan = new Scan();
    // in ASCII, '/' sorts just before '0' and ':' just after '9', so this
    // start/stop pair covers every rowkey of the form 13450456688:yyyyMMddHHmmss
    scan.setStartRow(Bytes.toBytes("13450456688:/"));
    scan.setStopRow(Bytes.toBytes("13450456688::"));
    ResultScanner scanner = table.getScanner(scan);
    int i = 0;
    for (Result result : scanner) {
        System.out.println("Scan: " + (i++) + " " + result);
    }
}
Query all traffic records in the 134 number range:

public static void scanPeriod(String tableName) throws IOException {
    HTable table = new HTable(getConfiguration(), tableName);
    Scan scan = new Scan();
    // the same ASCII trick as above scans every rowkey that starts with "134"
    scan.setStartRow(Bytes.toBytes("134/"));
    scan.setStopRow(Bytes.toBytes("134:"));
    scan.setMaxVersions(1);
    ResultScanner scanner = table.getScanner(scan);
    int i = 0;
    for (Result result : scanner) {
        System.out.println("Scan: " + (i++) + " " + result);
    }
}
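
Both methods rely on a getConfiguration() helper that the original post does not show. A minimal sketch, assuming the same ZooKeeper quorum host ("hadoop") used in main():

// hypothetical stand-in for the helper assumed by the scan methods above;
// requires org.apache.hadoop.conf.Configuration and org.apache.hadoop.hbase.HBaseConfiguration
private static Configuration getConfiguration() {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "hadoop");
    return conf;
}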