hadoop pipes编程示例
2013-02-25 18:41
316 查看
hadoop pipes是hadoop的c++正式接口,通过socket与Map/Reduce框架通信,具体原理这里不再详述,下面通过一个单词统计的示例来说明用法。
1.代码
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
// Counter group / counter names registered with the Hadoop task context:
// the mapper bumps INPUT_WORDS, the reducer bumps OUTPUT_WORDS (see below).
const std::string WORDCOUNT = "WORDCOUNT";
const std::string INPUT_WORDS = "INPUT_WORDS";
const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
class WordCountMap: public HadoopPipes::Mapper {
public:
  // Framework counter: total number of words this mapper has consumed.
  HadoopPipes::TaskContext::Counter* inputWords;

  /// Registers the INPUT_WORDS counter with the task context.
  WordCountMap(HadoopPipes::TaskContext& context) {
    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  }

  /// Splits the input value on single spaces and emits ("word", "1")
  /// for every token, then credits the word count to INPUT_WORDS.
  void map(HadoopPipes::MapContext& context) {
    const std::vector<std::string> tokens =
        HadoopUtils::splitString(context.getInputValue(), " ");
    for (std::vector<std::string>::const_iterator it = tokens.begin();
         it != tokens.end(); ++it) {
      context.emit(*it, "1");
    }
    context.incrementCounter(inputWords, tokens.size());
  }
};
class WordCountReduce: public HadoopPipes::Reducer {
public:
  // Framework counter: number of distinct keys this reducer has emitted.
  HadoopPipes::TaskContext::Counter* outputWords;

  /// Registers the OUTPUT_WORDS counter with the task context.
  WordCountReduce(HadoopPipes::TaskContext& context) {
    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  }

  /// Adds up every "1" value for the current key and emits
  /// (key, total) as strings, bumping OUTPUT_WORDS once per key.
  void reduce(HadoopPipes::ReduceContext& context) {
    int total = 0;
    while (context.nextValue()) {
      total += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(total));
    context.incrementCounter(outputWords, 1);
  }
};
// Entry point: hands control to the Hadoop Pipes runtime, which talks to the
// framework over a socket and instantiates the mapper/reducer via the factory.
int main(int argc, char *argv[]) {
return
HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap,
WordCountReduce>());
}
2.编译
makefile如下:
CC = g++
HADOOP_INSTALL = /home/keke/hadoop-0.20.2-cdh3u4
PLATFORM = Linux-i386-32
CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

# Build the pipes binary against the Hadoop C++ runtime libraries.
# NOTE: the original listing split this recipe across plain lines; Make
# requires backslash continuations and tab-indented recipe lines.
wordcount: wordcount.cpp
	$(CC) $(CPPFLAGS) $< -Wall \
	  -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes -lhadooputils \
	  -lpthread -lcrypto -g -O2 -o $@
3.运行
先将可执行文件复制到HDFS上面,例如放在HDFS的bin下
执行:
hadoop pipes -D hadoop.pipes.java.recordreader=true -D
hadoop.pipes.java.recordwriter=true -input /user/keke/input -output
output -program bin/wordcount
pipes是hadoop的c++正式接口,通过socket与Map/Reduce框架通信,具体原理这里不再详述,下面通过一个单词统计的示例来说明用法。
1.代码
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
// Counter group / counter names registered with the Hadoop task context:
// the mapper bumps INPUT_WORDS, the reducer bumps OUTPUT_WORDS (see below).
const std::string WORDCOUNT = "WORDCOUNT";
const std::string INPUT_WORDS = "INPUT_WORDS";
const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
class WordCountMap: public HadoopPipes::Mapper {
public:
  // Framework counter: total number of words this mapper has consumed.
  HadoopPipes::TaskContext::Counter* inputWords;

  /// Registers the INPUT_WORDS counter with the task context.
  WordCountMap(HadoopPipes::TaskContext& context) {
    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  }

  /// Splits the input value on single spaces and emits ("word", "1")
  /// for every token, then credits the word count to INPUT_WORDS.
  void map(HadoopPipes::MapContext& context) {
    const std::vector<std::string> tokens =
        HadoopUtils::splitString(context.getInputValue(), " ");
    for (std::vector<std::string>::const_iterator it = tokens.begin();
         it != tokens.end(); ++it) {
      context.emit(*it, "1");
    }
    context.incrementCounter(inputWords, tokens.size());
  }
};
class WordCountReduce: public HadoopPipes::Reducer {
public:
  // Framework counter: number of distinct keys this reducer has emitted.
  HadoopPipes::TaskContext::Counter* outputWords;

  /// Registers the OUTPUT_WORDS counter with the task context.
  WordCountReduce(HadoopPipes::TaskContext& context) {
    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  }

  /// Adds up every "1" value for the current key and emits
  /// (key, total) as strings, bumping OUTPUT_WORDS once per key.
  void reduce(HadoopPipes::ReduceContext& context) {
    int total = 0;
    while (context.nextValue()) {
      total += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(total));
    context.incrementCounter(outputWords, 1);
  }
};
// Entry point: hands control to the Hadoop Pipes runtime, which talks to the
// framework over a socket and instantiates the mapper/reducer via the factory.
int main(int argc, char *argv[]) {
return
HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap,
WordCountReduce>());
}
2.编译
makefile如下:
CC = g++
HADOOP_INSTALL = /home/keke/hadoop-0.20.2-cdh3u4
PLATFORM = Linux-i386-32
CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

# Build the pipes binary against the Hadoop C++ runtime libraries.
# NOTE: the original listing split this recipe across plain lines; Make
# requires backslash continuations and tab-indented recipe lines.
wordcount: wordcount.cpp
	$(CC) $(CPPFLAGS) $< -Wall \
	  -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes -lhadooputils \
	  -lpthread -lcrypto -g -O2 -o $@
3.运行
先将可执行文件复制到HDFS上面,例如放在HDFS的bin下
执行:
hadoop pipes -D hadoop.pipes.java.recordreader=true -D
hadoop.pipes.java.recordwriter=true -input /user/keke/input -output
output -program bin/wordcount
相关文章推荐
- Hadoop pipes编程
- Hadoop Streaming和Pipes
- hadoop c++ pipes接口实现
- Hadoop Streaming 编程
- hadoop pipes
- Hadoop Streaming高级编程
- Hadoop pipes设计原理
- VC环境下UDP Socket编程示例
- java自己造 编程IDE【2】,消…
- .NET 下的 POP3 编程代码共享
- Hadoop-2.6.0 集群的安装配置
- LoadRunner编程之文件的操…
- kernel module编程(六):printk-…
- 配置hadoop时,jobtracker 没…
- 8个实用的Linux netcat命令示例
- hadoop编程入门学习笔记-2 通过示例程序理解hadoop
- 通过 Perl 编程访问…
- Ruby 随记 -简单语法示例
- Hadoop Shell
- hadoop Streaming之aggregate