翻译规则词汇化概率用streaming实现的实例
2013-11-16 21:57
393 查看
使用combiner后能够减少io操作,700万左右的测试数据速度提高了1倍,数据更大提升效果更明显。
因为运算量较小,此处使用了streaming方式,性能和pipes相差不大。
mapper负责解析输入的对齐格式<src>source</src><tgt>target</tgt><alignment>alignment</alignment>
combiner负责合并src和tgt的频度,减少给reduce带来的io负担
reduce负责count(src,tgt)/count(src)运算。
mapper代码:
combiner代码
reducer代码:
因为运算量较小,此处使用了streaming方式,性能和pipes相差不大。
mapper负责解析输入的对齐格式<src>source</src><tgt>target</tgt><alignment>alignment</alignment>
combiner负责合并src和tgt的频度,减少给reduce带来的io负担
reduce负责count(src,tgt)/count(src)运算。
mapper代码:
#include <iostream> #include <fstream> #include <string> #include <stdlib.h> #include <vector> #include "boost/lexical_cast.hpp" using namespace std; void split_string_by_tag(const string & src, vector<string> & tgt_vec, const string & tag) { tgt_vec.clear(); size_t offset = 0; while(offset < src.size()) { size_t pos = src.find(tag, offset); if(pos == string::npos) { tgt_vec.push_back(src.substr(offset)); offset = src.size(); break; } tgt_vec.push_back(src.substr(offset, pos-offset)); offset = pos + tag.size(); } } int main(){ string line; while (getline(cin,line)) { std::string::size_type ep=line.find("</srcword>"); std::string src=line.substr(9,ep-9); std::string::size_type sp=line.find("<s>",ep+1); ep=line.find("</tgtword>",sp); std::string tgt=line.substr(sp,ep-sp); sp=line.find("<alignment>"); ep=line.find("</align"); string align=line.substr(sp+11,ep-sp-11); vector<string> src_vec,tgt_vec,align_vec; split_string_by_tag(src,src_vec," "); split_string_by_tag(tgt,tgt_vec," "); split_string_by_tag(align,align_vec," "); vector<int> src_aligns,tgt_aligns; src_aligns.resize((int)src_vec.size(),0); tgt_aligns.resize((int)tgt_vec.size(),0); for(int i=0;i<(int)align_vec.size();++i){ sp=align_vec[i].find("-"); int fi=boost::lexical_cast<int>(align_vec[i].substr(0,sp)); int ti=boost::lexical_cast<int>(align_vec[i].substr(sp+1)); src_aligns[fi]=1; tgt_aligns[ti]=1; cout<<src_vec[fi]<<"\t"<<tgt_vec[ti]<<endl; cout<<tgt_vec[ti]<<"\t"<<src_vec[fi]<<endl; } for(int i=0;i<(int)src_vec.size();++i){ if(src_aligns[i]==0){ cout<<src_vec[i]<<"\t"<<"NULL"<<endl; cout<<"NULL"<<"\t"<<src_vec[i]<<endl; } } for(int i=0;i<(int)tgt_vec.size();++i){ if(tgt_aligns[i]==0){ cout<<"NULL"<<"\t"<<tgt_vec[i]<<endl; cout<<tgt_vec[i]<<"\t"<<"NULL"<<endl; } } } }
combiner代码
#include <iostream> #include <fstream> #include <string> #include <stdlib.h> #include <vector> #include <map> #include "boost/lexical_cast.hpp" using namespace std; void split_string_by_tag(const string & src, vector<string> & tgt_vec, const string & tag) { tgt_vec.clear(); size_t offset = 0; while(offset < src.size()) { size_t pos = src.find(tag, offset); if(pos == string::npos) { tgt_vec.push_back(src.substr(offset)); offset = src.size(); break; } tgt_vec.push_back(src.substr(offset, pos-offset)); offset = pos + tag.size(); } } int main(){ string line; map<string,int> tgt_cnt; string last_key=""; while (getline(cin,line)) { string::size_type p=line.find("\t"); p=line.find("\t",p+1); if(p!=string::npos){ tgt_cnt[line.substr(0,p)]+=boost::lexical_cast<int>(line.substr(p+1)); }else tgt_cnt[line]+=1; } map<string,int >::const_iterator cit; for(cit=tgt_cnt.begin();cit!=tgt_cnt.end();++cit){ cout<<cit->first<<"\t"<<cit->second<<endl; } }
reducer代码:
#include <iostream> #include <fstream> #include <string> #include <stdlib.h> #include <vector> #include <map> #include "boost/lexical_cast.hpp" using namespace std; void split_string_by_tag(const string & src, vector<string> & tgt_vec, const string & tag) { tgt_vec.clear(); size_t offset = 0; while(offset < src.size()) { size_t pos = src.find(tag, offset); if(pos == string::npos) { tgt_vec.push_back(src.substr(offset)); offset = src.size(); break; } tgt_vec.push_back(src.substr(offset, pos-offset)); offset = pos + tag.size(); } } int main(){ string line; map<string,map<string,int> > hash; map<string,int> tgt_cnt; string last_key=""; while (getline(cin,line)) { string::size_type p=line.find("\t"); string src=line.substr(0,p); string::size_type ep=line.find("\t",p+1); string tgt=line.substr(p+1); int cnt=1; if(ep!=string::npos){ tgt=line.substr(p+1,ep-1-p); cnt=boost::lexical_cast<int>(line.substr(ep+1)); } //tgt=tgt.substr(0,p); hash[src][tgt]+=cnt; tgt_cnt[src]+=cnt; /*if(src!=last_key&&!last_key.empty()){ map<string,map<string,int> >::const_iterator cit; for(cit=hash.begin();cit!=hash.end();++cit){ int sum=tgt_cnt[cit->first]; for(map<string,int>::const_iterator cit2=cit->second.begin();cit2!=cit->second.end();++cit2){ cout<<cit->first<<"\t"<<cit2->first<<"\t"<<(float)(cit2->second)/(float)sum<<endl; } } } last_key=src; hash.clear(); tgt_cnt.clear();*/ } map<string,map<string,int> >::const_iterator cit; for(cit=hash.begin();cit!=hash.end();++cit){ int sum=tgt_cnt[cit->first]; for(map<string,int>::const_iterator cit2=cit->second.begin();cit2!=cit->second.end();++cit2){ cout<<cit->first<<"\t"<<cit2->first<<"\t"<<(float)(cit2->second)/(float)sum<<endl; } } }
相关文章推荐
- Hadoop_2.1.0 MapReduce序列图
- 单机版搭建Hadoop环境图文教程详解
- hadoop常见错误以及处理方法详解
- hadoop 单机安装配置教程
- Apache Hadoop版本详解
- linux下搭建hadoop环境步骤分享
- hadoop client与datanode的通信协议分析
- hadoop中一些常用的命令介绍
- Hadoop单机版和全分布式(集群)安装
- Hadoop安装感悟
- hadoop安装lzo
- HDFS 文件操作
- HBase基本原理
- HDFS DatanodeProtocol——sendHeartbeat
- HDFS DatanodeProtocol——register
- Hadoop集群提交作业问题总结
- Hadoop源码分析 HDFS ClientProtocol——addBlock
- Hadoop源码分析HDFS ClientProtocol——create
- Hadoop源码分析FSNamesystem几个重要的成员变量
- Hadoop源码分析HDFS ClientProtocol——getBlockLocations