Storm实现数单词功能
2015-06-05 10:29
204 查看
package com.mytest.myExample;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Minimal Storm example run on an in-process {@link LocalCluster}: a spout
 * emits a fixed list of words and a bolt records every occurrence of the
 * word "bl", printing what it recorded when it is cleaned up.
 */
public class PrintMyName {

    /**
     * Spout that emits each entry of a fixed word list exactly once, in
     * order, as a single-field tuple named "word".
     */
    public static class WordReaderSpout extends BaseRichSpout {
        /** Words to emit; kept as a constant so it is not rebuilt on every nextTuple() call. */
        private static final String[] SENTENCES = {
            "nihao", "bl", "hello", "bl", "thank you", "very", "bl"};

        SpoutOutputCollector _collector;

        /** Index of the next entry of SENTENCES to emit. */
        private int count = 0;

        /**
         * Stores the collector used later by {@link #nextTuple()}.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to emit tuples
         */
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this._collector = collector;
        }

        /**
         * Emits the next word, or does nothing once every word has been emitted.
         */
        @Override
        public void nextTuple() {
            if (count >= SENTENCES.length) {
                return;
            }
            _collector.emit(new Values(SENTENCES[count]));
            count++;
        }

        /** Declares the single output field "word". */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Bolt that prints every received word and records each occurrence of
     * "bl" (after trimming) under a running index.
     */
    public static class SearchBolt extends BaseRichBolt {
        /** Recorded matches, keyed by the order in which they were seen. */
        Map<Integer, String> counts = new HashMap<Integer, String>();

        /** Number of "bl" words seen so far; also the key of the next entry in counts. */
        int countt = 0;

        /** Collector kept so that processed tuples can be acknowledged. */
        private OutputCollector collector;

        /**
         * Stores the collector for use in {@link #execute(Tuple)}.
         *
         * @param stormConf topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to ack tuples
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Prints the incoming word, records it when it equals "bl", and acks the tuple.
         *
         * @param input tuple whose first value is the word
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            System.out.println("###### " + word);
            if (word.trim().equals("bl")) {
                counts.put(countt, " @@@@@@@@@@@This is bl" + countt);
                countt++;
            }
            // BaseRichBolt does not ack automatically; every input must be acked by the bolt.
            collector.ack(input);
        }

        /** This bolt emits nothing, so it declares no output fields. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        /**
         * Prints every recorded match. Per the Storm documentation, cleanup is
         * only guaranteed to be invoked when running in local mode.
         */
        @Override
        public void cleanup() {
            for (Map.Entry<Integer, String> entry : counts.entrySet()) {
                System.out.println("@@@@" + entry.getKey() + entry.getValue());
            }
        }
    }

    /**
     * Builds the spout-to-bolt topology, runs it on a local cluster for two
     * seconds, then shuts the cluster down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        TopologyBuilder build = new TopologyBuilder();
        build.setSpout("haveBL", new WordReaderSpout());
        build.setBolt("searchbl", new SearchBolt()).shuffleGrouping("haveBL");

        Config conf = new Config();
        conf.setDebug(false);

        LocalCluster cluster = new LocalCluster();
        boolean interrupted = false;
        try {
            cluster.submitTopology("searchMyName", conf, build.createTopology());
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            interrupted = true;
        } finally {
            // Always stop the cluster, even when the wait was interrupted or submission failed.
            cluster.shutdown();
            if (interrupted) {
                // Restore the interrupt flag only after shutdown so shutdown itself is not disturbed.
                Thread.currentThread().interrupt();
            }
        }
    }
}
package com.mytest.myExample;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Minimal Storm example run on an in-process {@link LocalCluster}: a spout
 * emits a fixed list of words and a bolt records every occurrence of the
 * word "bl", printing what it recorded when it is cleaned up.
 */
public class PrintMyName {

