【storm-kafka】storm和kafka结合处理流式数据
2016-08-31 09:27
375 查看
首先简单描述下storm
Storm是一个免费开源、分布式、高容错的实时计算系统。Storm令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求。Storm经常用于在实时分析、在线机器学习、持续计算、分布式远程调用和ETL等领域。Storm的部署管理非常简单,而且,在同类的流式计算工具,Storm的性能也是非常出众的。关于kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统。最近项目中有个需求是需要从kafka中将订阅到的流式数据实时持久化到elasticsearch和accumulo中。这里主要记录的是关于kafka和storm的整合。关于zookeeper,kafka,storm,elasticsearch等的安装可以网上找。
首先就是配置maven
<dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.2</version> <!-- 由于storm环境中有该jar,所以不用pack到最终的task.jar中 --> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <!-- kafka目前已经有2.10了,但是我用了,任务执行报错,目前只能用kafka_2.9.2,我kafka服务端也是用最新的2.10版本 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.2.2</version> <!-- 排除以下jar,由于storm服务端有log4j,避免冲突报错--> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j<artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>1.4.4</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j<artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <encoding>utf-8</encoding> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest><manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build>
由于storm-kafka已经实现了spout,我们直接用就可以。
Bolt代码
public class FilterBolt extends BaseRichBolt{ private OutputCollector collector; /** * 初始化工作 * / public void prepare(Map map, TopologyContext context, OutputCollector collector){ this.collector = collector; } /** * 执行逻辑,目的是过滤无用的字符串 */ public void execute(Tuple input){ String str = input.getString(0); if(StringUtils.isNotBlank(str)){ String [] lines = str.split("\n"); for(String line : lines){ if(StringUtils.isBlank(line) || line.charAt(0) == '#'){ continue; } //发射到下一个bolt collector.emit(new Values(line)); } //汇报给storm,该tuple执行成功 collector.ack(input); }else{ //执行失败 collector.fail(input); } } /** * 申明传入到一个Bolt的字段名称 */ public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(new Fields("line")); } }
下面是转换解析string,生成json,并将json保存到elasticsearch中。
public class TransferBolt extends BaseRichBolt{ private Logger LOG = LoggerFactory.getLogger(TransferBolt.class); private OutputCollector collector; public void prepare(Map map, TopologyContext context, OutputCollector collector){ this.collector = collector; } public void execute(Tuple input){ String line = input.getString(0); JSONObject json = JSONObject.toJson(line); BulkRequest bulkRequest = new BulkRequest(); IndexRequest indexRequest = new IndexRequest("test","element",json.getString("id")).source(json.getJSONObject("source").toString()); bulkRequest.add(indexRequest); BulkResponse response = client.bulk(bulkRequest).actionGet(); client.admin().indices().prepareRefresh("test").execute().actionGet(); } }
topology
public class KafkaTopology{ public static void main(String[] args) throws Exception{ String zks = PropertiesUtils.getString(KafkaProperties.ZK_HOSTS); String topic = PropertiesUtils.getString(KafkaProperties.TOPIC); String zkRoot = PropertiesUtils.getString(KafkaProperties.ZK_ROOT); String id = PropertiesUtils.getString(KafkaProperties.STORM_ID); BrokerHosts brokerHosts = new ZkHosts(zks); SpoutConfig spoutConfig = new SpoutConfig(brokerHosts,topic,zkRoot,id); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); spoutConfig.zkServers = Arrays.asList(PropertiesUtil.getString(KafkaProperties.ZK_SERVERS).split(",")); spoutConfig.zkPort = PropertiesUtil.getInt(KafkaProperties.ZK_PORT); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader",new KafkaSpout(spoutConfig),1); builder.setBolt("filter-bolt",new FilterBolt(),1).shuffleGrouping("kafka-reader"); builder.setBolt("input-line",new TransferBolt(),1).shuffleGrouping("reader-input"); Config config = new Config(); String name = KafkaTopology.class.getSimpleName(); config.setNumWorkers(PropertiesUtil.getInt(KafkaProperties.NUM_WORKERS)); StormSubmitter.submitTopologyWithProgressBar(name,config,builder.createTopology()); } }
相关文章推荐
- 大数据处理 Hadoop、HBase、ElasticSearch、Storm、Kafka、Spark
- 实时数据处理插件开发flume+kafka+storm:flume
- 实时数据处理环境搭建flume+kafka+storm:1.zookeeper 安装配置
- 实时数据处理环境搭建flume+kafka+storm:3.kafka安装
- Flume+Kafka+Storm+Redis构建大数据实时处理系统 - 大数据
- 如何在E-MapReduce上提交Storm作业处理Kafka数据
- 流式数据分析模型kafka+storm
- 实时数据处理环境搭建flume+kafka+storm:4.storm安装配置
- Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示
- Flume+Kafka+Storm+Redis构建大数据实时处理系统 - 大数据
- Spark streaming + Kafka 流式数据处理,结果存储至MongoDB、Solr、Neo4j(自用)
- storm kafka出现错误或fail后,是否继续处理数据?
- 实时数据处理环境搭建flume+kafka+storm:2.flume 安装
- 大数据处理之流式计算 storm安装
- 大数据处理 Hadoop、HBase、ElasticSearch、Storm、Kafka、Spark
- Storm的“翻版”:LinkedIn开源实时数据处理系统Samza
- 使用 Twitter Storm 处理实时的大数据
- 使用 Twitter Storm 处理实时的大数据
- 使用 Twitter Storm 处理实时的大数据
- 流式数据处理的计算模型