Hadoop、HBase 编程实战：基于 Hash 的 distinct 操作
2016-04-06 14:37
501 查看
题目描述：从 Hadoop HDFS 文件中读取数据，先按给定条件进行选择（select），再对指定列进行基于 Hash 的 distinct 操作，最后将去重后的结果存入 HBase，具体描述见下图。
package homework; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; /** * @author: wjf * @version: 2016年4月3日 上午11:30:39 */ /* * 非常重要 * 1:修改文件名 * 2: 删除包名 */ public class Hw1Grap4 { /** * this map is to store the original data */ private static Map<String,ArrayList<String>> datamap=new HashMap<String,ArrayList<String>>(); /** * @param file * @param compareOperator * @param selectNumber * @param compareValue * @param discount * @param disnumber * @throws IOException */ public static void readDataFromHdfs(String file,String compareOperator,int selectNumber,double compareValue,int discount,int disnumber[]) throws IOException{ //configure hdfs Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(file), conf); //read data from hdfs file Path path = new Path(file); FSDataInputStream in_stream = fs.open(path); BufferedReader in = new BufferedReader(new InputStreamReader(in_stream)); String tempLine; int count=0; int tempvalue; // do select and distinct operation while ((tempLine=in.readLine())!=null) { count++; String[] items=tempLine.split("\\|"); boolean flag=false; // compare the value to select if(compareOperator.equals("gt")){ tempvalue= Integer.valueOf(items[selectNumber]); if(tempvalue > compareValue){ flag=true; System.out.println("gt"); } } else if(compareOperator.equals("ge")){ tempvalue= 
Integer.valueOf(items[selectNumber]); if(tempvalue >= compareValue)flag=true; } else if(compareOperator.equals("eq")){ tempvalue= Integer.valueOf(items[selectNumber]); if(tempvalue == compareValue)flag=true; } else if(compareOperator.equals("lt")){ tempvalue= Integer.valueOf(items[selectNumber]); if(tempvalue < compareValue)flag=true; } else if(compareOperator.equals("le")){ tempvalue= Integer.valueOf(items[selectNumber]); if(tempvalue <= compareValue)flag=true; } else if(compareOperator.equals("ne")){ tempvalue= Integer.valueOf(items[selectNumber]); if(tempvalue != compareValue)flag=true; } else{ System.out.println("compareOperator error"); System.exit(0); } // this record is passed by this selection if(flag == true){ StringBuilder key=new StringBuilder(""); for(int i=0;i<discount;i++){ key.append(items[disnumber[i]]); } System.out.println(key.toString()); if(datamap.containsKey(key)){ // the temp record is the same with some record in the dataMap by comparing the key continue; } ArrayList<String> record=new ArrayList<String>(); for(int i=0;i<discount;i++){ record.add(items[disnumber[i]]); } datamap.put(key.toString(), record); } } in.close(); fs.close(); } /** * @param disnumber * @param discount * @throws IOException */ public static void writeDataToHbase(int[] disnumber,int discount) throws IOException{ // set HTable descriptor String tableName="result"; HTableDescriptor htd=new HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor cf=new HColumnDescriptor("res"); htd.addFamily(cf); //configure hbase Configuration cfg=HBaseConfiguration.create(); HBaseAdmin hAdmin=new HBaseAdmin(cfg); //if exists then delete if(hAdmin.tableExists(tableName)){ hAdmin.disableTable(tableName); hAdmin.deleteTable(tableName); } // create a new table named tableName hAdmin.createTable(htd); //put data into the hbaseTable HTable table=new HTable(cfg,tableName); int count=0; for(String key:datamap.keySet()){ Put put =new Put(String.valueOf(count).getBytes()); for(int 
i=0;i<discount;i++){ put.add("res".getBytes(),("R"+String.valueOf(disnumber[i])).getBytes(),(datamap.get(key).get(i)).getBytes()); table.put(put); } count++; } table.close(); } /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { // if the paras is wrong then exit if(args.length!=3){ System.out.println("the args is wrong....!"); System.exit(0); } // file path String file="hdfs://master:9000"+args[0].substring(2); // select args String query=args[1]; // distinct args String dis=args[2]; // get the column number of select operation int selectNumber = 0; String compareOperator = null; double compareValue = 0; for(int i=0;i<query.length();i++){ if(query.charAt(i)=='R'){ selectNumber=(query.charAt(i+1))-'0'; compareOperator=query.substring(i+3,i+5); compareValue=Double.valueOf(query.substring(i+6)); break; } } // get the number in distinct operation int[] disnumber=new int[dis.length()/2]; int discount=0; for(int i=0;i<dis.length();i++){ if(dis.charAt(i)=='R'){ disnumber[discount++]=dis.charAt(i+1)-'0'; } } // read data from hdfs and intermediate processing readDataFromHdfs(file, compareOperator, selectNumber, compareValue, discount, disnumber); // write data to hbase tbale writeDataToHbase(disnumber,discount); } }
相关文章推荐
- php中如何给类规范的注释
- Java关于大数处理的类
- 哈工大深研院数字图像处理第一次大作业:不调用Matlab函数实现图像增强
- java注解详解
- C# Cast
- java 中文转拼音
- Simple C# FTP Class
- LuaFramework_UGUI学习笔记一
- 设计模式---工厂模式
- VBOX中的Ubuntu的安装和配置
- python学习笔记(控制语句)
- java 判断字符串输入是中文还是英文
- Lua_第 21 章 操作系统库
- 《Effective C++》阅读笔记02
- java多线程--信号量Semaphore的使用
- 第三章 第五节 JAVA接口
- HDU2053.JAVA
- Eclipse中进行Gradle+Jetty部署的web项目的断点调试
- 面向对象
- Java五种单例模式与线程安全