    /**
     * Spout that emits each entry of a fixed word list exactly once, in
     * order, as a single-field tuple named "word".
     */
    public static class WordReaderSpout extends BaseRichSpout {
        /** Words to emit; kept as a constant so it is not rebuilt on every nextTuple() call. */
        private static final String[] SENTENCES = {
            "nihao", "bl", "hello", "bl", "thank you", "very", "bl"};

        SpoutOutputCollector _collector;

        /** Index of the next entry of SENTENCES to emit. */
        private int count = 0;

        /**
         * Stores the collector used later by {@link #nextTuple()}.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to emit tuples
         */
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this._collector = collector;
        }

        /**
         * Emits the next word, or does nothing once every word has been emitted.
         */
        @Override
        public void nextTuple() {
            if (count >= SENTENCES.length) {
                return;
            }
            _collector.emit(new Values(SENTENCES[count]));
            count++;
        }

        /** Declares the single output field "word". */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Bolt that prints every received word and records each occurrence of
     * "bl" (after trimming) under a running index.
     */
    public static class SearchBolt extends BaseRichBolt {
        /** Recorded matches, keyed by the order in which they were seen. */
        Map<Integer, String> counts = new HashMap<Integer, String>();

        /** Number of "bl" words seen so far; also the key of the next entry in counts. */
        int countt = 0;

        /** Collector kept so that processed tuples can be acknowledged. */
        private OutputCollector collector;

        /**
         * Stores the collector for use in {@link #execute(Tuple)}.
         *
         * @param stormConf topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to ack tuples
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Prints the incoming word, records it when it equals "bl", and acks the tuple.
         *
         * @param input tuple whose first value is the word
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            System.out.println("###### " + word);
            if (word.trim().equals("bl")) {
                counts.put(countt, " @@@@@@@@@@@This is bl" + countt);
                countt++;
            }
            // BaseRichBolt does not ack automatically; every input must be acked by the bolt.
            collector.ack(input);
        }

        /** This bolt emits nothing, so it declares no output fields. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        /**
         * Prints every recorded match. Per the Storm documentation, cleanup is
         * only guaranteed to be invoked when running in local mode.
         */
        @Override
        public void cleanup() {
            for (Map.Entry<Integer, String> entry : counts.entrySet()) {
                System.out.println("@@@@" + entry.getKey() + entry.getValue());
            }
        }
    }

    /**
     * Builds the spout-to-bolt topology, runs it on a local cluster for two
     * seconds, then shuts the cluster down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        TopologyBuilder build = new TopologyBuilder();
        build.setSpout("haveBL", new WordReaderSpout());
        build.setBolt("searchbl", new SearchBolt()).shuffleGrouping("haveBL");

        Config conf = new Config();
        conf.setDebug(false);

        LocalCluster cluster = new LocalCluster();
        boolean interrupted = false;
        try {
            cluster.submitTopology("searchMyName", conf, build.createTopology());
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            interrupted = true;
        } finally {
            // Always stop the cluster, even when the wait was interrupted or submission failed.
            cluster.shutdown();
            if (interrupted) {
                // Restore the interrupt flag only after shutdown so shutdown itself is not disturbed.
                Thread.currentThread().interrupt();
            }
        }
    }
}
相关文章推荐
- IO-00 Hello World! (5)
- Highcharts tooltip工具提示
- [.net 面向对象编程基础] (7) 基础中的基础——流程控制语句
- 文件操作之木马后门的配置
- CURL不依赖COOKIEJAR获取COOKIE
- python执行环境
- Maximal Square
- POJ 1201-Intervals(差分约束系统)
- 深入理解Linux修改hostname
- cocos2d-x Lua项目的Lua脚本加密
- 数据结构系列之归并排序
- 对Ajax的支持——深入浅出学Spring Web MVC
- 在eclipse中安装m2eclipse
- CodeForces479C Exams(贪心)
- Android 4.4前后版本读取图库图片和拍照完美解决方案
- file_get_contents与curl的比较
- Android 布局 fill_parent、wrap_content和match_parent的区别和作用
- Js、css代码文件规范
- iOS 绘图详解
- eval、json.parse()的介绍和使用注意点