您的位置:首页 > 大数据

大数据学习笔记-------------------(17_3)

2016-10-14 10:27 218 查看

17.3 消费者群例子

消费群是多线程或多机器接收KafkaTopic。

17.3.1 消费者群

Ø 消费者可以通过使用相同的“group.id”来加入组。

Ø 组的最大并行数目是组中消费者数<=分区数。

Ø Kafka将Topic分区分配给组中的消费者,以便每个分区都由组中的一个消费者使用。

Ø Kafka保证消息只能被组中的一个消费者读取。

Ø 消费者可以按照消息存储在日志中的顺序查看消息。

17.3.2消费者重现平衡

添加更多进程/线程将导致Kafka重新平衡。如果任何消费者或broker无法向ZooKeeper发送心跳,则可以通过Kafka集群重新配置。在重新平衡期间,Kafka将分配可用分区到可用线程,可能将分区移动到另一个进程:

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2)
{
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}


编译:应用程序用下面的命令进行编译。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” ConsumerGroup.java


执行:用下面的命令进行执行。

java -cp“/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup<topic-name> my-group
java -cp"/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.ConsumerGroup <topic-name> my-group


输入:打开生产者CLI,发送像下面的信息

Test consumer group 01
Test consumer group 02


第一个进程输出:

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumergroup 01


第二个进程输出:

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumergroup 02
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: