您的位置:首页 > 编程语言

Hadoop,Hbase 编程实战基于Hash 的 distinct 操作

2016-04-06 14:37 501 查看
题目描述:主要是实现从hadoop hdfs 文件中读取数据,然后进行基于hash 的distinct 操作,然后将数据存到hbase中,具体描述见下图



package homework;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

/**
* @author: wjf
* @version: 2016年4月3日 上午11:30:39
*/
/*
* 非常重要
* 1:修改文件名
* 2: 删除包名
*/
public class Hw1Grap4 {

/**
* this map is to store the original data
*/
private static Map<String,ArrayList<String>> datamap=new HashMap<String,ArrayList<String>>();

/**
* @param file
* @param compareOperator
* @param selectNumber
* @param compareValue
* @param discount
* @param disnumber
* @throws IOException
*/
public static void readDataFromHdfs(String file,String compareOperator,int selectNumber,double compareValue,int discount,int disnumber[]) throws IOException{
//configure hdfs
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(file), conf);

//read data from hdfs file
Path path = new Path(file);
FSDataInputStream in_stream = fs.open(path);
BufferedReader in = new BufferedReader(new InputStreamReader(in_stream));
String tempLine;
int count=0;
int tempvalue;

// do select and distinct operation
while ((tempLine=in.readLine())!=null) {
count++;
String[] items=tempLine.split("\\|");
boolean flag=false;
// compare the value to select
if(compareOperator.equals("gt")){
tempvalue= Integer.valueOf(items[selectNumber]);

if(tempvalue > compareValue){
flag=true;
System.out.println("gt");
}
}
else if(compareOperator.equals("ge")){
tempvalue= Integer.valueOf(items[selectNumber]);
if(tempvalue >= compareValue)flag=true;
}
else if(compareOperator.equals("eq")){
tempvalue= Integer.valueOf(items[selectNumber]);
if(tempvalue == compareValue)flag=true;
}
else if(compareOperator.equals("lt")){
tempvalue= Integer.valueOf(items[selectNumber]);
if(tempvalue < compareValue)flag=true;
}
else if(compareOperator.equals("le")){
tempvalue= Integer.valueOf(items[selectNumber]);
if(tempvalue <= compareValue)flag=true;
}
else if(compareOperator.equals("ne")){
tempvalue= Integer.valueOf(items[selectNumber]);
if(tempvalue != compareValue)flag=true;
}
else{
System.out.println("compareOperator error");
System.exit(0);
}

// this record is passed by this selection
if(flag == true){
StringBuilder key=new StringBuilder("");
for(int i=0;i<discount;i++){
key.append(items[disnumber[i]]);
}
System.out.println(key.toString());
if(datamap.containsKey(key)){
// the temp record is the same with some record in the dataMap by comparing the key
continue;
}
ArrayList<String> record=new ArrayList<String>();
for(int i=0;i<discount;i++){
record.add(items[disnumber[i]]);
}
datamap.put(key.toString(), record);
}
}
in.close();
fs.close();

}

/**
* @param disnumber
* @param discount
* @throws IOException
*/
public static void writeDataToHbase(int[] disnumber,int discount) throws IOException{
// set HTable descriptor
String tableName="result";
HTableDescriptor htd=new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cf=new HColumnDescriptor("res");
htd.addFamily(cf);

//configure hbase
Configuration cfg=HBaseConfiguration.create();
HBaseAdmin hAdmin=new HBaseAdmin(cfg);

//if exists then delete
if(hAdmin.tableExists(tableName)){
hAdmin.disableTable(tableName);
hAdmin.deleteTable(tableName);
}
// create a new table named tableName
hAdmin.createTable(htd);

//put data into the hbaseTable
HTable table=new HTable(cfg,tableName);
int count=0;
for(String key:datamap.keySet()){
Put put =new Put(String.valueOf(count).getBytes());
for(int i=0;i<discount;i++){
put.add("res".getBytes(),("R"+String.valueOf(disnumber[i])).getBytes(),(datamap.get(key).get(i)).getBytes());
table.put(put);
}
count++;
}
table.close();
}

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
// if the paras is wrong then exit
if(args.length!=3){
System.out.println("the args is wrong....!");
System.exit(0);
}
// file path
String file="hdfs://master:9000"+args[0].substring(2);
// select args
String query=args[1];
// distinct args
String dis=args[2];

// get the column number of select operation
int selectNumber = 0;
String compareOperator = null;
double compareValue = 0;
for(int i=0;i<query.length();i++){
if(query.charAt(i)=='R'){
selectNumber=(query.charAt(i+1))-'0';
compareOperator=query.substring(i+3,i+5);
compareValue=Double.valueOf(query.substring(i+6));
break;
}
}
// get the number in distinct operation
int[] disnumber=new int[dis.length()/2];
int discount=0;
for(int i=0;i<dis.length();i++){
if(dis.charAt(i)=='R'){
disnumber[discount++]=dis.charAt(i+1)-'0';
}
}
//       read data from hdfs and intermediate processing
readDataFromHdfs(file, compareOperator, selectNumber, compareValue, discount, disnumber);
//       write data to hbase tbale
writeDataToHbase(disnumber,discount);

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: