您的位置:首页 > 其它

STORM入门之(集成KafkaBolt)

2017-08-21 17:11 453 查看
根据第一篇文章:STORM入门之(Topology简易Demo)进行扩展集成Kafka

STORM入门之(Topology简易Demo)传送门:http://blog.csdn.net/yl3395017/article/details/77449275

新增Bolt:

package com.storm.bolt;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class KafkaMsgBolt extends BaseRichBolt{
private OutputCollector collector;
@Override
public void execute(Tuple arg0) {
String message = (String) arg0.getValue(0);
System.out.print("kafka message is:"+message);
collector.emit(new Values("forward",message));

}

@Override
public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
collector = arg2;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
arg0.declare(new Fields("key", "message"));

}

}


Topology新增静态方法,用于配置KafkaBolt

/**
* 构建KafkaBolt
* metadata.broker.list=10.2.4.13:9092,10.2.4.14:9092,10.2.4.12:9092
* bootstrap.servers=10.
4000
2.4.13:9092,10.2.4.14:9092,10.2.4.12:9092
* producer.type=async
* request.required.ack=1
* serializer.class=kafka.serializer.StringEncoder
* key.serializer=org.apache.kafka.common.serialization.StringSerializer
* value.serializer=org.apache.kafka.common.serialization.StringSerializer
* sendtopic=test
*/
private static void builtKafkaBolt(TopologyBuilder builder){
//kafka producer config
Properties prop = new Properties();
prop.put("metadata.broker.list", "10.2.4.13:9092,10.2.4.14:9092,10.2.4.12:9092");
prop.put("bootstrap.servers", "10.2.4.13:9092,10.2.4.14:9092,10.2.4.12:9092");
prop.put("producer.type","async");
prop.put("request.required.acks","1");
prop.put("serializer.class", "kafka.serializer.StringEncoder");
prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//kafkaBolt
KafkaBolt bolt = new KafkaBolt();
bolt.withTopicSelector(new DefaultTopicSelector("test_rce_yjd"));
bolt.withProducerProperties(prop);
bolt.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
//构建KafkaBolt
builder.setBolt("msgSentenceBolt", new KafkaMsgBolt()).shuffleGrouping("BoltA");
//转发kafka消息
builder.setBolt("forwardToKafka", bolt).shuffleGrouping("msgSentenceBolt");

}

修正Topology

public class Topology {
private static TopologyBuilder builder = null;
public static void main(String args[]) throws AuthorizationException, AlreadyAliveException, InvalidTopologyException {
builder = new TopologyBuilder();

builder.setSpout("SpoutA",new SpoutA(),1);
builder.setBolt("BoltA",new BoltA(),1).shuffleGrouping("SpoutA");
//构建KafkaBolt
builtKafkaBolt(builder);

Config conf = new Config();
conf.setDebug(true);

if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("soc", conf, builder.createTopology());
}
}

结果查看

KAFKA查看命令(根据实际情况自己修正路径):
/kafka/bin/kafka-console-consumer.sh --zookeeper 10.2.4.12:2181,10.2.4.13:2181,10.2.4.14:2181 --topic test_rce_yjd


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: