storm1.0.1从kafka中取值
2016-12-19 09:45
309 查看
maven引用:
storm拓扑主程序:
spout:
<!-- Version properties shared by the dependency declarations below. -->
<properties>
    <stormversion>1.0.1</stormversion>
    <kafkaversion>0.9.0.1</kafkaversion>
</properties>
<!-- Storm dependencies: core runtime plus the storm-kafka spout integration. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${stormversion}</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>${stormversion}</version>
</dependency>
<!-- Kafka client (Scala 2.11 build) required by storm-kafka. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafkaversion}</version>
</dependency>
storm拓扑主程序:
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { LOG.info("LogsKafkaTopo begin start,init config."); // 加载spring SpringContainer.getSpringContext().start(); //zk IP和端口 BrokerHosts brokerHosts = new ZkHosts(DataCenterConfig.getBrokerZkStr()); //kafka topic名称、zookeeper、spoutId SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, DataCenterConfig.getPvuvTopic(), DataCenterConfig.getZkRoot(), "logsScheme"); //启动之后不重新取kafka的数据 spoutConfig.ignoreZkOffsets = false; spoutConfig.scheme = new SchemeAsMultiScheme((Scheme) new LogsScheme()); //从最后开始取 spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("logsScheme", new KafkaSpout(spoutConfig)); builder.setBolt("handleValidateLogsBolt", new HandleValidateLogsBolt(), 4) .shuffleGrouping("logsScheme"); builder.setBolt("handleCorrect", new HandleQueueLogsBolt(), 4) .fieldsGrouping("handleValidateLogsBolt", new Fields("correctlogs")); Config conf = new Config(); conf.setNumWorkers(1); conf.setNumAckers(0); //conf.setDebug(true); conf.setDebug(false); if (args != null && args.length > 0) { // 服务器运行 LOG.info("LogsKafkaTopo submit topology by server."); StormSubmitter.submitTopology(DataCenterConfig.getTopoName(), conf, builder.createTopology()); }else{ // 本地运行 LOG.info("LogsKafkaTopo submit topology by local cluster."); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(DataCenterConfig.getTopoName(), conf, builder.createTopology()); } }
/**
 * Static holder for the topology's externalized configuration values
 * (populated at startup, e.g. via the Spring container).
 *
 * <p>The original listing declared only the private fields, yet the topology
 * main calls static accessors such as {@code getBrokerZkStr()} — without them
 * the code cannot compile, so getters/setters are added here.
 */
public class DataCenterConfig {

    /** Elasticsearch host IP. */
    private static String esIp;
    /** Elasticsearch port. */
    private static String esPort;
    /** Kafka topic carrying the PV/UV logs. */
    private static String pvuvTopic;
    /** ZooKeeper root path under which the Kafka spout stores its offsets. */
    private static String zkRoot;
    /** ZooKeeper connect string (host:port list) for the Kafka brokers. */
    private static String brokerZkStr;
    /** Name used when submitting the Storm topology. */
    private static String topoName;

    public static String getEsIp() { return esIp; }

    public static void setEsIp(String esIp) { DataCenterConfig.esIp = esIp; }

    public static String getEsPort() { return esPort; }

    public static void setEsPort(String esPort) { DataCenterConfig.esPort = esPort; }

    public static String getPvuvTopic() { return pvuvTopic; }

    public static void setPvuvTopic(String pvuvTopic) { DataCenterConfig.pvuvTopic = pvuvTopic; }

    public static String getZkRoot() { return zkRoot; }

    public static void setZkRoot(String zkRoot) { DataCenterConfig.zkRoot = zkRoot; }

    public static String getBrokerZkStr() { return brokerZkStr; }

    public static void setBrokerZkStr(String brokerZkStr) { DataCenterConfig.brokerZkStr = brokerZkStr; }

    public static String getTopoName() { return topoName; }

    public static void setTopoName(String topoName) { DataCenterConfig.topoName = topoName; }
}
spout:
public class LogsScheme implements Scheme { /** 日志 **/ private static final Logger LOGGER = LoggerFactory.getLogger(LogsScheme.class); public List<Object> deserialize(ByteBuffer ser) { String msg = getStringByByteBuffer(ser); return new Values(msg); } public Fields getOutputFields() { return new Fields("pvanduvlogs"); } /** * * ByteBuffer转换成字符串 * * @param buffer * @return */ private String getStringByByteBuffer(ByteBuffer buffer) { Charset charset = null; CharsetDecoder decoder = null; CharBuffer charBuffer = null; try { charset = Charset.forName("UTF-8"); decoder = charset.newDecoder(); charBuffer = decoder.decode(buffer.asReadOnlyBuffer()); return charBuffer.toString(); } catch (Exception e) { LoggerUtils.error(e, LOGGER, "ByteBuffer转换成字符串失败,buffer={0}", buffer); return ""; } } }
相关文章推荐
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- storm1.0.1从kafka中取值
- 大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程
- Kafka+Storm+HBase项目Demo(7)--Trident使用