您的位置:首页 > 大数据

大数据学习笔记-------------------(17_2)

2017-04-22 00:00 281 查看

17.2 简单消费者实例

已经创建一个producer给Kafka集群发送信息。现在需要创建一个consumer来接收来自集群中的信息。KafkaConsumer API习惯用语接收来自Kafka集群的信息。KafkaConsumer类结构定义如下:

publicKafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

config:返回消费者配置信息的map

KafkaConsumer类的重要方法列表如下:



17.2.1 consumerRecord API

ConsumerRecordAPI用于从Kafka集群接收记录。API由topic名称、分区号(从中接收记录)和指向Kafka分区中的记录的偏移量组成。ConsumerRecord类用于创建具有特定topic名称,分区计数、<key,value>对的使用者记录。它有以下签名:

publicConsumerRecord(string topic,int partition, long offset,K key, V value)

topic:从Kafka集群接收消费者记录的主题名称

partition:topic的分区数

key:记录的key,如果没有key存在将返回null

value:记录内容

ConsumerRecords API充当消费者记录的容器。API用于保存特定主题每个分区的ConsumerRecord列表。它的构造器定义如下:

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<ConsumerRecord<K,V>>>records)

TopicPartition:返回特定topic分区map

Records:返回ConsumerRecord列表

ConsumerRecord类的定义方法列表:



17.2.3 配置设置

Consumer客户端API主要配置如下所示:



17.2.4 简单消费者应用程序

首先,启动ZooKeeper和Kafka代理。然后使用名为SimpleConsumer.java的Java类创建一个SimpleConsumer应用程序,并键入以下代码:

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0)
{
System.out.println("Enter topic name");
return;
}
String topicName = args[0].toString();
Properties props = new Properties();
//Kafka consumer configuration settings
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
//KafkaConsumer subscribes list of topics here.
System.out.println("Subscribed to topic " + topicName); //print the topic name
int i = 0;
while (true) {
Apache Kafka
33
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
// print the offset,key and value for the consumer records.
}
}
}


编译:应用程序用下面的命令进行编译。

javac-cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行:用下面的命令进行执行。

java -cp“/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

输入:打开生产者CLI发送一些信息到topic。可以输入相同如“Hello Consumer”

输出:



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: