您的位置:首页 > 运维架构 > Apache

MapReduce中碰到数据覆盖现象,org.apache.hadoop.io.Text.getBytes 问题

2015-10-13 11:43 363 查看
示例代码

package com.enfang.mapreduce.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class hbaseReduce {
private static Log log = LogFactory.getLog(hbaseReduce.class);
public static class MyMapper extends TableMapper<Text, Text>  {
private static int ff=30000;
private Text textvalue = new Text();
private Text keytext = new Text();

public void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException {
keytext.set("all");

ff--;
if(ff<17000){
textvalue.set("CUST_ID:"+ff+"\tBILL_ID:"+ff);

}else{

textvalue.set("yoyoyo");

}

String value=new String(textvalue.getBytes());

if(value.startsWith("yoyoyoD")){
log.info("value====>"+value);

}

context.write(keytext, textvalue);

}
}

public static class MyTableReducer extends TableReducer<Text, Text, ImmutableBytesWritable>  {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int sum=0;
String keys =new String(key.getBytes());
log.info("keys====>"+keys);

Iterator<Text>  it=values.iterator();
while(it.hasNext()){
Text  val=it.next();
String value=new String(val.toString());
if(value.startsWith("yoyoyoD")){
log.info("value====>"+value);

}

log.info("value====>"+value);
}

}
}

public static void main(String[] args) {

try {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.property.clientPort", "2181");
config.set("hbase.zookeeper.quorum","10.77.17.93");
config.set("hbase.master", "10.77.17.93:6000");
Job job = Job.getInstance(config,"ExampleMuliTable");
job.setJarByClass(hbaseReduce.class);     // class that contains mapper and reducer
List<Scan> scans = new ArrayList<Scan>();
Scan scan2 = new Scan();
scan2.setCaching(100);
scan2.setCacheBlocks(false);
scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("CM_TAXPAYER_APPLY"));

Scan scan1 = new Scan();
scan1.setCaching(100);
scan1.setCacheBlocks(false);
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("CM_TAXPAYER_MANAGEMENT"));

scans.add(scan2);
scans.add(scan1);

TableMapReduceUtil.initTableMapperJob(
scans,
MyMapper.class,     // mapper class
Text.class,         // mapper output key
Text.class,  // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
"total-access",        // output table
MyTableReducer.class,    // reducer class
job);

job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}


运行结果

[ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16993	BILL_ID:16993 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16994	BILL_ID:16994 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16995	BILL_ID:16995 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16996	BILL_ID:16996 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16997	BILL_ID:16997 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16998	BILL_ID:16998 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16999	BILL_ID:16999 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999	BILL_ID:16999 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999	BILL_ID:16999 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999	BILL_ID:16999 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999	BILL_ID:16999 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999	BILL_ID:16999 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999	BILL_ID:16999 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999	BILL_ID:16999 (hbaseReduce.java:123)
[ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999	BILL_ID:16999 (hbaseReduce.java:123)


问题原因：org.apache.hadoop.io.Text 类中的相关源码如下：

// Returns the internal buffer itself, not a copy. Only the first `length` bytes are
// valid; the array may be longer and may still hold bytes left over from earlier values.
@Override
public byte[] getBytes() {
return bytes;
}


// Copies `len` bytes starting at utf8[start] into the start of the internal buffer and
// records `len` as the new valid length. Nothing here clears buffer bytes past `len`.
// NOTE(review): setCapacity(len, false) presumably only grows the buffer when it is too
// small (without keeping old contents) — confirm against the Hadoop source.
public void set(byte[] utf8, int start, int len) {
setCapacity(len, false);
System.arraycopy(utf8, start, bytes, 0, len);
this.length = len;
}


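getBytes() 直接返回整个内部缓冲区 bytes，它的长度可能大于有效长度 getLength()；而 set() 只覆盖缓冲区的前 len 个字节，不会清空后面残留的旧数据。因此，当一个 Text 对象先存放过较长的值（如 "CUST_ID:16999…"），再被 set 成较短的值（如 "yoyoyo"）时，new String(text.getBytes()) 会把新旧数据一起解码，得到 "yoyoyoD:16999…" 这样的混杂结果。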

解决办法：只读取前 getLength() 个有效字节。
1. 使用 copyBytes()，它只拷贝前 length 个字节：
// Returns a new array containing exactly the first `length` (valid) bytes of the buffer,
// so leftover bytes past `length` are never included.
public byte[] copyBytes() {
byte[] result = new byte[length];
System.arraycopy(bytes, 0, result, 0, length);
return result;
}

2. 使用 toString()，它只解码前 length 个字节：
// Decodes only the first `length` bytes of the buffer, so leftover bytes past `length`
// never appear in the result. A CharacterCodingException is wrapped in a RuntimeException.
// NOTE(review): decode() presumably decodes UTF-8 (Text's storage encoding) — confirm.
public String toString() {
try {
return decode(bytes, 0, length);
} catch (CharacterCodingException e) {
throw new RuntimeException("Should not have happened " , e);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: