您的位置:首页 > 其它

Storm实现数单词功能

2015-06-05 10:29 204 查看

package com.mytest.myExample;

import java.util.HashMap;

import java.util.Map;

import java.util.Random;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

/**
 * Minimal local Storm topology: a spout emits a fixed list of words and a
 * bolt prints each word and records every occurrence of the word "bl".
 */
public class PrintMyName {

    /** How long {@link #main} lets the local topology run before shutting it down. */
    private static final long RUN_TIME_MILLIS = 2000L;

    /**
     * Spout that emits each entry of a fixed word list exactly once, one
     * entry per {@link #nextTuple()} call, on the single output field "word".
     */
    public static class WordReaderSpout extends BaseRichSpout {

        /** Words to emit, in order. Hoisted so the array is not rebuilt on every call. */
        private static final String[] SENTENCES = {
            "nihao", "bl", "hello", "bl", "thank you", "very", "bl"};

        SpoutOutputCollector _collector;

        /** Index of the next entry of {@link #SENTENCES} to emit. */
        private int count = 0;

        /**
         * Stores the collector used later by {@link #nextTuple()}.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector that emitted tuples are sent through
         */
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this._collector = collector;
        }

        /**
         * Emits the next word, if any remain. Once every word has been
         * emitted, further calls do nothing.
         */
        @Override
        public void nextTuple() {
            if (count >= SENTENCES.length) {
                return;
            }
            _collector.emit(new Values(SENTENCES[count]));
            count++;
        }

        /** Declares the single output field, "word". */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Bolt that prints every incoming word and records each occurrence of
     * "bl" (ignoring surrounding whitespace). The recorded matches are
     * printed by {@link #cleanup()}.
     */
    public static class SearchBolt extends BaseRichBolt {

        /** Recorded matches, keyed by their 0-based occurrence number. */
        Map<Integer, String> counts = new HashMap<Integer, String>();

        /** Number of "bl" occurrences seen so far; also the next key for {@link #counts}. */
        int countt = 0;

        /** No setup is needed: this bolt emits nothing, so the collector is not kept. */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
        }

        /**
         * Prints the word carried in the tuple's first field and, when it is
         * "bl" after trimming, records the match.
         *
         * @param input tuple whose first field is the word to inspect
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            System.out.println("###### " + word);
            // Null guard: a null field is printed above but must not crash the bolt.
            if (word != null && "bl".equals(word.trim())) {
                counts.put(countt, " @@@@@@@@@@@This is bl" + countt);
                countt++;
            }
        }

        /** Declares nothing: this bolt has no output stream. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        /** Prints every recorded "bl" match. */
        @Override
        public void cleanup() {
            for (Map.Entry<Integer, String> entry : counts.entrySet()) {
                System.out.println("@@@@" + entry.getKey() + entry.getValue());
            }
        }
    }

    /**
     * Builds the spout-to-bolt topology, runs it in a local cluster for
     * {@link #RUN_TIME_MILLIS} milliseconds, then shuts the cluster down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        TopologyBuilder build = new TopologyBuilder();
        build.setSpout("haveBL", new WordReaderSpout());
        // Alternative grouping: fieldsGrouping("haveBL", new Fields("word"))
        build.setBolt("searchbl", new SearchBolt()).shuffleGrouping("haveBL");

        Config conf = new Config();
        conf.setDebug(false);

        LocalCluster cluster = new LocalCluster();
        boolean interrupted = false;
        try {
            cluster.submitTopology("searchMyName", conf, build.createTopology());
            Thread.sleep(RUN_TIME_MILLIS);
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } finally {
            // Always shut down, even when the wait is interrupted or submission
            // fails; otherwise the local cluster is left running.
            cluster.shutdown();
            if (interrupted) {
                // Restore the interrupt status only after shutdown, so that
                // shutdown itself is not disturbed by a pending interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: