您的位置:首页 > 运维架构

数据算法-hadoop5 反转排序

2017-10-16 10:27 337 查看
  反转排序,看书看了半天,才搞明白是什么鬼。反转排序和控制反转的反转估计是一个意思,就是把排序的权利反转给了开发者。

  主要通过组合键自定义排序和partitioner按照自然键分区实现

  例子是计算文档中每个词与其前后邻域内各词的相对词频。要算相对词频,必须先知道该词的邻域词总数,所以要让(词,*)这条记录先于该词的其他记录到达reducer,也就是通过compareTo方法把*排到最前面,先统计*的数量。

文档如下

w1 w2 w3 w4 w5 w6

map通过组合键生成词频

(w1,w2) 1

(w1,w3) 1

(w1,*) 2

(w2,w1) 1

(w2,w3) 1

(w2,w4) 1

(w2,*) 3

(w3,w1) 1

(w3,w2) 1

(w3,w4) 1

(w3,w5) 1

(w3,*) 4

(w4,w2) 1

(w4,w3) 1

(w4,w5) 1

(w4,w6) 1

(w4,*) 4

(w5,w3) 1

(w5,w4) 1

(w5,w6) 1

(w5,*) 3

(w6,w4) 1

(w6,w5) 1

(w6,*) 2

通过compareTo把*提前,partitioner按第一个键归类得到

(w1,*),(w1,w2),(w1,w3) 2,1,1

(w2,*),(w2,w1),(w2,w3),(w2,w4) 3,1,1,1

(w3,*),(w3,w1),(w3,w2),(w3,w4),(w3,w5) 4,1,1,1,1

(w4,*),(w4,w2),(w4,w3),(w4,w5),(w4,w6) 4,1,1,1,1

(w5,*),(w5,w3),(w5,w4),(w5,w6) 3,1,1,1

(w6,*),(w6,w4),(w6,w5) 2,1,1

