
Storm Experiment -- Word Count 2

2015-05-03 18:25
Building on the previous word-count experiment, the following changes are made:

1. Use Storm's reliable message processing mechanism (see the Config sketch after this list)

2. Configure the number of workers, executors, and tasks

3. Submit the topology in cluster mode
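
For change 1, reliable processing is normally paired with two Config knobs: the tuple timeout, after which Storm fails a pending tuple (triggering the spout's fail method), and a cap on un-acked tuples in flight per spout task. A minimal sketch, with illustrative values that are not from the original experiment:

import backtype.storm.Config;

public class ReliabilityConfigSketch {
    public static Config baseConfig() {
        Config config = new Config();
        // Fail (and so replay) any tuple tree not fully acked within 30 seconds
        config.setMessageTimeoutSecs(30);
        // Cap the number of un-acked tuples each spout task may have in flight
        config.setMaxSpoutPending(1000);
        return config;
    }
}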

The data source spout

package com.zhch.v2;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;

    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();

        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return; // the file has been fully read; nothing left to emit
        }

        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId); // emit with a msgId to enable reliable message processing
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId); // on ack, remove the tuple from the pending list
    }

    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId); // on failure or timeout, re-emit the tuple
    }
}

Implementing the sentence-splitting bolt

package com.zhch.v2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(tuple, new Values(word)); // anchor the output tuple to the input tuple for reliable processing
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
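
When, as above, a bolt's output tuples are all anchored to a single input tuple that is acked at the end of execute, Storm's BaseBasicBolt can do the anchoring and acking automatically. A minimal sketch of the same split logic (the class name SplitSentenceBasicBolt is hypothetical):

package com.zhch.v2;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SplitSentenceBasicBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Emitted tuples are anchored to the input automatically; the input is
        // acked when execute returns normally (throw FailedException to fail it)
        for (String word : tuple.getStringByField("sentence").split(" ")) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}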

Implementing the word-count bolt

package com.zhch.v2;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            Iterator<String> keys = this.counts.keySet().iterator();
            while (keys.hasNext()) {
                String w = keys.next();
                Long c = this.counts.get(w);
                writer.write(w + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }

        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
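
Rewriting the whole result file on every tuple is expensive. A common alternative, not used in the original code, is to flush periodically with Storm's tick tuples: the bolt asks for a system tuple every N seconds and only writes the file when one arrives. A sketch of the two relevant pieces:

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class TickTupleSketch {
    // Override this in the bolt to receive a tick tuple every 10 seconds
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    // In execute(), write the result file only when this returns true
    public static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
}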

Implementing the word-count topology

package com.zhch.v2;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology-v2";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2); // 2 spout executors
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4) // 2 splitBolt executors, 4 tasks
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2) // 2 countBolt executors
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args.length > 1) {
            config.setNumWorkers(2); // 2 worker processes
            // submit in cluster mode
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}

After the program is written, package it with mvn clean install and submit it to the Storm cluster:

[grid@hadoop4 stormTarget]$ storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v2.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v2
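
Executor counts can later be changed on a live topology (up to the task ceiling fixed by setNumTasks) with storm rebalance; an illustrative command, not part of the original run:

[grid@hadoop4 stormTarget]$ storm rebalance word-count-topology-v2 -n 2 -e split-bolt=4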

Results (each of the two countBolt executors writes its own result file, so the counts are split across the two nodes):

[grid@hadoop5 stormData]$ cat result.txt
can : 1
second : 1
simple : 1
set : 1
data : 2
unbounded : 1
has : 1
Apache : 1
open : 1
over : 1
free : 1
easy : 2
fast: : 1
reliably : 1
any : 1
with : 1
million : 1
is : 6
learning : 1
analytics : 1
torm : 1
node : 1
processed : 2
what : 1
batch : 1
operate : 1
will : 1
language : 1
fault-tolerant : 1
[grid@hadoop6 stormData]$ cat result.txt
to : 3
for : 2
distributed : 2
use : 2
used : 1
Storm : 4
It : 1
online : 1
cases: : 1
of : 2
programming : 1
more : 1
clocked : 1
scalable : 1
processing : 2
guarantees : 1
be : 2
ETL : 1
continuous : 1
it : 2
Hadoop : 1
makes : 1
your : 1
a : 4
at : 1
did : 1
fun : 1
machine : 1
up : 1
and : 5
process : 1
RPC : 1
many : 1
system : 1
source : 1
realtime : 3
benchmark : 1
per : 2
doing : 1
lot : 1
streams : 1
computation : 2
tuples : 1