MapReduce中碰到数据覆盖现象,org.apache.hadoop.io.Text.getBytes 问题
2015-10-13 11:43
363 查看
示例代码
运行结果
问题原因: TEXT类中
getBytes() 返回的是整个底层 bytes 数组(包含超出有效长度 length 的旧数据),而 set() 只覆盖数组的前 len 个字节,后面的旧内容并不会被清除
解决办法:
1.使用
2.使用
package com.enfang.mapreduce.hbase; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; public class hbaseReduce { private static Log log = LogFactory.getLog(hbaseReduce.class); public static class MyMapper extends TableMapper<Text, Text> { private static int ff=30000; private Text textvalue = new Text(); private Text keytext = new Text(); public void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException { keytext.set("all"); ff--; if(ff<17000){ textvalue.set("CUST_ID:"+ff+"\tBILL_ID:"+ff); }else{ textvalue.set("yoyoyo"); } String value=new String(textvalue.getBytes()); if(value.startsWith("yoyoyoD")){ log.info("value====>"+value); } context.write(keytext, textvalue); } } public static class MyTableReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int sum=0; String keys =new String(key.getBytes()); log.info("keys====>"+keys); Iterator<Text> it=values.iterator(); 
while(it.hasNext()){ Text val=it.next(); String value=new String(val.toString()); if(value.startsWith("yoyoyoD")){ log.info("value====>"+value); } log.info("value====>"+value); } } } public static void main(String[] args) { try { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.property.clientPort", "2181"); config.set("hbase.zookeeper.quorum","10.77.17.93"); config.set("hbase.master", "10.77.17.93:6000"); Job job = Job.getInstance(config,"ExampleMuliTable"); job.setJarByClass(hbaseReduce.class); // class that contains mapper and reducer List<Scan> scans = new ArrayList<Scan>(); Scan scan2 = new Scan(); scan2.setCaching(100); scan2.setCacheBlocks(false); scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("CM_TAXPAYER_APPLY")); Scan scan1 = new Scan(); scan1.setCaching(100); scan1.setCacheBlocks(false); scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("CM_TAXPAYER_MANAGEMENT")); scans.add(scan2); scans.add(scan1); TableMapReduceUtil.initTableMapperJob( scans, MyMapper.class, // mapper class Text.class, // mapper output key Text.class, // mapper output value job); TableMapReduceUtil.initTableReducerJob( "total-access", // output table MyTableReducer.class, // reducer class job); job.setNumReduceTasks(1); // at least one, adjust as required boolean b = job.waitForCompletion(true); if (!b) { throw new IOException("error with job!"); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
运行结果
[ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16993 BILL_ID:16993 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16994 BILL_ID:16994 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16995 BILL_ID:16995 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16996 BILL_ID:16996 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16997 BILL_ID:16997 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16998 BILL_ID:16998 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>CUST_ID:16999 BILL_ID:16999 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999 BILL_ID:16999 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999 BILL_ID:16999 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999 BILL_ID:16999 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999 BILL_ID:16999 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999 BILL_ID:16999 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999 BILL_ID:16999 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999 BILL_ID:16999 (hbaseReduce.java:123) [ INFO] 2015十月13 10:21:47 - value====>yoyoyoD:16999 BILL_ID:16999 (hbaseReduce.java:123)
问题原因:org.apache.hadoop.io.Text 类中的相关源码如下
/**
 * Returns the internal {@code bytes} array itself, not a copy.
 *
 * NOTE(review): the returned array is not trimmed to {@code length}; judging
 * by {@code set(byte[], int, int)}, only the first {@code length} bytes hold
 * the current value, so callers should pair this with the valid length.
 */
@Override public byte[] getBytes() { return bytes; }
/**
 * Replaces the current content with {@code len} bytes taken from
 * {@code utf8}, starting at offset {@code start}.
 *
 * Only positions {@code 0 .. len-1} of {@code bytes} are overwritten by the
 * copy; nothing in this method clears bytes at index {@code len} or beyond.
 *
 * @param utf8  source array to copy from
 * @param start offset in {@code utf8} of the first byte to copy
 * @param len   number of bytes to copy; becomes the new {@code length}
 */
// NOTE(review): setCapacity(len, false) presumably ensures room for len bytes
// without preserving old data — its body is not shown here, confirm.
public void set(byte[] utf8, int start, int len) { setCapacity(len, false); System.arraycopy(utf8, start, bytes, 0, len); this.length = len; }
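getBytes() 返回的是整个底层 bytes 数组(包含超出有效长度 length 的旧数据),而 set() 只覆盖数组的前 len 个字节,后面的旧内容并不会被清除。因此当同一个 Text 对象先存入较长的值、再存入较短的值时,new String(text.getBytes()) 会把残留的旧字节一起解码出来,看起来就像数据被“覆盖”了一部分。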
解决办法:
1. 使用 copyBytes(),它只复制前 length 个有效字节:
/**
 * Returns a newly allocated array holding exactly the first {@code length}
 * bytes of {@code bytes}.
 *
 * @return a copy of size {@code length}; it does not share storage with
 *         the internal array
 */
public byte[] copyBytes() { byte[] result = new byte[length]; System.arraycopy(bytes, 0, result, 0, length); return result; }
2. 使用 toString(),它只解码前 length 个有效字节:
/**
 * Decodes the first {@code length} bytes of {@code bytes} into a String via
 * {@code decode(bytes, 0, length)}.
 *
 * @return the decoded string
 * @throws RuntimeException wrapping the {@code CharacterCodingException}
 *         if decoding fails
 */
public String toString() { try { return decode(bytes, 0, length); } catch (CharacterCodingException e) { throw new RuntimeException("Should not have happened " , e); } }
相关文章推荐
- apache重写字段详细说明
- comet4j集成项目报错JSON转换异常:html><head><title>Apache ....问题的解决
- Apache Benchmark安装、参数含义&使用总结、结果分析
- Linux:一台apache服务器上部署多个项目的apache配置
- 使用Apache Benchmark做压力测试遇上的5个常见问题
- windows下 简单配置PHP + apache +mysql
- Apache Commons Configuration使用入门
- iOS 搭建Apache服务器(10.10系统)
- mac 下apache访问access forbidden
- Deprecated: Function eregi() is deprecated in D:\Apache24\htdocs\processfeedback.php on line 21
- Windows下Apache使用问题总结(持续更新)
- CentOs开启Apache的rewrite_module模块,支持.htaccess
- java.lang.NullPointerException(at org.apache.jsp.loginb_jsp._jspInit(loginb_jsp.java:22))
- java学习——apache commons fileupload实现上传
- 学习Thrift的小认识
- apache和PHP如何整合在一起
- Apache POI 之 初学实战篇 (七) --- 抽取Excel内容
- 在apache上安装gitweb实现浏览器访问git服务器
- CentOS 6.3下源码安装LAMP(Linux+Apache+Mysql+Php)环境
- apache和PHP如何整合在一起