
A First Look at Hadoop: a MapReduce + HBase Example

2015-07-28 08:55
I. Environment Setup

The environment used here is hadoop-0.20.2 and hbase-0.90.4; refer to the separate posts for Hadoop environment setup and HBase environment setup.

Note that the goal here is to run a MapReduce job on Hadoop that analyzes a log and persists the results to HBase, so Hadoop needs the HBase and ZooKeeper jars when the program is compiled and run. Copy hbase-0.90.4.jar and zookeeper-3.3.2.jar into Hadoop's lib directory as follows:

#cp /root/hbase-0.90.4/hbase-0.90.4.jar /root/hadoop-0.20.2/lib

#cp /root/hbase-0.90.4/lib/zookeeper-3.3.2.jar /root/hadoop-0.20.2/lib

II. Writing the Example

The log file xxxlog.txt looks like this:

version-------------time-----------------id-------rt----filter--------id----rt-----filter

1.0^A2014-03-03 00:00:01^Ad2000^C4^C3040^Bd2001^C7^C0

1.0^A2014-03-03 00:00:01^Ad3000^C4^C3041^Bd2001^C7^C0
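To make the record format concrete, here is a small standalone sketch (the class name is only for illustration) that takes apart the dsp-info section of the first sample record, assuming the separators really are the literal ^B and ^C sequences shown above; they are escaped below because String.split() expects a regular expression:

public class DspInfoParseSketch {
    public static void main(String[] args) {
        // dsp-info section of the first sample line: entries separated by ^B,
        // fields inside an entry (id, rt, filter) separated by ^C
        String dspInfoList = "d2000^C4^C3040^Bd2001^C7^C0";
        for (String dspInfo : dspInfoList.split("\\^B")) {
            String[] f = dspInfo.split("\\^C");
            System.out.println("id=" + f[0] + ", rt=" + f[1] + ", filter=" + f[2]);
        }
    }
}

Running it prints one line per dsp entry: id=d2000, rt=4, filter=3040 and id=d2001, rt=7, filter=0.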

Likewise, this file needs to be uploaded to HDFS, e.g.: hadoop fs -put xxxlog.txt /tmp/input.

To persist the results, create the table and column family in HBase first, e.g. start ./hbase shell and run: create 'xxxlog', 'dsp_filter'.
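The shell command above is all that is required; if you would rather create the table from Java, a rough equivalent using the 0.90-era client API might look like the following sketch (the class name and the localhost quorum are assumptions for a single-node setup):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTableSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost"); // assumes a local ZooKeeper
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("xxxlog")) {
            HTableDescriptor desc = new HTableDescriptor("xxxlog");
            desc.addFamily(new HColumnDescriptor("dsp_filter"));
            admin.createTable(desc);
        }
    }
}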

To keep things clear and easy to extend, the Mapper, Reducer, and Driver are written as separate classes, as follows:

1. Mapper

#vi xxxLogMaper.java

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class xxxLogMaper extends Mapper<Object, Text, Text, IntWritable> {

    // Separators as they appear in the sample log above. If your logs use the
    // actual \u0001/\u0002/\u0003 control characters instead of the literal
    // two-character sequences, change these constants accordingly.
    public final static String CONTROL_A = "^A";
    public final static String CONTROL_B = "^B";
    public final static String CONTROL_C = "^C";
    // Field positions after splitting on ^A / ^C.
    public final static int PV_TIME = 1;
    public final static int DSP_INFO_LIST = 5;
    public final static int DSP_ID = 0;
    public final static int DSP_FILTER = 2;

    public void map(Object key, Text value, Context context) {
        try {
            System.out.println("\n------------map come on-----------");
            System.out.println("\nline=-----------" + value.toString());
            // split() takes a regex, so quote the separator to match it literally.
            String[] line = value.toString().split(Pattern.quote(CONTROL_A));

            // Bucket the pv time into half-hour slots: yyyyMMddHHmm.
            System.out.println("\npvtime=-----------" + line[PV_TIME]);
            String year = line[PV_TIME].substring(0, 4);
            String month = line[PV_TIME].substring(5, 7);
            String day = line[PV_TIME].substring(8, 10);
            String hour = line[PV_TIME].substring(11, 13);
            String minute = "";
            int m_tmp = Integer.parseInt(line[PV_TIME].substring(14, 16));
            if (m_tmp >= 0 && m_tmp <= 30) {
                minute = "00";
            } else {
                minute = "30";
            }
            String pvtime = year + month + day + hour + minute;

            String[] dspInfoList = line[DSP_INFO_LIST].split(Pattern.quote(CONTROL_B));
            String dspid = "";
            String dspfilter = "";
            Text k = new Text();
            IntWritable v = new IntWritable(1);
            for (int i = 0; i < dspInfoList.length; i++) {
                System.out.println("\n------------map-----------");
                System.out.println("\ndspinfo=" + dspInfoList[i]);
                String[] dspInfo = dspInfoList[i].split(Pattern.quote(CONTROL_C));
                dspid = dspInfo[DSP_ID];
                dspfilter = dspInfo[DSP_FILTER];
                // key=dspid^Afilter^Apvtime, value=1
                k.set(dspid + CONTROL_A + dspfilter + CONTROL_A + pvtime);
                context.write(k, v);
                System.out.println("\nkey=" + k.toString());
                System.out.println("\nvalue=" + v.toString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
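To make the half-hour bucketing in map() easier to follow, here is a standalone sketch (not part of the job, class name is illustrative) that applies the same substring arithmetic to the sample timestamp:

public class PvTimeBucketSketch {
    public static void main(String[] args) {
        String ts = "2014-03-03 00:00:01"; // yyyy-MM-dd HH:mm:ss
        String bucket = ts.substring(0, 4)        // year
                + ts.substring(5, 7)              // month
                + ts.substring(8, 10)             // day
                + ts.substring(11, 13)            // hour
                + (Integer.parseInt(ts.substring(14, 16)) <= 30 ? "00" : "30");
        System.out.println(bucket);               // prints 201403030000
    }
}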

2. Reducer

#vi xxxLogReducer.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class xxxLogReducer
        extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    public final static String COL_FAMILY = "dsp_filter";
    public final static String COL_NAME = "sum";
    private final static String ZK_HOST = "localhost";
    private final static String TABLE_NAME = "xxxlog";

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("\n------------reduce come on-----------");
        String k = key.toString();
        IntWritable v = new IntWritable();

        // Sum the 1s emitted by the mapper for this key.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        System.out.println("\n------------reduce-----------");
        System.out.println("\ncur-key=" + key.toString());
        System.out.println("\ncur-value=" + sum);

        // Read the count already stored for this row (if any) and add it,
        // so re-running the job accumulates rather than overwrites.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_HOST);
        HTablePool pool = new HTablePool(conf, 3);
        HTable table = (HTable) pool.getTable(TABLE_NAME);
        Get getrow = new Get(k.getBytes());
        Result r = table.get(getrow);
        int m_tmp = 0;
        for (KeyValue kv : r.raw()) {
            System.out.println("\nraw-KeyValue---" + kv);
            System.out.println("\nraw-row=>" + Bytes.toString(kv.getRow()));
            System.out.println("\nraw-family=>" + Bytes.toString(kv.getFamily()));
            System.out.println("\nraw-qualifier=>" + Bytes.toString(kv.getQualifier()));
            System.out.println("\nraw-value=>" + Bytes.toString(kv.getValue()));
            m_tmp += Integer.parseInt(Bytes.toString(kv.getValue()));
        }
        sum = sum + m_tmp;
        v.set(sum);
        System.out.println("\nreal-key=" + key.toString());
        System.out.println("\nreal-value=" + v.toString());

        // Write the new total back as a string under dsp_filter:sum.
        Put putrow = new Put(k.getBytes());
        putrow.add(COL_FAMILY.getBytes(), COL_NAME.getBytes(), String.valueOf(v).getBytes());
        try {
            context.write(new ImmutableBytesWritable(key.getBytes()), putrow);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
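The reducer above does the read-modify-write itself: it Gets the current value, adds the new count, and Puts the total back as a string. An alternative worth knowing about (not what this article uses) is to let HBase apply the increment atomically with incrementColumnValue; values written that way are stored as 8-byte longs rather than strings, so the two approaches are not interchangeable. A rough sketch, with a hypothetical helper name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class IncrementSketch {
    // Hypothetical helper, not part of the job above.
    public static void addToSum(String rowKey, long delta) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HTable table = new HTable(conf, "xxxlog");
        // HBase performs the read-modify-write atomically on the server side;
        // the stored value is an 8-byte long, not a string.
        table.incrementColumnValue(Bytes.toBytes(rowKey),
                Bytes.toBytes("dsp_filter"), Bytes.toBytes("sum"), delta);
        table.close();
    }
}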

3. Driver

#vi xxxLogDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class xxxLogDriver {

    public final static String ZK_HOST = "localhost";
    public final static String TABLE_NAME = "xxxlog";

    public static void main(String[] args) throws Exception {
        // HBase configuration: point the client at the ZooKeeper quorum.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_HOST);
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: xxxLogDriver <input path> <output path>");
            System.exit(2);
        }
        Job job = new Job(conf, "xxxLog");
        job.setJarByClass(xxxLogDriver.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.out.println("\n------------driver come on-----------");
        job.setMapperClass(xxxLogMaper.class);
        job.setReducerClass(xxxLogReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Bind the reducer to the xxxlog table (sets up TableOutputFormat).
        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, xxxLogReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

III. Compiling and Running

Compile the source files in the current directory as follows:

#javac -classpath /root/hadoop-0.20.2/hadoop-0.20.2-core.jar:/root/hadoop-0.20.2/lib/commons-cli-1.2.jar:/root/hbase-0.90.4/hbase-0.90.4.jar -d ./ xxxLogMaper.java xxxLogReducer.java xxxLogDriver.java

Note that the three files must be compiled together; otherwise you will see errors like:

xxxLogDriver.java:22: error: cannot find symbol

job.setMapperClass(xxxLogMaper.class);

Package the class files as follows:

#jar cvf xxxLog.jar *class

#rm -rf *class

Run the job as follows:

#hadoop jar xxxLog.jar xxxLogDriver /tmp/input /tmp/output

Check the result as follows:

#./hbase shell

hbase(main):014:0>scan 'xxxlog'
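If you would rather verify the result from Java instead of the shell, a small scan sketch against the same table (same localhost quorum assumption as in the driver, class name is illustrative) could look like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HTable table = new HTable(conf, "xxxlog");
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            // Each row key is dspid^Afilter^Apvtime; the count sits in dsp_filter:sum.
            byte[] value = r.getValue(Bytes.toBytes("dsp_filter"), Bytes.toBytes("sum"));
            System.out.println(Bytes.toString(r.getRow()) + " => " + Bytes.toString(value));
        }
        scanner.close();
        table.close();
    }
}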