public class RelativeFrequencyTaskTest {
private static Log log = null;
static {
DOMConfigurator.configureAndWatch("./conf/log4j.xml");
log = LogFactory.getLog("default");
}

// public Class myclass;
/**
* name
*/
public static final String JOB_NAME = "RelationFilterMR";

/**
* 入口
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {

Configuration conf1 = new Configuration();
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.5.2");

// conf1.set("mapreduce.app-submission.cross-platform", "true");
Job job = Job.getInstance(conf1, "RelativeFrequency");

job.setMapperClass(RelativeFrequencyMapper.class);
job.setReducerClass(RelativeFrequencyReducer.class);
job.setOutputKeyClass(PairOfWords.class);
job.setOutputValueClass(IntWritable.class);
job.setPartitionerClass(OrderInversionPartitioner.class);
// 设置Reduce任务数
job.setNumReduceTasks(10);

FileInputFormat.setInputPaths(job, new Path("C:\\demo\\05\\input.txt"));
FileOutputFormat.setOutputPath(job, new Path("C:\\demo\\05\\out"));
if (job.waitForCompletion(true)) {
log.info("MR run successfully");

} else {
log.error("MR run failed");

}

}

}


public class RelativeFrequencyMapper extends
Mapper<LongWritable, Text, PairOfWords, IntWritable> {

private int neighborWindow = 2;
private final PairOfWords pair = new PairOfWords();
IntWritable ONE = new IntWritable(1);
IntWritable totalCount = new IntWritable();

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split(" ");

if ((tokens == null) || (tokens.length < 2)) {
return;
}

for (int i = 0; i < tokens.length; i++) {
String word = tokens[i];
pair.setLeftElement(word);
int start = 0;
if (i - neighborWindow >= 0) {
start = i - neighborWindow;
}
int end = 0;
if (i + neighborWindow >= tokens.length) {
end = tokens.length - 1;
} else {
end = i + neighborWindow;
}

for (int j = start; j <= end; j++) {
if (i == j) {
continue;
}
pair.setRightElement(tokens[j]);
context.write(pair, ONE);

}
pair.setRightElement("*");
totalCount.set(end - start);
context.write(pair, totalCount);
}
}

}


/**
 * Reducer for the relative-frequency job.
 *
 * <p>When the key's right element is {@code "*"} the summed values are stored
 * as the total for the key's left word. For any other key the summed values
 * are divided by the stored total and written out. The stored total is only
 * meaningful if the {@code (word, "*")} key is reduced before the other keys
 * of the same word.
 */
public class RelativeFrequencyReducer extends
        Reducer<PairOfWords, IntWritable, PairOfWords, DoubleWritable> {

    private double totalCount = 0;
    private final DoubleWritable relativeCount = new DoubleWritable();
    private String currentWord = "NOT_DEFINED";

    /**
     * Either records the neighbor total of a word or emits the relative
     * frequency of a word pair.
     *
     * @param key     the word pair being reduced
     * @param values  counts emitted by the mappers for this pair
     * @param context sink for the emitted relative frequencies
     * @throws java.io.IOException  if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(PairOfWords key, Iterable<IntWritable> values,
            Context context) throws java.io.IOException, InterruptedException {
        if (key.getRightElement().equals("*")) {
            if (key.getLeftElement().equals(currentWord)) {
                // Same word as before: add to the running total. The previous
                // "totalCount += totalCount + ..." doubled the existing total.
                totalCount += getTotalCount(values);
            } else {
                currentWord = key.getLeftElement();
                totalCount = getTotalCount(values);
            }
        } else {
            int count = getTotalCount(values);
            relativeCount.set((double) count / totalCount);
            context.write(key, relativeCount);
        }
    }

    /**
     * Sums all counts in {@code values}.
     *
     * @param values the counts to add up
     * @return the sum of the counts
     */
    private int getTotalCount(Iterable<IntWritable> values) {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        return sum;
    }
}


这个类(PairOfWords,组合键及其排序规则)最重要,书上竟然没有

/**
 * Composite key made of a left word and a right word.
 *
 * <p>The natural ordering groups keys by the left word and, within one left
 * word, places the pair whose right element is {@code "*"} before all others.
 */
public class PairOfWords implements WritableComparable<PairOfWords> {

    /** Right-element marker for the record that carries a word's total. */
    private static final String WILDCARD = "*";

    private String leftElement;
    private String rightElement;

    /** Creates an empty pair; the elements are filled in by setters or {@link #readFields}. */
    public PairOfWords() {

    }

    /**
     * Creates a pair from the given words.
     *
     * @param left  the left word
     * @param right the right word
     */
    public PairOfWords(String left, String right) {
        leftElement = left;
        rightElement = right;
    }

    /** @return the left word */
    public String getLeftElement() {
        return leftElement;
    }

    /** @param leftElement the new left word */
    public void setLeftElement(String leftElement) {
        this.leftElement = leftElement;
    }

    /** @return the right word */
    public String getRightElement() {
        return rightElement;
    }

    /** @param rightElement the new right word */
    public void setRightElement(String rightElement) {
        this.rightElement = rightElement;
    }

    /**
     * Reads both words, left first, in the format produced by {@link #write}.
     *
     * @param in the stream to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        leftElement = in.readUTF();
        rightElement = in.readUTF();
    }

    /**
     * Writes both words, left first, as modified-UTF-8 strings.
     *
     * @param out the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(leftElement);
        out.writeUTF(rightElement);
    }

    /**
     * Orders pairs by left word, then with {@code "*"} first, then by right
     * word.
     *
     * <p>The previous implementation compared the right element first and
     * tested the left element against the {@code char} literal {@code '*'}
     * (never equal to a {@code String}), so keys were not grouped by left
     * word at all.
     *
     * @param other the pair to compare against
     * @return a negative, zero or positive value per the
     *         {@link Comparable} contract
     */
    @Override
    public int compareTo(PairOfWords other) {
        // Primary order: the left word, so all pairs of one word are adjacent.
        int byLeft = this.leftElement.compareTo(other.getLeftElement());
        if (byLeft != 0) {
            return byLeft;
        }

        // Secondary order: the wildcard pair precedes every other pair of the
        // same left word.
        boolean thisIsWildcard = WILDCARD.equals(this.rightElement);
        boolean otherIsWildcard = WILDCARD.equals(other.getRightElement());
        if (thisIsWildcard && otherIsWildcard) {
            return 0;
        }
        if (thisIsWildcard) {
            return -1;
        }
        if (otherIsWildcard) {
            return 1;
        }

        return this.rightElement.compareTo(other.getRightElement());
    }

