大数据学习笔记-------------------(17_3)
2016-10-14 10:27
218 查看
17.3 消费者群例子
消费群是多线程或多机器接收KafkaTopic。17.3.1 消费者群
Ø 消费者可以通过使用相同的“group.id”来加入组。Ø 组的最大并行数目是组中消费者数<=分区数。
Ø Kafka将Topic分区分配给组中的消费者,以便每个分区都由组中的一个消费者使用。
Ø Kafka保证消息只能被组中的一个消费者读取。
Ø 消费者可以按照消息存储在日志中的顺序查看消息。
17.3.2消费者重现平衡
添加更多进程/线程将导致Kafka重新平衡。如果任何消费者或broker无法向ZooKeeper发送心跳,则可以通过Kafka集群重新配置。在重新平衡期间,Kafka将分配可用分区到可用线程,可能将分区移动到另一个进程:import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class ConsumerGroup { public static void main(String[] args) throws Exception { if(args.length < 2) { System.out.println("Usage: consumer <topic> <groupname>"); return; } String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
编译:应用程序用下面的命令进行编译。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” ConsumerGroup.java
执行:用下面的命令进行执行。
java -cp“/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup<topic-name> my-group java -cp"/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.ConsumerGroup <topic-name> my-group
输入:打开生产者CLI,发送像下面的信息
Test consumer group 01 Test consumer group 02
第一个进程输出:
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumergroup 01
第二个进程输出:
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumergroup 02
相关文章推荐
- IOS 学习笔记17 iPhone开发之使用NSUserDefaults存储数据
- 大数据学习笔记-------------------(17_2)
- Cocos2d-X 学习笔记 17 Cocos2dx Jason数据解析
- Androidx学习笔记(17)-- 数据存储之XML
- 大数据学习笔记-------------------(17_1)
- 面向对象的程序设计学习笔记-17-静态成员数据
- 大数据学习笔记-------------------(17_1)
- 大数据学习笔记-------------------(17_2)
- JavaWeb学习笔记-mybatis-17-数据模型分析
- 大数据学习笔记-------------------(17_3)
- 用data.DataReader读取股价数据并分析--python学习笔记17
- android菜鸟学习笔记17----Android数据存储(一)文件读写
- C++学习笔记17,构造函数体内初始化数据成员与构造函数初始化器的区别(一)
- ASP.NET 3.5核心编程学习笔记(17):基于数据源的数据绑定
- java学习笔记(二) ----基本数据类型应用
- MyGeneration学习笔记(8) :dOOdad提供的数据绑定、特殊函数和事务处理
- .net学习笔记数据绑定20060524
- Java开发学习笔记之三:HTTP客户请求数据格式
- MyGeneration学习笔记(8) :dOOdad提供的数据绑定、特殊函数和事务处理
- Chap 4 学习笔记-使用C#存储变量数据