storm下运行C++程序(一)
2013-11-12 14:26
239 查看
转载,原文地址:http://blog.csdn.net/yan_mount/article/details/11527799
学习storm有段时间了,也搭建了一个简单的环境,很欣赏它的一些理念,考虑到很多程序是C++实现的,如果要使用该平台的话,需要为这些程序实现一个接口,方便统一在storm中运行,折腾了几天,初步成功的实现了一个C++的bolt,特分享如下:
view plaincopy
MyShellBolt extends ShellBolt implements IRichBolt{
public MyShellBolt()
{
super("/bin/sh","start.sh");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
view plaincopy
chmod a+x Bolt
./Bolt
[cpp]
view plaincopy
void StormUtils::storm_emit(const string msg[],size_t size)
{
Json::Value out;
out["command"]="emit";
for(size_t i=0;i<size;i++)
{
out["tuple"].append(msg[i]);
}
outs << out.toStyledString() << "end\n"<<endl;
return;
}
[cpp]
view plaincopy
void StormUtils::init(const string& handshake)
{
Json::Reader reader;
Json::Value config;
if (reader.parse(handshake, config))
{//parse handshake OK
const char* piddir=config["pidDir"].asCString();
int iPid = (int)getpid();
char pid[64];
sprintf(pid, "%d", iPid);
stringstream ss;
ss<<piddir<<"/"<<iPid;
ofstream file(ss.str().c_str(),ofstream::out);
file.close();
msg_map msg;
msg_pair my_pair("pid",pid);
msg.insert(my_pair);
storm_send(msg);
}
return;
}
其中要注意的是:
1,init握手函数的json输出中,pid的value的number,所以不能加“”,其它输出value都是string。
2,每次输出到storm平台的json串最后都要加一行end,如:
outs<<out.toStyledString()<< "end\n"<<endl;
3,jsoncpp的reader.parse函数在遇到非json格式串时,会被阻塞住(不知原因),所以自己还简单判断了下storm传给bolt的消息串是否是json格式,否则丢弃
其中bolt的主体循环流程为:
[cpp]
view plaincopy
void process(::StormUtils& stormUtils)
{
Json::Reader reader;
Json::Value value;
string msg;
while(true)
{
//this function will be blocked from stdin
stormUtils.read_msg(std::cin,msg);
stormUtils.storm_log("read_msg:"+msg);
size_t pos=msg.find("{");
if (pos==string::npos)
{//no {} in string,then discard
continue;
}
else if (pos>0)
{//erase the invalid part from msg
msg=msg.erase(0,pos);
}
if (reader.parse(msg, value))
{
bool hasID=value.isMember("id");
if (hasID)
{//get tuple
//tuple is array
const char* ID=value["id"].asCString();
const Json::Value tuples = value["tuple"];
string tuple = tuples[0u].asString();
vector<string> words=stormUtils.split(tuple," ");
vector<string>::iterator it;
for(it=words.begin();it!=words.end();it++)
{
stormUtils.storm_log("emit:"+*it);
string outMsg[]={*it};
stormUtils.storm_emit(outMsg,1);
}
stormUtils.storm_log(ID);
stormUtils.storm_ack(ID);
}
}
else
{
stormUtils.storm_log("msg parse error:"+msg);
}
msg.clear();//ready to read again from stdin
}
return;
}
conf.setDebug(true);
这样在本地模式运行时,从日志里就可以看到C++程序中打印的日志,如:
4861 [Thread-21] INFO backtype.storm.task.ShellBolt - Shell msg: read_msg:{"id":"-1825914362791431189","stream":"default","comp":"MySpout","tuple":["snow white and the seven dwarfs"],"task":1}
系统启动C程序的日志:
4724 [Thread-20] INFO backtype.storm.task.ShellBolt - Launched subprocess with pid 2039
Shell msg: read_msg:[3][3][3][3][3][3]
造成程序僵死,用单元测试也发现jsoncpp有这个问题,还请高手指点验证
总之,用C++实现的Bolt基本跑通,为后续真正的业务模块(C++实现)的使用打下基础
学习storm有段时间了,也搭建了一个简单的环境,很欣赏它的一些理念,考虑到很多程序是C++实现的,如果要使用该平台的话,需要为这些程序实现一个接口,方便统一在storm中运行,折腾了几天,初步成功的实现了一个C++的bolt,特分享如下:
1,需要先定义一个java的壳:
[java]view plaincopy
MyShellBolt extends ShellBolt implements IRichBolt{
public MyShellBolt()
{
super("/bin/sh","start.sh");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
2,脚本start.sh中:
[plain]view plaincopy
chmod a+x Bolt
./Bolt
这个地方使用了脚本来中转一下,因为直接在java中调用super("./Bolt")时,遇到了各种问题,要么找不到文件,要么没有执行权限,最后使用脚本才解决这个问题。
3,仿照storm自带的python程序的例子编写了C++的库:
主要是采用jsoncpp库(jsoncpp-src-0.5.0)实现json串的解析和输出 比如:
[cpp]
view plaincopy
void StormUtils::storm_emit(const string msg[],size_t size)
{
Json::Value out;
out["command"]="emit";
for(size_t i=0;i<size;i++)
{
out["tuple"].append(msg[i]);
}
outs << out.toStyledString() << "end\n"<<endl;
return;
}
[cpp]
view plaincopy
void StormUtils::init(const string& handshake)
{
Json::Reader reader;
Json::Value config;
if (reader.parse(handshake, config))
{//parse handshake OK
const char* piddir=config["pidDir"].asCString();
int iPid = (int)getpid();
char pid[64];
sprintf(pid, "%d", iPid);
stringstream ss;
ss<<piddir<<"/"<<iPid;
ofstream file(ss.str().c_str(),ofstream::out);
file.close();
msg_map msg;
msg_pair my_pair("pid",pid);
msg.insert(my_pair);
storm_send(msg);
}
return;
}
其中要注意的是:
1,init握手函数的json输出中,pid的value的number,所以不能加“”,其它输出value都是string。
2,每次输出到storm平台的json串最后都要加一行end,如:
outs<<out.toStyledString()<< "end\n"<<endl;
3,jsoncpp的reader.parse函数在遇到非json格式串时,会被阻塞住(不知原因),所以自己还简单判断了下storm传给bolt的消息串是否是json格式,否则丢弃
其中bolt的主体循环流程为:
[cpp]
view plaincopy
void process(::StormUtils& stormUtils)
{
Json::Reader reader;
Json::Value value;
string msg;
while(true)
{
//this function will be blocked from stdin
stormUtils.read_msg(std::cin,msg);
stormUtils.storm_log("read_msg:"+msg);
size_t pos=msg.find("{");
if (pos==string::npos)
{//no {} in string,then discard
continue;
}
else if (pos>0)
{//erase the invalid part from msg
msg=msg.erase(0,pos);
}
if (reader.parse(msg, value))
{
bool hasID=value.isMember("id");
if (hasID)
{//get tuple
//tuple is array
const char* ID=value["id"].asCString();
const Json::Value tuples = value["tuple"];
string tuple = tuples[0u].asString();
vector<string> words=stormUtils.split(tuple," ");
vector<string>::iterator it;
for(it=words.begin();it!=words.end();it++)
{
stormUtils.storm_log("emit:"+*it);
string outMsg[]={*it};
stormUtils.storm_emit(outMsg,1);
}
stormUtils.storm_log(ID);
stormUtils.storm_ack(ID);
}
}
else
{
stormUtils.storm_log("msg parse error:"+msg);
}
msg.clear();//ready to read again from stdin
}
return;
}
4,打包
把shell脚本和Bolt执行文件放在jar包的/resources目录下即可5,验证
无论在本地模式还是集群模式下都运行成功,调试时在topology中记得打开conf.setDebug(true);
这样在本地模式运行时,从日志里就可以看到C++程序中打印的日志,如:
4861 [Thread-21] INFO backtype.storm.task.ShellBolt - Shell msg: read_msg:{"id":"-1825914362791431189","stream":"default","comp":"MySpout","tuple":["snow white and the seven dwarfs"],"task":1}
系统启动C程序的日志:
4724 [Thread-20] INFO backtype.storm.task.ShellBolt - Launched subprocess with pid 2039
6,问题
1,不知为什么,bolt程序会接收到非json格式的输入:Shell msg: read_msg:[3][3][3][3][3][3]
造成程序僵死,用单元测试也发现jsoncpp有这个问题,还请高手指点验证
总之,用C++实现的Bolt基本跑通,为后续真正的业务模块(C++实现)的使用打下基础
相关文章推荐
- 生成不重复的随机数对(C/C++)
- C++函数高频调用的效率问题----内联函数能解决所有问题吗
- C++四则计算器源码
- C语言警告:warning C4018: “<”: 有符号/无符号不匹配
- c++中的数据类型
- C++构造函数和析构函数的调用顺序
- Deep C (and C++) by Olve Maudal and Jon Jagger
- C语言实现快速排序
- C++继承的例子 (1)
- C++继承的例子 (1)
- C++关于内联函数的一些思考
- Effictive C++學習(一)——概念問題
- C语言内涵教程的资料
- Keil: error C141: syntax error near 'unsigned'
- c++ 中缓冲区的理解
- c++欧几里得求最大公约数
- C/C++静态变量
- C++ serialize giude
- c++ ifstream ofstream 文件流
- 再议c语言的强符号和弱符号