Storm实时单词统计
2016-12-25 17:12
453 查看
注:本文是用“一起写”Office 编写的,发布到博客后格式有些错乱,代码中的换行可能不正确。
1. 微批处理的批次大小可以根据数据条数或时间间隔来确定。
实时处理有两种方式:
一是持续流处理(数据逐条处理);
二是微批处理。
2. 数据记录的处理语义
一是至少一次(at-least-once):可能存在重复处理;
二是有且仅有一次(exactly-once):要求最严格;
三是至多一次(at-most-once):可能存在漏处理。
3. 配置
storm.yaml(YAML 格式,注意缩进以及冒号后面的空格)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "hadoop02-linux.alibaba.com"
# - "server2"
#
nimbus.host: "hadoop02-linux.alibaba.com"
#
#
storm.local.dir: "/opt/modules/apache-storm-0.9.6/worspace"
# ##### These may optionally be filled in:
#
## List of custom serializations
# topology.kryo.register:
# - org.mycompany.MyType
# - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
# - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
# - "server1"
# - "server2"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
ui.port: 8082
## Metrics Consumers
# topology.metrics.consumer.register:
# - class: "backtype.storm.metric.LoggingMetricsConsumer"
# parallelism.hint: 1
# - class: "org.mycompany.MyMetricsConsumer"
# parallelism.hint: 1
# argument:
# - endpoint: "metrics-collector.mycompany.org"
4. Nimbus
当集群中没有提交新任务、也没有 supervisor 节点挂掉时,主节点(Nimbus)短时间挂掉是没有问题的,只要配合运维监控及时重新启动即可。
启动集群,并查看进程。
运行官方示例程序(命令在同一行):
bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
注意:停止 Storm 集群之前,一定要先停掉运行在 Storm 集群上的 Topology,否则下次重启 Storm 集群时,Storm 会把这些 Topology 重新启动起来。
5. 停止集群:Storm 没有提供停止脚本,可以自行编写,或者直接使用 kill 命令。
Bolt 是以线程的方式运行的。
6. 编写 WordCount Topology
SentenceSpout:
package com.ibeifeng.bigdata.storm.topo;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
* spout采集数据源
* @author lm
*
*/
public class SentenceSpout implements IRichSpout{
//抽象类
BaseRichSpout
//BaseRichSpout
private
static final Logger log = LoggerFactory.getLogger(SentenceSpout.class);
private
SpoutOutputCollector collector;
private
static final String[] Sentence =
{"hadoop
yarn mapreduce",
"flume
error liangman yunkjhhj",
"abc
dec abc abd fff",
"dhjfh
jkdshg hjdkfgh"};
@Override
public
void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector
= collector;
}
@Override
public
void close() {
//
TODO Auto-generated method stub
}
@Override
public
void activate() {
//
和页面激活有关,一般不写
}
@Override
public
void deactivate() {
//
和页面上的失效有关,一般不写
}
@Override
public
void nextTuple() {
//random
get the data
String
tmp = Sentence[ new Random().nextInt(Sentence.length)];
if(tmp.contains("error")){
log.error("数据错误");
}else{
//告诉发送的value是什么,但是没告诉key是什么,declareOutputFields会告诉key的名称
//key
and value 是一一对应的
this.collector.emit(new
Values(tmp));
}
}
@Override
public
void ack(Object msgId) {
//
TODO Auto-generated method stub
}
@Override
public
void fail(Object msgId) {
//
TODO Auto-generated method stub
}
@Override
public
void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new
Fields("seq"));
}
@Override
public
Map<String, Object> getComponentConfiguration() {
//针对本组件单独的配置
return
null;
}
}
SplitBolt
package com.ibeifeng.bigdata.storm.topo;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
* 开发一个bolt
* @author lm
*
*/
public class SplitBolt implements IRichBolt{
private
OutputCollector collector;
@Override
public
void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector
= collector;
}
/**
*接收到前面的一条tuple就调用一次
*
*/
@Override
public
void execute(Tuple input) {
//
String
sentence = input.getStringByField("seq");
if(StringUtils.isNotBlank(sentence)){
String[]
words = sentence.split(" ");
for(String
word:words){
this.collector.emit(new
Values(word));
}
}
}
@Override
public
void cleanup() {
//
TODO Auto-generated method stub
}
@Override
public
void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new
Fields("word"));
}
@Override
public
Map<String, Object> getComponentConfiguration() {
//
TODO Auto-generated method stub
return
null;
}
}
CountBolt
package com.ibeifeng.bigdata.storm.topo;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
* 计数bolt
* @author lm
*
*/
public class CountBolt extends BaseRichBolt{
private
OutputCollector collector;
private
Map<String ,Integer> map;
@Override
public
void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
//
TODO Auto-generated method stub
this.collector=collector;
map
= new HashMap<String,Integer>();
}
@Override
public
void execute(Tuple input) {
String
word = input.getStringByField("word");
int
count = 1;
if(map.containsKey(word)){
count
= map.get(word) + 1;
}
this.collector.emit(new
Values(word,count));
}
@Override
public
void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new
Fields("word","count"));
}
}
PrintBolt
package com.ibeifeng.bigdata.storm.topo;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
public class PrintBolt extends BaseRichBolt{
private
static final Logger logger = LoggerFactory.getLogger(PrintBolt.class);
@Override
public
void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public
void execute(Tuple input) {
String
word = input.getStringByField("word");
Integer
value = input.getIntegerByField("count");
logger.info("单次"+word+"数量"+value);
//单词 数量
}
@Override
public
void declareOutputFields(OutputFieldsDeclarer declarer) {
//
TODO Auto-generated method stub
}
}
主程序:
package com.ibeifeng.bigdata.storm.topo;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* 一个实时计数单词的例子
* @author lm
*
*/
public class WordCountToplogy {
public
static final String SPOUT_ID ="sentenceSpout";
public
static final String SPILT_ID ="spiltBolt";
public
static final String COUNT_ID ="countBolt";
public
static final String PRINT_ID ="printBolt";
public
static void main(String[] args){
//点和边的连接
//边就是数据流分组的问题
TopologyBuilder
builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID,
new SentenceSpout());
builder.setBolt(SPILT_ID,
new SplitBolt()).shuffleGrouping(SPOUT_ID);
builder.setBolt(COUNT_ID,
new CountBolt()).fieldsGrouping(SPILT_ID,new Fields("word"));
builder.setBolt(PRINT_ID,
new PrintBolt()).globalGrouping(COUNT_ID);
Config
conf = new Config();
if(args
== null || args.length == 0){
//本地执行
LocalCluster
lc = new LocalCluster();
lc.submitTopology("wordcount",
conf, builder.createTopology());
}else{
//指定多少个进程执行
conf.setNumWorkers(1);
try
{
StormSubmitter.submitTopology(args[0],
conf, builder.createTopology());
}
catch (AlreadyAliveException | InvalidTopologyException e) {
//
TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
打包:在集群上运行
工作节点:
打印节点的输出信息:
1. 微批处理的批次大小可以根据数据条数或时间间隔来确定。
实时处理有两种方式:
一是持续流处理(数据逐条处理);
二是微批处理。
2. 数据记录的处理语义
一是至少一次(at-least-once):可能存在重复处理;
二是有且仅有一次(exactly-once):要求最严格;
三是至多一次(at-most-once):可能存在漏处理。
3. 配置
storm.yaml(YAML 格式,注意缩进以及冒号后面的空格)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "hadoop02-linux.alibaba.com"
# - "server2"
#
nimbus.host: "hadoop02-linux.alibaba.com"
#
#
storm.local.dir: "/opt/modules/apache-storm-0.9.6/worspace"
# ##### These may optionally be filled in:
#
## List of custom serializations
# topology.kryo.register:
# - org.mycompany.MyType
# - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
# - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
# - "server1"
# - "server2"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
ui.port: 8082
## Metrics Consumers
# topology.metrics.consumer.register:
# - class: "backtype.storm.metric.LoggingMetricsConsumer"
# parallelism.hint: 1
# - class: "org.mycompany.MyMetricsConsumer"
# parallelism.hint: 1
# argument:
# - endpoint: "metrics-collector.mycompany.org"
4. Nimbus
当集群中没有提交新任务、也没有 supervisor 节点挂掉时,主节点(Nimbus)短时间挂掉是没有问题的,只要配合运维监控及时重新启动即可。
启动集群,并查看进程。
运行官方示例程序(命令在同一行):
bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
注意:停止 Storm 集群之前,一定要先停掉运行在 Storm 集群上的 Topology,否则下次重启 Storm 集群时,Storm 会把这些 Topology 重新启动起来。
5. 停止集群:Storm 没有提供停止脚本,可以自行编写,或者直接使用 kill 命令。
Bolt 是以线程的方式运行的。
6. 编写 WordCount Topology
SentenceSpout:
package com.ibeifeng.bigdata.storm.topo;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
* spout采集数据源
* @author lm
*
*/
public class SentenceSpout implements IRichSpout{
//抽象类
BaseRichSpout
//BaseRichSpout
private
static final Logger log = LoggerFactory.getLogger(SentenceSpout.class);
private
SpoutOutputCollector collector;
private
static final String[] Sentence =
{"hadoop
yarn mapreduce",
"flume
error liangman yunkjhhj",
"abc
dec abc abd fff",
"dhjfh
jkdshg hjdkfgh"};
@Override
public
void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector
= collector;
}
@Override
public
void close() {
//
TODO Auto-generated method stub
}
@Override
public
void activate() {
//
和页面激活有关,一般不写
}
@Override
public
void deactivate() {
//
和页面上的失效有关,一般不写
}
@Override
public
void nextTuple() {
//random
get the data
String
tmp = Sentence[ new Random().nextInt(Sentence.length)];
if(tmp.contains("error")){
log.error("数据错误");
}else{
//告诉发送的value是什么,但是没告诉key是什么,declareOutputFields会告诉key的名称
//key
and value 是一一对应的
this.collector.emit(new
Values(tmp));
}
}
@Override
public
void ack(Object msgId) {
//
TODO Auto-generated method stub
}
@Override
public
void fail(Object msgId) {
//
TODO Auto-generated method stub
}
@Override
public
void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new
Fields("seq"));
}
@Override
public
Map<String, Object> getComponentConfiguration() {
//针对本组件单独的配置
return
null;
}
}
SplitBolt
package com.ibeifeng.bigdata.storm.topo;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
* 开发一个bolt
* @author lm
*
*/
public class SplitBolt implements IRichBolt{
private
OutputCollector collector;
@Override
public
void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector
= collector;
}
/**
*接收到前面的一条tuple就调用一次
*
*/
@Override
public
void execute(Tuple input) {
//
String
sentence = input.getStringByField("seq");
if(StringUtils.isNotBlank(sentence)){
String[]
words = sentence.split(" ");
for(String
word:words){
this.collector.emit(new
Values(word));
}
}
}
@Override
public
void cleanup() {
//
TODO Auto-generated method stub
}
@Override
public
void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new
Fields("word"));
}
@Override
public
Map<String, Object> getComponentConfiguration() {
//
TODO Auto-generated method stub
return
null;
}
}
CountBolt
package com.ibeifeng.bigdata.storm.topo;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
* 计数bolt
* @author lm
*
*/
public class CountBolt extends BaseRichBolt{
private
OutputCollector collector;
private
Map<String ,Integer> map;
@Override
public
void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
//
TODO Auto-generated method stub
this.collector=collector;
map
= new HashMap<String,Integer>();
}
@Override
public
void execute(Tuple input) {
String
word = input.getStringByField("word");
int
count = 1;
if(map.containsKey(word)){
count
= map.get(word) + 1;
}
this.collector.emit(new
Values(word,count));
}
@Override
public
void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new
Fields("word","count"));
}
}
PrintBolt
package com.ibeifeng.bigdata.storm.topo;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
public class PrintBolt extends BaseRichBolt{
private
static final Logger logger = LoggerFactory.getLogger(PrintBolt.class);
@Override
public
void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public
void execute(Tuple input) {
String
word = input.getStringByField("word");
Integer
value = input.getIntegerByField("count");
logger.info("单次"+word+"数量"+value);
//单词 数量
}
@Override
public
void declareOutputFields(OutputFieldsDeclarer declarer) {
//
TODO Auto-generated method stub
}
}
主程序:
package com.ibeifeng.bigdata.storm.topo;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* 一个实时计数单词的例子
* @author lm
*
*/
public class WordCountToplogy {
public
static final String SPOUT_ID ="sentenceSpout";
public
static final String SPILT_ID ="spiltBolt";
public
static final String COUNT_ID ="countBolt";
public
static final String PRINT_ID ="printBolt";
public
static void main(String[] args){
//点和边的连接
//边就是数据流分组的问题
TopologyBuilder
builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID,
new SentenceSpout());
builder.setBolt(SPILT_ID,
new SplitBolt()).shuffleGrouping(SPOUT_ID);
builder.setBolt(COUNT_ID,
new CountBolt()).fieldsGrouping(SPILT_ID,new Fields("word"));
builder.setBolt(PRINT_ID,
new PrintBolt()).globalGrouping(COUNT_ID);
Config
conf = new Config();
if(args
== null || args.length == 0){
//本地执行
LocalCluster
lc = new LocalCluster();
lc.submitTopology("wordcount",
conf, builder.createTopology());
}else{
//指定多少个进程执行
conf.setNumWorkers(1);
try
{
StormSubmitter.submitTopology(args[0],
conf, builder.createTopology());
}
catch (AlreadyAliveException | InvalidTopologyException e) {
//
TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
打包:在集群上运行
工作节点:
打印节点的输出信息:
相关文章推荐
- textarea实时统计输入单词个数
- SparkStreaming的实时单词统计小例子
- Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数
- java统计一个文件的字符数,单词数,行数
- codevs1040 统计单词个数
- 第二学期第二周任务三之统计字符串中单词的个数
- 统计文本文件中单词出现频率(用java集合框架编写)
- 我工作这几年(五)-- 在代码中加入一些关键统计信息来实时监控程序的运行状态
- Hive mapreduce SQL实现原理——SQL最终分解为MR任务,而group by在MR里和单词统计MR没有区别了
- 一个实用的FSO-实时统计在线人数
- 集体智慧编程-单词统计
- 统计字符串中的单词个数
- MR案例:统计单词个数
- 统计单词数
- 八、手把手教MapReduce 单词统计案例编程
- 利用正则表达式统计单词个数
- 用C语言实现了对英文文章中单词频率的统计,得到出现最多的前十个!
- 使用单词树进行词频统计算法
- noip2011普及组——统计单词数