您的位置:首页 > 其它

Kafka-2.11学习笔记(一)简单介绍

2015-08-05 21:19 309 查看
鲁春利的工作笔记,谁说程序员不能有文艺范?

下载地址:http://kafka.apache.org/downloads.html
当前最新版本:Scala 2.11 - kafka_2.11-0.8.2.1.tgz
wget http://apache.fayea.com/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz[/code] 
Kafka是一个高性能的分布式消息队列系统,它最初由 LinkedIn 公司开发,使用 Scala语言编写,之后成为 Apache 项目的一部分。

基本术语:
kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性。
Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition。
本质上kafka只支持Topic,每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer,发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。

安装kafka
# 解压
tar -zxv -f kafka_2.11-0.8.2.1.tgz
mv kafka_2.11-0.8.2.1 kafka0.8.2.1


配置环境变量
vim ~/.bash_profile

KAFKA_HOME=/usr/local/kafka0.8.2.1
PATH=$KAFKA_HOME/bin:$PATH

export KAFKA_HOME
export PATH


Kafka依赖于Zookeeper,需要提前配置Zookeeper集群
ZooKeeper集群配置参照:/article/4522457.html

或参考官网:http://kafka.apache.org/documentation.html#quickstart

修改配置文件(config/zookeeper.properties)
vim zookeeper.properties

# 修改dataDir
dataDir=/usr/local/zookeeper3.4.6/data


修改配置文件(config/server.properties)
vim server.properties

# 修改如下参数
# 每个broker的id必须是唯一的,这里设置成主机IP的最后一组值
broker.id=117
# 日志文件的存放目录
log.dirs=/usr/local/kafka0.8.2.1/logs
# 使用zookeeper集群
zookeeper.connect=nnode:2181,dnode1:2181,dnode2:2181
具体参数配置参考:http://kafka.apache.org/documentation.html#configuration

启动ZK server
# 切换到hadoop用户,在三台机器上依次执行命令
zkServer.sh start
说明:这里使用的不是kafka自带的zookeeper实现,如果按照自带的zookeeper启动的话:
[hadoop@nnode kafka0.8.2.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2015-11-13 15:53:05,890] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-11-13 15:53:05,903] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2015-11-13 15:53:05,903] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2015-11-13 15:53:05,904] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2015-11-13 15:53:05,904] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2015-11-13 15:53:05,970] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-11-13 15:53:05,972] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2015-11-13 15:53:05,999] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer)


启动Kafka server
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-server-start.sh config/server.properties
[2015-11-12 23:25:56,994] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,069] INFO Property broker.id is overridden to 117 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,069] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,069] INFO Property log.dirs is overridden to /usr/local/kafka0.8.2.1/logs (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,069] INFO Property log.retention.check.interval.ms is overridden to 300000 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,070] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,070] INFO Property log.segment.bytes is overridden to 1073741824 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,070] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,071] INFO Property num.network.threads is overridden to 3 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,071] INFO Property num.partitions is overridden to 1 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,071] INFO Property num.recovery.threads.per.data.dir is overridden to 1 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,071] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,071] INFO Property socket.receive.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,072] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2015-11-12 23:25:57,072] INFO Property socket.send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)

# 中间略

[2015-11-12 23:25:58,316] INFO 117 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-11-12 23:25:58,480] INFO Registered broker 117 at path /brokers/ids/117 with address nnode:9092. (kafka.utils.ZkUtils$)
[2015-11-12 23:25:58,557] INFO [Kafka Server 117], started (kafka.server.KafkaServer)
[2015-11-12 23:25:58,723] INFO New leader is 117 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)


创建话题(Topic)
拥有单个分区和一个副本的话题(Topic)。

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --create --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --replication-factor 1 --partitions 1 --topic mytopic
Created topic "mytopic".
查看刚才创建的话题

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
mytopic
test

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --list --zookeeper nnode:2181,dnode1:2181,dnode2:2181
mytopic
test
[hadoop@nnode kafka0.8.2.1]$


观察Server端的输出
[2015-11-12 23:55:08,212] INFO Completed load of log mytopic-0 with log end offset 0 (kafka.log.Log)
[2015-11-12 23:55:08,215] INFO Created log for partition [mytopic,0] in /usr/local/kafka0.8.2.1/logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000, segment.jitter.ms -> 0}. (kafka.log.LogManager)
[2015-11-12 23:55:08,219] WARN Partition [mytopic,0] on broker 117: No checkpointed highwatermark is found for partition [mytopic,0] (kafka.cluster.Partition)


发送一些消息
Kafka命令行通过标准输入或文件接收一些数据,并将它发送到Kafka cluster。默认情况下,每一行作为一个单独的消息发送出去。

# 查看帮助
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-console-producer.sh

#
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-console-producer.sh --topic mytopic --broker-list localhost:9092
hello world
[2015-11-13 00:07:39,294] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
this is my first message
在使用时提示:WARN Property topic is not valid (kafka.utils.VerifiableProperties)

启动消费者读取消息
# 查看帮助
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-console-consumer.sh

#
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-console-consumer.sh --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --topic mytopic --from-beginning
hello world
this is my first message
^CConsumed 2 messages
[hadoop@nnode kafka0.8.2.1]$


查看主题详情
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
Topic: test     Partition: 0    Leader: 117     Replicas: 117   Isr: 117
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --describe --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
Topic: test     Partition: 0    Leader: 117     Replicas: 117   Isr: 117


删除主题
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --delete --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --list --zookeeper nnode:2181,dnode1:2181,dnode2:2181
mytopic
test - marked for deletion
[hadoop@nnode kafka0.8.2.1]$
说明:kafka-topics.sh直接参数为--list, --describe, --create, --alter or --delete

本文出自 “闷葫芦的世界” 博客,请务必保留此出处http://luchunli.blog.51cto.com/2368057/1682051
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: