(3) storm-kafka Source Walkthrough: How to Build a KafkaSpout
2014-11-08 14:06
The previous section covered the config-related classes. This section explains what those parameters actually mean and where they are stored in ZooKeeper. Quite a few people in the QQ group did not know which parameters to pass to construct a KafkaSpout; in fact, as long as the parameters are passed correctly, it just works.
Let's look at SpoutConfig's constructor:
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
    super(hosts, topic);
    this.zkRoot = zkRoot;
    this.id = id;
}
It needs a BrokerHosts; here is the code:
public class ZkHosts implements BrokerHosts {
    private static final String DEFAULT_ZK_PATH = "/brokers";

    public String brokerZkStr = null;
    public String brokerZkPath = null; // e.g., /kafka/brokers
    public int refreshFreqSecs = 60;

    public ZkHosts(String brokerZkStr, String brokerZkPath) {
        this.brokerZkStr = brokerZkStr;
        this.brokerZkPath = brokerZkPath;
    }

    public ZkHosts(String brokerZkStr) {
        this(brokerZkStr, DEFAULT_ZK_PATH);
    }
}
It needs a brokerZkStr, which is simply the ZooKeeper host list, with multiple hosts separated by commas — ZooKeeper splits the connect string on commas when parsing it. Here is ZooKeeper's parsing code:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider,
            sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
The actual parsing is done by ConnectStringParser; its code shows how:
public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    int off = connectString.indexOf('/');
    if (off >= 0) {
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }

    String hostsList[] = connectString.split(",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        int pidx = host.lastIndexOf(':');
        if (pidx >= 0) {
            // otherwise : is at the end of the string, ignore
            if (pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            host = host.substring(0, pidx);
        }
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}
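To see what that parsing yields in practice, here is a minimal re-implementation of the same splitting logic (the class name `ConnectStringDemo` is ours, not part of the ZooKeeper codebase): strip an optional chroot suffix, split the rest on commas, and default the port to 2181 when a host has none.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of ZooKeeper's connect-string splitting, for illustration only.
public class ConnectStringDemo {
    static final int DEFAULT_PORT = 2181;

    // Returns "host:port" entries parsed the same way ConnectStringParser does.
    static List<String> parse(String connectString) {
        int off = connectString.indexOf('/');
        if (off >= 0) {
            connectString = connectString.substring(0, off); // drop the chroot suffix
        }
        List<String> out = new ArrayList<>();
        for (String host : connectString.split(",")) {
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0 && pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            if (pidx >= 0) {
                host = host.substring(0, pidx);
            }
            out.add(host + ":" + port);
        }
        return out;
    }

    public static void main(String[] args) {
        // A chrooted string: the "/kafka" part is stripped before host splitting.
        System.out.println(parse("10.1.110.24:2181,10.1.110.22/kafka"));
        // → [10.1.110.24:2181, 10.1.110.22:2181]
    }
}
```

Note that a host with no explicit port silently gets the default, which is why a brokerZkStr like "zk1,zk2,zk3" works even without ports.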
OK, that's enough on that point.
Besides the required brokerZkStr just mentioned, there is also the zkPath parameter. You can set it yourself; it defaults to "/brokers".
SpoutConfig also has a zkRoot, which is where the consumer side stores its consumption (offset) state. Here is an example:
String topic = "test";
String zkRoot = "/kafkastorm";
String spoutId = "id"; // the consumed offsets are stored under /kafkastorm/id, so id acts like a consumer group

BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181"); // uses the default /brokers path

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // scheme is covered in the next section

/* Only needed in local mode, when you still want the read state recorded:
spoutConfig.zkServers = new ArrayList<String>() {{
    add("10.118.136.107");
}};
spoutConfig.zkPort = 2181; */

spoutConfig.forceFromStart = true;
spoutConfig.startOffsetTime = -1; // start consuming from the latest offset
spoutConfig.metricsTimeBucketSizeInSecs = 6;

builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), 1);
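To make the zkRoot/id relationship concrete, here is a tiny sketch (our own helper, not storm-kafka code) of the shape of the ZooKeeper node the spout commits offsets to: zkRoot, then the spout id, then a per-partition suffix. The exact suffix format ("partition_N") is an assumption about storm-kafka's PartitionManager, not something guaranteed by this article.

```java
// Sketch of how the spout's committed-offset path is derived (names are ours).
public class CommitPathDemo {
    // zkRoot + "/" + id + "/" + per-partition id; the "partition_" prefix is an
    // assumption about storm-kafka's internal naming, used here for illustration.
    static String committedPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        System.out.println(committedPath("/kafkastorm", "id", 0));
        // → /kafkastorm/id/partition_0
    }
}
```

This is why two topologies that should share consumption progress must use the same zkRoot and id, and two that must not interfere need different ids.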
By default, the offsets will be stored in the same Zookeeper cluster that Storm uses. You can override
this via your spout config like this:
spoutConfig.zkServers = ImmutableList.of("otherserver.com");
spoutConfig.zkPort = 2191;
This successfully creates a KafkaSpout. Once the topology is running, you can check the stored state on the ZooKeeper master:
./bin/zkCli.sh -server 10.1.110.24:2181
As for StaticHosts, here is the official explanation:
StaticHosts
This is an alternative implementation where broker -> partition information is static. In order to construct an instance of this class you need to first construct an instance of GlobalPartitionInformation.

Broker brokerForPartition0 = new Broker("localhost"); // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092); // localhost:9092, but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092"); // localhost:9092 specified as one string

GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2); // mapping from partition 2 to brokerForPartition2

StaticHosts hosts = new StaticHosts(partitionInfo);
In my view, StaticHosts requires the developer to already know the mapping between partitions and brokers and to wire it up correctly. In storm-kafka 0.9.0.1, by contrast, you did not have to specify it: you passed in the zkServer list and the total partition count, and KafkaUtil used two nested for loops (over all brokers and all partitions) to spin up a temporary consumer for each pair and try consuming; if data came back, it recorded the partitionId-to-brokerHost mapping in a Map. It is easy to see why 0.9.3-rc1 changed this. What happens if a broker does not host a given partition? I have not tested it; if you have, please leave a comment describing the outcome.
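The brute-force discovery described above can be sketched roughly as follows. All names here are ours (the real code lived in storm-kafka's KafkaUtil and used an actual Kafka consumer); the probe is stubbed out as a predicate so the shape of the two nested loops is visible.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Sketch of the 0.9.0.1-era partition discovery: try every (broker, partition)
// pair, and record the first broker from which a probe consumer sees data.
public class PartitionDiscoveryDemo {
    static Map<Integer, String> discover(List<String> brokers, int partitionCount,
                                         BiPredicate<String, Integer> hasData) {
        Map<Integer, String> partitionToBroker = new HashMap<>();
        for (String broker : brokers) {                 // outer loop: all brokers
            for (int p = 0; p < partitionCount; p++) {  // inner loop: all partitions
                if (!partitionToBroker.containsKey(p) && hasData.test(broker, p)) {
                    partitionToBroker.put(p, broker);
                }
            }
        }
        return partitionToBroker;
    }

    public static void main(String[] args) {
        // Stub standing in for "create a temporary consumer and try to fetch":
        // pretend partition 0 lives on kafka1 and partition 1 on kafka2.
        BiPredicate<String, Integer> probe = (b, p) ->
                (b.equals("kafka1") && p == 0) || (b.equals("kafka2") && p == 1);
        System.out.println(discover(List.of("kafka1", "kafka2"), 2, probe));
    }
}
```

The sketch also makes the cost obvious: discovery is O(brokers × partitions) probe connections, which explains why later versions read the mapping from ZooKeeper instead.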
Reference
http://www.cnblogs.com/fxjwind/p/3808346.html