大数据学习笔记-------------------(17_2)
2017-04-22 00:00
281 查看
17.2 简单消费者实例
已经创建一个producer给Kafka集群发送信息。现在需要创建一个consumer来接收来自集群中的信息。KafkaConsumer API习惯用语接收来自Kafka集群的信息。KafkaConsumer类结构定义如下:publicKafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
config:返回消费者配置信息的map
KafkaConsumer类的重要方法列表如下:
![](https://static.oschina.net/uploads/img/201704/22170410_QfC1.jpg)
17.2.1 consumerRecord API
ConsumerRecordAPI用于从Kafka集群接收记录。API由topic名称、分区号(从中接收记录)和指向Kafka分区中的记录的偏移量组成。ConsumerRecord类用于创建具有特定topic名称,分区计数、<key,value>对的使用者记录。它有以下签名:publicConsumerRecord(string topic,int partition, long offset,K key, V value)
topic:从Kafka集群接收消费者记录的主题名称
partition:topic的分区数
key:记录的key,如果没有key存在将返回null
value:记录内容
ConsumerRecords API充当消费者记录的容器。API用于保存特定主题每个分区的ConsumerRecord列表。它的构造器定义如下:
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<ConsumerRecord<K,V>>>records)
TopicPartition:返回特定topic分区map
Records:返回ConsumerRecord列表
ConsumerRecord类的定义方法列表:
![](https://static.oschina.net/uploads/img/201704/22170410_TfwG.jpg)
17.2.3 配置设置
Consumer客户端API主要配置如下所示:![](https://static.oschina.net/uploads/img/201704/22170411_9qqN.jpg)
17.2.4 简单消费者应用程序
首先,启动ZooKeeper和Kafka代理。然后使用名为SimpleConsumer.java的Java类创建一个SimpleConsumer应用程序,并键入以下代码:import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class SimpleConsumer { public static void main(String[] args) throws Exception { if(args.length == 0) { System.out.println("Enter topic name"); return; } String topicName = args[0].toString(); Properties props = new Properties(); //Kafka consumer configuration settings props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topicName)); //KafkaConsumer subscribes list of topics here. System.out.println("Subscribed to topic " + topicName); //print the topic name int i = 0; while (true) { Apache Kafka 33 ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); // print the offset,key and value for the consumer records. } } }
编译:应用程序用下面的命令进行编译。
javac-cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
执行:用下面的命令进行执行。
java -cp“/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
输入:打开生产者CLI发送一些信息到topic。可以输入相同如“Hello Consumer”
输出:
![](https://static.oschina.net/uploads/img/201704/22170411_kQQR.jpg)
相关文章推荐
- IOS 学习笔记17 iPhone开发之使用NSUserDefaults存储数据
- 大数据学习笔记-------------------(17_2)
- Cocos2d-X 学习笔记 17 Cocos2dx Jason数据解析
- Androidx学习笔记(17)-- 数据存储之XML
- 用data.DataReader读取股价数据并分析--python学习笔记17
- 面向对象的程序设计学习笔记-17-静态成员数据
- 大数据学习笔记-------------------(17_1)
- 大数据学习笔记-------------------(17_3)
- JavaWeb学习笔记-mybatis-17-数据模型分析
- 大数据学习笔记-------------------(17_1)
- 大数据学习笔记-------------------(17_3)
- android菜鸟学习笔记17----Android数据存储(一)文件读写
- C++学习笔记17,构造函数体内初始化数据成员与构造函数初始化器的区别(一)
- ASP.NET 3.5核心编程学习笔记(17):基于数据源的数据绑定
- java学习笔记(二) ----基本数据类型应用
- MyGeneration学习笔记(8) :dOOdad提供的数据绑定、特殊函数和事务处理
- .net学习笔记数据绑定20060524
- Java开发学习笔记之三:HTTP客户请求数据格式
- MyGeneration学习笔记(8) :dOOdad提供的数据绑定、特殊函数和事务处理
- Chap 4 学习笔记-使用C#存储变量数据