
Storm 1.0.1: reading data from Kafka

2016-12-19 09:45
Maven dependencies:

<properties>
    <stormversion>1.0.1</stormversion>
    <kafkaversion>0.9.0.1</kafkaversion>
</properties>

<!-- Storm dependencies -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${stormversion}</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>${stormversion}</version>
</dependency>
<!-- Kafka client dependency -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafkaversion}</version>
</dependency>


Storm topology main program:

public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {

    LOG.info("LogsKafkaTopo begin start,init config.");
    // Start the Spring context
    SpringContainer.getSpringContext().start();
    // ZooKeeper hosts (IP:port list) used by the Kafka brokers
    BrokerHosts brokerHosts = new ZkHosts(DataCenterConfig.getBrokerZkStr());
    // Kafka topic name, ZooKeeper root path and spout id
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, DataCenterConfig.getPvuvTopic(),
            DataCenterConfig.getZkRoot(), "logsScheme");
    // Resume from the offsets stored in ZooKeeper instead of re-reading the topic after a restart
    spoutConfig.ignoreZkOffsets = false;
    spoutConfig.scheme = new SchemeAsMultiScheme(new LogsScheme());
    // When no stored offset exists yet, start consuming from the latest offset
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("logsScheme", new KafkaSpout(spoutConfig));
    builder.setBolt("handleValidateLogsBolt", new HandleValidateLogsBolt(), 4)
            .shuffleGrouping("logsScheme");
    builder.setBolt("handleCorrect", new HandleQueueLogsBolt(), 4)
            .fieldsGrouping("handleValidateLogsBolt", new Fields("correctlogs"));
    Config conf = new Config();
    conf.setNumWorkers(1);
    conf.setNumAckers(0);
    //conf.setDebug(true);
    conf.setDebug(false);
    if (args != null && args.length > 0) {
        // Running on a cluster
        LOG.info("LogsKafkaTopo submit topology by server.");
        StormSubmitter.submitTopology(DataCenterConfig.getTopoName(), conf, builder.createTopology());
    } else {
        // Running locally
        LOG.info("LogsKafkaTopo submit topology by local cluster.");
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(DataCenterConfig.getTopoName(), conf,
                builder.createTopology());
    }
}
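A note on the offset settings above (standard storm-kafka behavior, not spelled out in the original post): because ignoreZkOffsets is false, startOffsetTime only applies when ZooKeeper holds no offset for this spout yet. If one instead wanted the spout to re-read the topic from the beginning on every start, a typical configuration would be:

    // Illustrative alternative (not used in this topology):
    // skip any offsets stored in ZooKeeper and start from the earliest available offset
    spoutConfig.ignoreZkOffsets = true;
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();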

public class DataCenterConfig {

    /** ES connection IP address **/
    private static String esIp;

    /** ES port **/
    private static String esPort;

    /** pv/uv subscription topic **/
    private static String pvuvTopic;

    /** ZooKeeper root node **/
    private static String zkRoot;

    /** ZooKeeper address and port list **/
    private static String brokerZkStr;

    /** Storm topology name **/
    private static String topoName;

    // Static accessors used by the topology's main method;
    // the fields themselves are presumably populated elsewhere (e.g. via Spring).
    public static String getBrokerZkStr() { return brokerZkStr; }
    public static String getPvuvTopic() { return pvuvTopic; }
    public static String getZkRoot() { return zkRoot; }
    public static String getTopoName() { return topoName; }
}
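The post does not show how these configuration fields are populated. Given the SpringContainer.getSpringContext().start() call in the main method, they are presumably injected by Spring when the context starts; a purely illustrative sketch of what such setters might look like inside DataCenterConfig (names and wiring are assumptions, not from the source):

    // Hypothetical Spring-style setters; the real wiring is not shown in the original post.
    public void setBrokerZkStr(String brokerZkStr) { DataCenterConfig.brokerZkStr = brokerZkStr; }
    public void setPvuvTopic(String pvuvTopic) { DataCenterConfig.pvuvTopic = pvuvTopic; }
    public void setZkRoot(String zkRoot) { DataCenterConfig.zkRoot = zkRoot; }
    public void setTopoName(String topoName) { DataCenterConfig.topoName = topoName; }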


Spout scheme (deserializing the Kafka messages):

public class LogsScheme implements Scheme {
    /** Logger **/
    private static final Logger LOGGER = LoggerFactory.getLogger(LogsScheme.class);

    public List<Object> deserialize(ByteBuffer ser) {
        String msg = getStringByByteBuffer(ser);
        return new Values(msg);
    }

    public Fields getOutputFields() {
        return new Fields("pvanduvlogs");
    }

    /**
     * Convert a ByteBuffer to a String.
     *
     * @param buffer the raw Kafka message payload
     * @return the decoded UTF-8 string, or an empty string if decoding fails
     */
    private String getStringByByteBuffer(ByteBuffer buffer) {
        Charset charset = null;
        CharsetDecoder decoder = null;
        CharBuffer charBuffer = null;
        try {
            charset = Charset.forName("UTF-8");
            decoder = charset.newDecoder();
            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        } catch (Exception e) {
            LoggerUtils.error(e, LOGGER, "Failed to convert ByteBuffer to String, buffer={0}", buffer);
            return "";
        }
    }
}
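The two bolts wired into the topology (HandleValidateLogsBolt and HandleQueueLogsBolt) are not shown in the post. As a rough illustration of how a bolt could consume the single "pvanduvlogs" field emitted by LogsScheme and pass records downstream on the "correctlogs" field used by the fieldsGrouping, here is a minimal sketch; the validation logic is an assumption, not the author's actual code:

    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    // Illustrative only: the real HandleValidateLogsBolt is not included in the original post.
    public class HandleValidateLogsBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // "pvanduvlogs" is the single field declared by LogsScheme
            String log = input.getStringByField("pvanduvlogs");
            // Placeholder validation; the actual rules are not shown in the source
            if (log != null && !log.isEmpty()) {
                // Emit on the "correctlogs" field so the downstream fieldsGrouping can hash on it
                collector.emit(new Values(log));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("correctlogs"));
        }
    }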