您的位置:首页 > 其它

消息队列kafka(一)--基本使用

2017-10-28 17:39 399 查看
一、kafka简介

kafka是一种高吞吐量的消息队列。

二、kafka特点

1、轻量级,比如activeMQ等消息队列更轻量级。

2、消息在kafka中,无论消息是否被消费,都不会被删除,会保留所有消息。

3、消息删除的策略,基于时间。在config/server.properties中配置。即

# The minimum age of a log file to be eligible for deletion

log.retention.hours=168

4、采用scala语言编写

三、kafka构成

1、topic

相当于队列queue

2、broker

kafka集群中的每一台机器,即称为broker。一个topic可以有多个broker。即是同个queue下的消息可以分布在多台机器上。

3、partition

一个topic可以有多个partition,每一个partition对应一个文件,每个文件内包含数据及相应索引。

4、producer

消息的生产者

5、consumer

消息的消费者

6、consumer group

消费者组,对同一个topic,同一消费组内的消费者,只有一个能收到消息。

而不同消费组的消费者,则都可以收到消息,相当于广播。

四、安装及使用

1、下载kafka安装包kafka_2.11-0.8.2.1.tgz

2、解压到某个文件夹下:   tar zxvf kafka_2.11-0.8.2.1.tgz

3、安装zookeeper并启动

4、进入解压目录下,开启kafka服务:   bin/kafka-server-start.sh  config/server.properties

5、创建topic :   bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic_1

6、查看创建的所有topic信息:

命令:  bin/kafka-topics.sh --describe --zookeeper localhost:2181

或者指定topic信息:  bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic my_topic_1

也可使用命令 孙悦/kafka-topics.sh --list --zookeeper localhost:2181

7、消息生产者,发送消息到自定义的topic  

命令:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_1

接下来便可发送消息

8、消息消费者,

命令:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic_1 --from-beginning

五、使用java产生消息

1、(特别注意)若将消息发送给远程服务器上,则需修改kafka安装时的配置文件kafka/config/server.properties

(在版本kafka_2.11-0.8.2.1中)将 #host.name=localhost 去掉注释并改为本机的ip地址(不能使用localhost), 如 host.name=192.168.0.107

(在版本kafka_2.12-1.0.0中)将#listeners=PLAINTEXT://:9092 去掉注释并改为本机的ip地址(不能使用localhost), 如 listeners = PLAINTEXT://192.168.0.107:9092

将#zookeeper.connect=localhost:2181  去掉注释并改为本机的ip地址(不能使用localhost), 如 zookeeper.connect=192.168.0.107:2181

在,则需添加

2、添加maven依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.2.1</version>
</dependency>


3、消息产生的代码

public class KafkaSimpleProducerMain {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put("metadata.broker.list", "192.168.0.107:9092");  //指定kafka服务的地址和端口号
properties.put("key.serializer.class", "kafka.serializer.StringEncoder"); //指定key的序列化方式
properties.put("serializer.class", "kafka.serializer.StringEncoder");  //指定value的序列化方式
ProducerConfig producerConfig = new ProducerConfig(properties);

Producer<String, String> producer = new Producer<>(producerConfig);

String topic = "my_topic"; //指定topic

while (true) {
String msg = "this is a test msg" + new Date();
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, msg);
producer.send(keyedMessage);  //发送消息
System.out.println("send msg: " + msg);

TimeUnit.SECONDS.sleep(1);
}
}
}


输出:

send msg: this is a test msgTue Oct 31 23:11:21 CST 2017

send msg: this is a test msgTue Oct 31 23:11:23 CST 2017

send msg: this is a test msgTue Oct 31 23:11:24 CST 2017

send msg: this is a test msgTue Oct 31 23:11:25 CST 2017

send msg: this is a test msgTue Oct 31 23:11:26 CST 2017

send msg: this is a test msgTue Oct 31 23:11:27 CST 2017

六、使用java消费消息

1、添加mavan依赖,同上

2、消息消费的代码

public class KafkaSimpleConsumerMain {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("zookeeper.connect","192.168.0.107:2181");  //指定zookeeper
properties.put("zookeeper.session.timeout.ms", "300000");  //设置超时时间
properties.put("serializer.class", "kafka.serializer.StringEncoder");  //指定value的序列化方式
properties.put("group.id","my_group_1");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);

String topic = "my_topic";
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);

Map<String,Integer> topicConutMap = new HashMap<>();
topicConutMap.put(topic,1);  //指定每次取数个数
Map<String, List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicConutMap);
KafkaStream<byte[],byte[]> stream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while(iterator.hasNext()){
String msg = new String(iterator.next().message());
System.out.println("receive msg : "+msg);
}
}
}

输出:

receive msg : this is a test msgTue Oct 31 23:11:21 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:23 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:24 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:25 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:26 CST 2017

receive msg : this is a test msgTue Oct 31 23:11:27 CST 2017
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: