
A Custom RowSequence Function for Hive

2017-02-26 16:20
To automatically add a column of unique keys to a Hive table, and to keep those keys unique across distributed nodes, we implement a custom function, shown below.

Scheme 1:

package org.rowsequence;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

/**
 * UDFRowSequence.
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false)
public class RowSequence extends UDF {
    // Per-instance counter; every UDF instance starts from 0.
    private LongWritable result = new LongWritable();

    public RowSequence() {
        result.set(0);
    }

    // Each call increments the counter and returns the new value.
    public LongWritable evaluate() {
        result.set(result.get() + 1);
        return result;
    }
}
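
Before registering the class in Hive, the counter logic can be sanity-checked locally. A minimal sketch (the RowSequenceDemo harness is hypothetical, and assumes the Hive exec and Hadoop common jars are on the classpath):

import org.rowsequence.RowSequence;

// Hypothetical harness: calls evaluate() directly, outside Hive.
public class RowSequenceDemo {
    public static void main(String[] args) {
        RowSequence seq = new RowSequence();
        System.out.println(seq.evaluate()); // prints 1
        System.out.println(seq.evaluate()); // prints 2
        System.out.println(seq.evaluate()); // prints 3
    }
}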


add jar /home/hadoop/hive_study/hive_udf/hive_udf.jar;

create temporary function row_sequence as 'org.rowsequence.RowSequence';

select row_sequence(), city from city;


Note: the program above is limited by the number of map tasks. Each map task keeps its own counter, so once a query runs with more than one mapper, several tasks generate keys independently and duplicate keys appear. The second scheme below addresses this problem.
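
Each map task runs in its own JVM with its own counter, so the duplication is easy to reproduce outside Hive by standing up two instances. A minimal sketch (the DuplicateKeyDemo harness is hypothetical):

import org.rowsequence.RowSequence;

// Two independent instances stand in for two map tasks.
public class DuplicateKeyDemo {
    public static void main(String[] args) {
        RowSequence mapper1 = new RowSequence();
        RowSequence mapper2 = new RowSequence();
        System.out.println(mapper1.evaluate()); // prints 1
        System.out.println(mapper2.evaluate()); // prints 1 -- duplicate key
    }
}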

Scheme 2:

package com.hadoopbook.hive;

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Enumeration;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;

/**
 * UDFRowSequence. Generates keys that are unique across nodes and processes.
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false)
public class RowSequence extends UDF {
    // Key layout: ip + timestamp (yyMMddHHmmss) + process id + sequence number.
    // Note that the fields are concatenated without separators.
    private Text result = new Text();
    private static LongWritable seq_num = new LongWritable();
    private String lastdate = "";

    public RowSequence() {
        result.set("");
        seq_num.set(0);
    }

    public Text evaluate() {
        Date currentTime = new Date();
        SimpleDateFormat formatter = new SimpleDateFormat("yyMMddHHmmss");
        String dateString = formatter.format(currentTime);
        String ip = get_ip().replace(".", "");
        // Reset, increment, and read under a single lock so that concurrent
        // callers cannot interleave between the steps.
        synchronized (seq_num) {
            // If the timestamp has changed, restart the counter.
            if (!lastdate.equals(dateString)) {
                seq_num.set(0);
            }
            lastdate = dateString;
            seq_num.set(seq_num.get() + 1);
            result.set(ip + dateString + getPid() + seq_num.get());
        }
        return result;
    }

    // Returns the first IPv4 address found on any interface,
    // falling back to 127.0.0.1.
    private String get_ip() {
        Enumeration<NetworkInterface> allNetInterfaces;
        try {
            allNetInterfaces = NetworkInterface.getNetworkInterfaces();
        } catch (SocketException e) {
            e.printStackTrace();
            return "127.0.0.1";
        }
        while (allNetInterfaces.hasMoreElements()) {
            NetworkInterface netInterface = allNetInterfaces.nextElement();
            Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
            while (addresses.hasMoreElements()) {
                InetAddress ip = addresses.nextElement();
                if (ip instanceof Inet4Address) {
                    return ip.getHostAddress();
                }
            }
        }
        return "127.0.0.1";
    }

    // Extracts the process id from the runtime name, whose format is "pid@hostname".
    private int getPid() {
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        String name = runtime.getName();
        try {
            return Integer.parseInt(name.substring(0, name.indexOf('@')));
        } catch (Exception e) {
            return -1;
        }
    }

    // Simple JUnit check: two calls within the same second share a timestamp;
    // the third call follows a one-second sleep, so the counter restarts.
    @Test
    public void testEvaluate() throws InterruptedException {
        Text val = this.evaluate();
        System.out.println(val.toString());
        val = this.evaluate();
        System.out.println(val.toString());
        Thread.sleep(1000);
        val = this.evaluate();
        System.out.println(val.toString());
    }
}
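
Scheme 2 is registered and used the same way as scheme 1. A sketch, assuming the class is packaged as hive_udf2.jar and registered under a distinct function name (both names are assumptions):

add jar /home/hadoop/hive_study/hive_udf/hive_udf2.jar;

create temporary function row_sequence2 as 'com.hadoopbook.hive.RowSequence';

select row_sequence2(), city from city;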