
Storm-Kafka Integration: Reading KafkaSpout Data from a Tuple in Storm 1.1.0

Problem:
A KafkaSpout pulls data from a Kafka topic, and a downstream bolt consumes that data from the KafkaSpout. Which tuple methods should the bolt use to extract the message emitted by the spout?
Creating the KafkaSpout:

/*
 * Build and return a KafkaSpout for the given source topic and consumer group id.
 */
public static KafkaSpout<String, String> init(String spout_topic, String zk_id) {
    KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
            .builder(bootstrap_address, spout_topic)
            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
            .setProp(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000)
            .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000)
            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName())
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName())
            .setOffsetCommitPeriodMs(10000)   // how often the spout commits offsets back to Kafka
            .setGroupId(zk_id)                // Kafka consumer group id
            .setMaxUncommittedOffsets(250)    // max offsets pending commit before polling pauses
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
            .build();
    KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
    return kafkaSpout;
}

Bolt logic (the execute method of the downstream bolt):
public void execute(Tuple tuple, BasicOutputCollector collector) {
    this.collector = collector;
    System.out.println("MC start reading data");
    String message = tuple.getStringByField("value");

    // e.g. 038,2018-03-21 14:51:47,17134906630,2018-03-21 14:53:03,V0330700,E02,036,null,E020005,01,V0350100
    String strs[] = message.split(",", -1);
    String userProv = strs[0];      // home province code
    String occurtime = strs[1];     // time the latest signaling was produced
    String deviceNumber = strs[2];  // phone number
    String eventArea = strs[4];     // city where the signaling occurred
    String eventProv = strs[6];     // province where the signaling occurred
    String userArea = strs[10];     // home city code

    // Filter out roam-in users; keep only roam-out users
    if (userProv.equals("038") && !eventProv.equals("038")) {
        String provDescGuishu = null;
        String areaDescGuishu = null;
        String provDescOccur = null;
        String areaDescOccur = null;
        String typeGuishu = "省内";   // "in-province"
        String community = null;
        String mrmcType = "漫出";     // "roam-out"
        String roamType = "国内";     // "domestic"
        String longitude = null;
        String latitude = null;

        String areaInfoGuishu = JedisUtil.get("zbarea|" + userArea); // info for the user's home city
        if (areaInfoGuishu != null && !areaInfoGuishu.equals("")) {
            String strs1[] = areaInfoGuishu.split(":");
            provDescGuishu = strs1[0];
            areaDescGuishu = strs1[1];
        }

        String areaInfo_occur = JedisUtil.get("zbarea|" + eventArea); // info for the city where the signaling occurred
        if (areaInfo_occur != null && !areaInfo_occur.equals("")) {
            String strs2[] = areaInfo_occur.split(":");
            provDescOccur = strs2[0];
            areaDescOccur = strs2[1];
        }

        // Format the target record:
        // phone | signaling time | home province | home city | home type | visited province | visited city | visited cell | roam in/out type | roam type | longitude | latitude
        String mrmc2merge = deviceNumber + "#" + occurtime + "#" + provDescGuishu + "#" + areaDescGuishu + "#" + typeGuishu + "#" + provDescOccur + "#" + areaDescOccur + "#" + community + "#" + mrmcType + "#" + roamType + "#" + longitude + "#" + latitude;
        System.out.println("MC data " + mrmc2merge);
        collector.emit(new Values(mrmc2merge));
    } else {
        System.out.println(deviceNumber + "****** roam-in user filtered out");
    }
}
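Because the bolt emits new Values(mrmc2merge) with a single value, it also has to declare exactly one output field. The original post omits that method; below is a minimal sketch, assuming the bolt extends BaseBasicBolt and using the illustrative field name "mrmc2merge" (not taken from the post), which the next bolt would then read via tuple.getStringByField("mrmc2merge").

import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;

// Hypothetical companion method inside the same bolt class:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // one output field per emitted Values(mrmc2merge); the field name is illustrative
    declarer.declare(new Fields("mrmc2merge"));
}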

builder.setSpout("s1mme_spout",init(s1mme_spout_topic,zk_s1mme_id),12);
builder.setBolt("s1mme_split",new S1mmeSplitBolt(),36).shuffleGrouping("s1mme_spout");
builder.setSpout("sgs_spout",init(sgs_spout_topic,zk_sgs_id),3);
builder.setBolt("sgs_split",new SgsSpiltBolt(),6).shuffleGrouping("sgs_spout");
builder.setSpout("mrmc_spout",init(mrmc_spout_topic,zk_mrmc_id),1);
builder.setBolt("mrmc_split",new MrmcSplitBolt(),3).shuffleGrouping("mrmc_spout");首先我们来看tuple具备的属性:
First, let's look at what a tuple received from the KafkaSpout carries. The main tuple methods:

tuple.getFields();          // the tuple's field list; for the KafkaSpout these are: topic, partition, offset, key, value
tuple.getValues();          // all field values as a list
tuple.getValue(i);          // value at index i of the field list
tuple.toString();           // the full tuple info: source component, message id, values, etc.
tuple.getMessageId();       // the tuple's message id
tuple.getSourceComponent(); // id of the component (spout/bolt) that emitted the tuple
tuple.getSourceStreamId();  // id of the stream the tuple was emitted on
tuple.getSourceTask();      // task id of the task that emitted the tuple



[Figure: console output of the code above]
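To make the field-based access concrete, here is a short, illustrative fragment of an execute method that reads each of the five KafkaSpout fields by name, plus one positional access for comparison. The partition and offset types follow Kafka's ConsumerRecord (int and long); treat the exact types as an assumption if you plug in a custom record translator.

// Illustrative only: reading every field emitted by the KafkaSpout, by name and by index.
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String topic      = tuple.getStringByField("topic");
    Integer partition = tuple.getIntegerByField("partition"); // int in ConsumerRecord
    Long offset       = tuple.getLongByField("offset");       // long in ConsumerRecord
    String key        = tuple.getStringByField("key");        // may be null if the record has no key
    String value      = tuple.getStringByField("value");

    // Equivalent positional access: "value" is index 4 in the field list topic, partition, offset, key, value
    String valueByIndex = tuple.getString(4);

    System.out.println(topic + "-" + partition + "@" + offset + " key=" + key + " value=" + value);
}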



Summary: values are pulled out of a tuple according to its field list and the corresponding index, in particular via methods such as getString/getStringByField and getLong/getLongByField. Storm 1.x differs from 0.x here: with the 1.x KafkaSpout you must use the field-based (or index-based) accessors shown above.
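For comparison, the older 0.x storm-kafka spout (when configured with StringScheme) emitted the whole Kafka message as a single field, so positional access was the common pattern. The following is a hedged sketch of that typical 0.x setup, not code from this post:

// Assumed 0.x-style read (old storm-kafka spout + StringScheme): the message is one field,
// conventionally named "str", usually read by position.
String message0x = tuple.getString(0);              // or tuple.getStringByField("str")

// 1.x storm-kafka-client KafkaSpout: read the named "value" field instead.
String message1x = tuple.getStringByField("value");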