您的位置:首页 > 其它

Kafka基本概念及环境搭建

2017-02-10 22:11 447 查看
kafka的深入了解可以参见文献[1-2],本文仅是简单的介绍基本概念,以及动手搭建简单的kafka练习环境。本文一些介绍摘自参考文献.

关于Kafka

Kafka是一种分布式的,基于发布/订阅的消息系统。支持高吞吐量的数据实时存储,结合合适的消费者模式(实时计算/离线计算),同时支持消息的实时/离线处理。主要设计目标如下:

以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能

高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输

支持Kafka Server间的消息分区,及分布式消费,同一个Topic可以同时供不同的Consumer Group消费,实现消息的多样化处理

同时支持离线数据处理和实时数据处理

可以在一定程度上保证消息的有序性(同一个partition下消息有序性)

每个Topic的partition可以保存多个replication(副本),增强了消息的健壮可靠性

Kafka基本概念

Broker

Kafka集群包含一个或多个服务器,这种服务器被称为broker,主要负责消息的存储

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)

Partition

parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件

Producer

负责发布消息到Kafka broker。Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。使用的更多的是第二种。

Consumer

消费消息。每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

Consumer group

每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)

Offset

消息存储在kafka的broker上,Consumer每次从特定topic的特定partition下pull消息时,需要知道从什么位置开始拉取,这个位置就是offset。offset通常由consumer结合zookeeper进行维护,当然consumer也可以选择从指定的offset开始消费消息

Kafka系统框架



Kafka环境搭建

本文环境:

系统:CentOS release 5.5 (Final),32位

依靠包:jdk(本文使用了jdk1.8.0_121,32位)

kafka安装包:kafka_2.10-0.10.1.1.tgz(0.10.11版本)

1.安装jdk

//解压安装包
tar -xvf jdk-8u121-linux-i586.tar.gz
//将解压后的文件夹,移动到opt目录。
//当然,这步并非是必须的,这么做仅仅是因为/opt是常用软件的安放路径而已
mv -f jdk1.8.0_121 /opt/jdk1.8.0_121
//建立jdk1.8.0_121的软连接,方便管理
cd /opt ; ln -sf jdk1.8.0_121 jdk
//设置jdk的环境变量
在此,不建议在/etc/profile中修改,因为/etc/profile是系统的环境变量
可以修改~/.bash_profile,也就是当前登录用户的环境设置
新增如下配置:
JAVA_HOME=/opt/jdk
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
保存后,使用 source ~/.bash_profile 来使修改立即生效


2.安装kafka

假设kafka安装包位于/home目录下,并打算将其安装在/home下
tar -xvf kafka_2.10-0.10.1.1.tgz
ln -sf kafka_2.10-0.10.1.1 kafka


3.其他配置

关闭防火墙服务
service iptables stop  //停止防火墙服务
chkconfig iptables off  //系统启动时,不启动防火墙服务
修改/etc/hosts文件
// /etc/hosts是本地的一个ip地址到主机名映射的小型关系库
新增 127.0.0.1 localhost


4.启动服务

我们以默认配置启动zookeeper和kafka。
默认的配置包括,zookeeper本地的端口为2181,日志和快照数据保存于/tmp/zookeeper
kafka默认broke.id为0,服务端口为9092,默认每个topic只创建一个partition,每个partition只有一个副本,Kafka消息保存路径为/tmp/kafka-logs
先启动zookeeper服务,再启动kafka服务;结束服务的顺序相反。
[root@localhost bin]# pwd
/home/kafka/bin
[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
也可以使用 ./zookeeper-server-start.sh ../config/zookeeper.properties  & 进入后台执行
[root@localhost bin]# ./kafka-server-start.sh ../config/server.properties

确认zookeeper和kakfa服务正常
[root@localhost ~]# jps
4019 kafka           //kafka
3486 QuorumPeerMain  //zookeeper服务启动入口方法
4319 Jps


5.测试kafka安装

创建topic

[root@localhost bin]# ./kafka-topics.sh --zookeeper localhost:2181 --create --topic test02 --replication-factor 1 --partitions 1
Created topic "test02".
[root@localhost bin]#


列出当前kafka中的topic

[root@localhost bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
test02


删除topic

[root@localhost bin]# ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic test02
Topic test02 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
这里Topic仅仅被标记为删除,但并没有删除,如果要开启删除功能,需要在启动kafka服务前在server.properties将delete.topic.enable置为ture


kafka生成/消费测试

保持zookeeper服务和kafka服务的连接,开启两个shell窗口

窗口1,消息生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic test01
//9092 broker server服务的默认端口


窗口2,消息消费者
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test01 --from-beginning


窗口1,输入消息内容,以回车键发送;将会在窗口2中看到生产者的消息


6.启动kafka服务失败的可能原因

1.注意jdk环境的配置错误引发的 could not find main class kafka.kafka …等

2.主机名与IP地址对应关系

[2017-02-06 21:28:48,900] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)

java.net.UnknownHostException: localhost.localdomain: localhost.localdomain: Temporary failure in name resolution



可以修改/etc/hosts 中的主机名与IP地址对应关系解决

[1].Kafka深度解析.http://www.jasongj.com/2015/01/02/Kafka深度解析

[2].Kafka入门经典教程.http://www.aboutyun.com/thread-12882-1-1.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: