消息队列kafka(一)--基本使用
2017-10-28 17:39
399 查看
一、kafka简介
kafka是一种高吞吐量的消息队列。
二、kafka特点
1、轻量级,比如activeMQ等消息队列更轻量级。
2、消息在kafka中,无论消息是否被消费,都不会被删除,会保留所有消息。
3、消息删除的策略,基于时间。在config/server.properties中配置。即
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
4、采用scala语言编写
三、kafka构成
1、topic
相当于队列queue
2、broker
kafka集群中的每一台机器,即称为broker。一个topic可以有多个broker。即是同个queue下的消息可以分布在多台机器上。
3、partition
一个topic可以有多个partition,每一个partition对应一个文件,每个文件内包含数据及相应索引。
4、producer
消息的生产者
5、consumer
消息的消费者
6、consumer group
消费者组,对同一个topic,同一消费组内的消费者,只有一个能收到消息。
而不同消费组的消费者,则都可以收到消息,相当于广播。
四、安装及使用
1、下载kafka安装包kafka_2.11-0.8.2.1.tgz
2、解压到某个文件夹下: tar zxvf kafka_2.11-0.8.2.1.tgz
3、安装zookeeper并启动
4、进入解压目录下,开启kafka服务: bin/kafka-server-start.sh config/server.properties
5、创建topic : bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic_1
6、查看创建的所有topic信息:
命令: bin/kafka-topics.sh --describe --zookeeper localhost:2181
或者指定topic信息: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_topic_1
也可使用命令 孙悦/kafka-topics.sh --list --zookeeper localhost:2181
7、消息生产者,发送消息到自定义的topic
命令:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_1
接下来便可发送消息
8、消息消费者,
命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic_1 --from-beginning
五、使用java产生消息
1、(特别注意)若将消息发送给远程服务器上,则需修改kafka安装时的配置文件kafka/config/server.properties
(在版本kafka_2.11-0.8.2.1中)将 #host.name=localhost 去掉注释并改为本机的ip地址(不能使用localhost), 如 host.name=192.168.0.107
(在版本kafka_2.12-1.0.0中)将#listeners=PLAINTEXT://:9092 去掉注释并改为本机的ip地址(不能使用localhost), 如 listeners = PLAINTEXT://192.168.0.107:9092
将#zookeeper.connect=localhost:2181 去掉注释并改为本机的ip地址(不能使用localhost), 如 zookeeper.connect=192.168.0.107:2181
在,则需添加
2、添加maven依赖
3、消息产生的代码
输出:
send msg: this is a test msgTue Oct 31 23:11:21 CST 2017
send msg: this is a test msgTue Oct 31 23:11:23 CST 2017
send msg: this is a test msgTue Oct 31 23:11:24 CST 2017
send msg: this is a test msgTue Oct 31 23:11:25 CST 2017
send msg: this is a test msgTue Oct 31 23:11:26 CST 2017
send msg: this is a test msgTue Oct 31 23:11:27 CST 2017
六、使用java消费消息
1、添加mavan依赖,同上
2、消息消费的代码
输出:
receive msg : this is a test msgTue Oct 31 23:11:21 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:23 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:24 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:25 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:26 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:27 CST 2017
kafka是一种高吞吐量的消息队列。
二、kafka特点
1、轻量级,比如activeMQ等消息队列更轻量级。
2、消息在kafka中,无论消息是否被消费,都不会被删除,会保留所有消息。
3、消息删除的策略,基于时间。在config/server.properties中配置。即
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
4、采用scala语言编写
三、kafka构成
1、topic
相当于队列queue
2、broker
kafka集群中的每一台机器,即称为broker。一个topic可以有多个broker。即是同个queue下的消息可以分布在多台机器上。
3、partition
一个topic可以有多个partition,每一个partition对应一个文件,每个文件内包含数据及相应索引。
4、producer
消息的生产者
5、consumer
消息的消费者
6、consumer group
消费者组,对同一个topic,同一消费组内的消费者,只有一个能收到消息。
而不同消费组的消费者,则都可以收到消息,相当于广播。
四、安装及使用
1、下载kafka安装包kafka_2.11-0.8.2.1.tgz
2、解压到某个文件夹下: tar zxvf kafka_2.11-0.8.2.1.tgz
3、安装zookeeper并启动
4、进入解压目录下,开启kafka服务: bin/kafka-server-start.sh config/server.properties
5、创建topic : bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic_1
6、查看创建的所有topic信息:
命令: bin/kafka-topics.sh --describe --zookeeper localhost:2181
或者指定topic信息: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_topic_1
也可使用命令 孙悦/kafka-topics.sh --list --zookeeper localhost:2181
7、消息生产者,发送消息到自定义的topic
命令:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_1
接下来便可发送消息
8、消息消费者,
命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic_1 --from-beginning
五、使用java产生消息
1、(特别注意)若将消息发送给远程服务器上,则需修改kafka安装时的配置文件kafka/config/server.properties
(在版本kafka_2.11-0.8.2.1中)将 #host.name=localhost 去掉注释并改为本机的ip地址(不能使用localhost), 如 host.name=192.168.0.107
(在版本kafka_2.12-1.0.0中)将#listeners=PLAINTEXT://:9092 去掉注释并改为本机的ip地址(不能使用localhost), 如 listeners = PLAINTEXT://192.168.0.107:9092
将#zookeeper.connect=localhost:2181 去掉注释并改为本机的ip地址(不能使用localhost), 如 zookeeper.connect=192.168.0.107:2181
在,则需添加
2、添加maven依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.2.1</version> </dependency>
3、消息产生的代码
public class KafkaSimpleProducerMain { public static void main(String[] args) throws Exception { Properties properties = new Properties(); properties.put("metadata.broker.list", "192.168.0.107:9092"); //指定kafka服务的地址和端口号 properties.put("key.serializer.class", "kafka.serializer.StringEncoder"); //指定key的序列化方式 properties.put("serializer.class", "kafka.serializer.StringEncoder"); //指定value的序列化方式 ProducerConfig producerConfig = new ProducerConfig(properties); Producer<String, String> producer = new Producer<>(producerConfig); String topic = "my_topic"; //指定topic while (true) { String msg = "this is a test msg" + new Date(); KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, msg); producer.send(keyedMessage); //发送消息 System.out.println("send msg: " + msg); TimeUnit.SECONDS.sleep(1); } } }
输出:
send msg: this is a test msgTue Oct 31 23:11:21 CST 2017
send msg: this is a test msgTue Oct 31 23:11:23 CST 2017
send msg: this is a test msgTue Oct 31 23:11:24 CST 2017
send msg: this is a test msgTue Oct 31 23:11:25 CST 2017
send msg: this is a test msgTue Oct 31 23:11:26 CST 2017
send msg: this is a test msgTue Oct 31 23:11:27 CST 2017
六、使用java消费消息
1、添加mavan依赖,同上
2、消息消费的代码
public class KafkaSimpleConsumerMain { public static void main(String[] args) { Properties properties = new Properties(); properties.put("zookeeper.connect","192.168.0.107:2181"); //指定zookeeper properties.put("zookeeper.session.timeout.ms", "300000"); //设置超时时间 properties.put("serializer.class", "kafka.serializer.StringEncoder"); //指定value的序列化方式 properties.put("group.id","my_group_1"); ConsumerConfig consumerConfig = new ConsumerConfig(properties); String topic = "my_topic"; ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); Map<String,Integer> topicConutMap = new HashMap<>(); topicConutMap.put(topic,1); //指定每次取数个数 Map<String, List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicConutMap); KafkaStream<byte[],byte[]> stream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while(iterator.hasNext()){ String msg = new String(iterator.next().message()); System.out.println("receive msg : "+msg); } } }
输出:
receive msg : this is a test msgTue Oct 31 23:11:21 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:23 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:24 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:25 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:26 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:27 CST 2017
相关文章推荐
- 消息队列 Kafka 的基本知识及 .NET Core 客户端
- 消息队列 Kafka 的基本知识及 .NET Core 客户端
- kafka分布式消息队列 — 基本概念介绍
- 消息队列(Message Queue)基本概念和使用场景分析
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- Kafka,Mq,Redis作为消息队列使用时的差异?
- kafka分布式消息队列使用(springboot和springmvc)
- Kafka,Mq,Redis作为消息队列使用时的差异?
- kafka分布式消息队列使用(springboot和springmvc)
- 分布式消息队列kafka系列介绍 — 基本概念
- 使用kafka消息队列解决分布式事务(可靠消息最终一致性方案-本地消息服务)
- 使用 Kafka 和 Redis 作为消息队列的差异
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- 消息队列的基本使用思路和接口介绍
- Linux进程间通信(IPC)编程实践(十二)Posix消息队列--基本API的使用
- 消息队列 Kafka 的基本知识及 .NET Core 客户端
- RabbitMQ_消息队列基本使用_1
- RabbitMQ_消息队列基本使用_2
- 分布式消息队列Kafka 之 kafka简单部署及使用