
[storm-kafka] Combining Storm and Kafka to process streaming data

2016-08-31 09:27

First, a brief description of Storm.

Storm is a free, open-source, distributed, highly fault-tolerant real-time computation system. It makes continuous stream computation straightforward, filling the real-time gap that Hadoop's batch processing cannot cover. Storm is commonly used for real-time analytics, online machine learning, continuous computation, distributed RPC, ETL, and similar workloads. It is simple to deploy and manage, and its performance stands out among comparable stream-processing tools.

About Kafka

Kafka is a high-throughput, distributed publish-subscribe messaging system.

A recent project required subscribing to streaming data from Kafka and persisting it in real time to Elasticsearch and Accumulo. This post focuses on integrating Kafka with Storm; instructions for installing ZooKeeper, Kafka, Storm, Elasticsearch, and so on are easy to find online.

The first step is configuring Maven.

<dependencies>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <!-- The Storm cluster already provides this jar, so it is not packed into the final task jar -->
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.3.2</version>
  </dependency>
  <!-- A kafka_2.10 client build is available, but the task failed with it, so kafka_2.9.2 is used here;
       the Kafka brokers themselves run the newer 2.10 build -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.2</version>
    <!-- Exclude the following jars; the Storm cluster already ships log4j, so this avoids conflicts at runtime -->
    <exclusions>
      <exclusion>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
      </exclusion>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>1.4.4</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <!-- JSON library used by TransferBolt below (assumption: fastjson, which matches the JSONObject API used there) -->
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.12</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <encoding>utf-8</encoding>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest></manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <encoding>utf-8</encoding>
      </configuration>
    </plugin>
  </plugins>
</build>


Since storm-kafka already provides a spout implementation (KafkaSpout), we can use it directly.

Bolt code

public class FilterBolt extends BaseRichBolt {

    private OutputCollector collector;

    /**
     * Initialization: keep a reference to the output collector.
     */
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Processing logic: drop useless lines (blank lines and '#' comment lines).
     */
    public void execute(Tuple input) {
        String str = input.getString(0);
        if (StringUtils.isNotBlank(str)) {
            String[] lines = str.split("\n");
            for (String line : lines) {
                if (StringUtils.isBlank(line) || line.charAt(0) == '#') {
                    continue;
                }
                // Emit to the next bolt, anchored to the input tuple so a downstream failure is replayed
                collector.emit(input, new Values(line));
            }
            // Report to Storm that this tuple was processed successfully
            collector.ack(input);
        } else {
            // Processing failed
            collector.fail(input);
        }
    }

    /**
     * Declare the name of the field emitted by this bolt.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
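
To make the filtering concrete, here is a minimal, standalone sketch of what FilterBolt does to a single Kafka message. The message content is a hypothetical example; the only assumption taken from the code above is that a message is a newline-separated batch in which blank lines and lines starting with '#' are noise.

import org.apache.commons.lang3.StringUtils;

public class FilterExample {
    public static void main(String[] args) {
        // Hypothetical raw message pulled from Kafka: two records plus a comment line and a blank line
        String message = "# generated 2016-08-31\nrecord-1\n\nrecord-2";
        for (String line : message.split("\n")) {
            if (StringUtils.isBlank(line) || line.charAt(0) == '#') {
                continue; // same filter rule as FilterBolt.execute
            }
            System.out.println(line); // prints record-1, then record-2
        }
    }
}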


Next comes the bolt that parses the string into JSON and saves the JSON to Elasticsearch.

public class TransferBolt extends BaseRichBolt {

    private static final Logger LOG = LoggerFactory.getLogger(TransferBolt.class);

    private OutputCollector collector;
    // Elasticsearch client, created in prepare() so the bolt itself stays serializable
    private Client client;

    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Assumption: a TransportClient pointing at the Elasticsearch cluster; host and port are placeholders
        this.client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress("es-host", 9300));
    }

    public void execute(Tuple input) {
        String line = input.getString(0);
        // Parse the line as JSON (assumption: fastjson); it must carry an "id" and a nested "source" object
        JSONObject json = JSONObject.parseObject(line);
        BulkRequest bulkRequest = new BulkRequest();
        IndexRequest indexRequest = new IndexRequest("test", "element", json.getString("id"))
                .source(json.getJSONObject("source").toString());
        bulkRequest.add(indexRequest);
        BulkResponse response = client.bulk(bulkRequest).actionGet();
        if (response.hasFailures()) {
            LOG.error(response.buildFailureMessage());
        }
        client.admin().indices().prepareRefresh("test").execute().actionGet();
        // Acknowledge so the tuple anchored in FilterBolt is marked complete
        collector.ack(input);
    }

    // This is the last bolt in the topology, so it declares no output fields
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
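
The bolt assumes each line is a JSON document carrying an "id" (used as the Elasticsearch document id) and a nested "source" object (used as the document body). Below is a hypothetical line and how the two fields are pulled out; every field name other than "id" and "source" is made up for illustration.

import com.alibaba.fastjson.JSONObject;

public class TransferExample {
    public static void main(String[] args) {
        // Hypothetical input line; only the "id" and "source" keys are required by TransferBolt
        String line = "{\"id\":\"doc-1\",\"source\":{\"user\":\"alice\",\"count\":3}}";
        JSONObject json = JSONObject.parseObject(line);
        System.out.println(json.getString("id"));                    // "doc-1" -> Elasticsearch document id
        System.out.println(json.getJSONObject("source").toString()); // the nested object -> document body
    }
}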


Topology code

public class KafkaTopology {

    public static void main(String[] args) throws Exception {
        // ZooKeeper and topic settings are read from a properties file via the project's helpers
        String zks = PropertiesUtil.getString(KafkaProperties.ZK_HOSTS);
        String topic = PropertiesUtil.getString(KafkaProperties.TOPIC);
        String zkRoot = PropertiesUtil.getString(KafkaProperties.ZK_ROOT);
        String id = PropertiesUtil.getString(KafkaProperties.STORM_ID);
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.zkServers = Arrays.asList(PropertiesUtil.getString(KafkaProperties.ZK_SERVERS).split(","));
        spoutConfig.zkPort = PropertiesUtil.getInt(KafkaProperties.ZK_PORT);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("filter-bolt", new FilterBolt(), 1).shuffleGrouping("kafka-reader");
        builder.setBolt("input-line", new TransferBolt(), 1).shuffleGrouping("filter-bolt");
        Config config = new Config();
        String name = KafkaTopology.class.getSimpleName();
        config.setNumWorkers(PropertiesUtil.getInt(KafkaProperties.NUM_WORKERS));
        StormSubmitter.submitTopologyWithProgressBar(name, config, builder.createTopology());
    }
}
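
PropertiesUtil and KafkaProperties are small project helpers that are not shown in the original post. Below is a minimal sketch of what they might look like, assuming the settings live in a kafka.properties file on the classpath; all key names and the example values in the comments are illustrative, not taken from the original.

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Keys looked up by KafkaTopology (the constant values here are assumptions)
class KafkaProperties {
    static final String ZK_HOSTS = "zk.hosts";       // e.g. "zk1:2181,zk2:2181"
    static final String TOPIC = "kafka.topic";
    static final String ZK_ROOT = "zk.root";          // ZooKeeper path where the spout stores offsets
    static final String STORM_ID = "storm.id";        // consumer id, unique per topology
    static final String ZK_SERVERS = "zk.servers";
    static final String ZK_PORT = "zk.port";
    static final String NUM_WORKERS = "num.workers";
}

// Minimal properties loader; a real project would add caching and better error handling
class PropertiesUtil {
    private static final Properties PROPS = new Properties();
    static {
        try (InputStream in = PropertiesUtil.class.getResourceAsStream("/kafka.properties")) {
            PROPS.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    static String getString(String key) {
        return PROPS.getProperty(key);
    }
    static int getInt(String key) {
        return Integer.parseInt(PROPS.getProperty(key));
    }
}

For local testing before submitting to a real cluster, the same TopologyBuilder can be run with org.apache.storm.LocalCluster in place of StormSubmitter.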