    /**
     * Two pairs are equal when both their left and right words are equal.
     *
     * @param obj the object to compare against
     * @return {@code true} if {@code obj} is a {@code PairOfWords} with the
     *         same words
     */
    @Override
    public boolean equals(Object obj) {
        // instanceof is false for null, so no separate null check is needed.
        if (!(obj instanceof PairOfWords)) {
            return false;
        }
        PairOfWords pair = (PairOfWords) obj;
        return leftElement.equals(pair.getLeftElement())
                && rightElement.equals(pair.getRightElement());
    }

    /** @return the sum of the hash codes of the two words */
    @Override
    public int hashCode() {
        return leftElement.hashCode() + rightElement.hashCode();
    }

    /** @return the pair formatted as {@code (left, right)} */
    @Override
    public String toString() {
        return "(" + leftElement + ", " + rightElement + ")";
    }

}


/**
 * Partitioner that chooses the partition from the left word only.
 */
public class OrderInversionPartitioner extends
        Partitioner<PairOfWords, IntWritable> {

    /**
     * Picks a partition from the hash of the pair's left word.
     *
     * @param pair   the composite key
     * @param value  the record value (not used for partitioning)
     * @param number the number of partitions
     * @return the absolute value of the left word's hash modulo {@code number}
     */
    @Override
    public int getPartition(PairOfWords pair, IntWritable value,  int number) {
        // Only the left word takes part, so every pair sharing a left word
        // lands in the same partition.
        final int remainder = pair.getLeftElement().hashCode() % number;
        return remainder < 0 ? -remainder : remainder;
    }

}


输入

java is a great language

java is a programming language

java is green fun language

java is great

programming with java is fun

结果

(is, a) 0.14285714285714285

(is, fun) 0.14285714285714285

(is, great) 0.14285714285714285

(is, green) 0.07142857142857142

(is, java) 0.35714285714285715

(is, programming) 0.07142857142857142

(is, with) 0.07142857142857142

(great, a) 0.2

(great, is) 0.4

(great, java) 0.2

(great, language) 0.2

(language, a) 0.3333333333333333

(language, fun) 0.16666666666666666

(language, great) 0.16666666666666666

(language, green) 0.16666666666666666

(language, programming) 0.16666666666666666

(with, is) 0.3333333333333333

(with, java) 0.3333333333333333

(with, programming) 0.3333333333333333

(a, great) 0.125

(a, is) 0.25

(a, java) 0.25

(a, language) 0.25

(a, programming) 0.125

(java, a) 0.16666666666666666

(java, fun) 0.08333333333333333

(java, great) 0.08333333333333333

(java, green) 0.08333333333333333

(java, is) 0.4166666666666667

(java, programming) 0.08333333333333333

(java, with) 0.08333333333333333

(programming, a) 0.2

(green, fun) 0.2

(fun, green) 0.2

(fun, is) 0.4

(green, is) 0.2

(programming, is) 0.2

(fun, java) 0.2

(green, java) 0.2

(programming, java) 0.2

(fun, language) 0.2

(green, language) 0.2

(programming, language) 0.2

(programming, with) 0.2
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop
相关文章推荐