Kafka Cluster Installation and Configuration File Notes
2016-05-27 11:29
---------------------------------------- Prerequisites ----------------------------------------
1. A working ZooKeeper cluster.
2. The Java JDK installed on every node.

Goal: configure a three-node Kafka cluster. Node hostnames: master, worker1, worker2.

---------------------------------------- Contents ----------------------------------------
Installing Kafka
Configuring Kafka
Testing the cluster
Inspecting runtime state from the ZooKeeper client
Notes on the other configuration files
-------------------------------------------------------------------------------------------

1. Download Kafka from https://kafka.apache.org/downloads.html and pick the version you want.
2. Extract it to a directory of your choice; my Kafka lives under /usr/local/kafka.
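Steps 1 and 2 in shell form, as a sketch — the 0.9.0.1 / Scala 2.11 tarball below is only an example; substitute whichever version you picked on the downloads page:

# fetch the release tarball, unpack it, and move it into place
wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
tar -xzf kafka_2.11-0.9.0.1.tgz
sudo mv kafka_2.11-0.9.0.1 /usr/local/kafka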
3. Edit the configuration file (server.properties). The master's configuration:

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

listeners=PLAINTEXT://:9092

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=master

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=master

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against
# OOM); it must not exceed the JVM heap size
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files. Several
# directories may be given; a newly created partition goes to the directory holding
# the fewest partitions.
log.dirs=/usr/local/kafka/logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

# The number of threads per data directory to be used for log recovery at startup and
# flushing at shutdown. This value is recommended to be increased for installations
# with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush
#       does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small
#       flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period
# of time or every N messages (or both). This can be done globally and overridden on a
# per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always
# happens from the end of the log.

# The minimum age of a log file to be eligible for deletion (168 hours = 7 days)
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the
# remaining segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment
# will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=master:2181,worker1:2181,worker2:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
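The worker configurations shown next differ from this file only in broker.id, host.name, and advertised.host.name, so you can generate them from the master's copy. A minimal sketch, assuming passwordless ssh and the same /usr/local/kafka install path on every node:

# copy the master's server.properties to each worker, then patch the
# three host-specific keys in place
i=1
for h in worker1 worker2; do
  scp /usr/local/kafka/config/server.properties $h:/usr/local/kafka/config/
  ssh $h "sed -i 's/^broker.id=0/broker.id=$i/; s/=master$/=$h/' /usr/local/kafka/config/server.properties"
  i=$((i+1))
done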
worker1's configuration is identical to the master's except for the following lines:

broker.id=1
host.name=worker1
advertised.host.name=worker1

worker2's configuration likewise differs only in:
broker.id=2
host.name=worker2
advertised.host.name=worker2

4. Test the cluster (following the official quickstart: https://kafka.apache.org/documentation.html#quickstart).

Step 1: start the ZooKeeper cluster.
Step 2: start Kafka on every node:

./kafka-server-start.sh -daemon ../config/server.properties

or:
bin/kafka-server-start.sh config/server.properties &
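To bring up all three brokers from one machine instead of logging in to each node, a sketch (again assuming passwordless ssh and the install path used above):

# start one broker per node, detached via -daemon
for h in master worker1 worker2; do
  ssh $h /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
done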
Step 3: create a topic; mine is named mytopic.
Command:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic mytopic

Step 4: check the topic:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic
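On a healthy three-broker cluster the describe output looks roughly like this (which broker is leader, and the replica ordering, will vary):

Topic:mytopic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: mytopic	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2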
Step 5: check that messages actually flow. On worker1, start a console consumer (it sits waiting; the cursor keeps blinking):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic mytopic

On master, start a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
Now type some characters on the producer side (master). If they show up on the consumer side, the Kafka cluster is working.

5. Inspecting runtime state from the ZooKeeper client

Step 1: start the ZooKeeper client: ./zkCli.sh -server 127.0.0.1:2181
Step 2: list the znodes under the root.
Step 3: look at the broker information (a sample session follows the log-file notes below).

Finally, a few other notes. Files in the logs directory:

server.log: the broker's main runtime log
state-change.log: leader/state-change log
controller.log: log of the broker currently acting as controller
kafkaServer-gc.log: JVM garbage-collection log

consumer.properties:
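For steps 2 and 3 of the ZooKeeper inspection above, a zkCli session looks roughly like this; the exact znode list and the JSON fields depend on the Kafka version:

[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[admin, brokers, config, consumers, controller, controller_epoch, isr_change_notification, zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /brokers/ids
[0, 1, 2]
[zk: 127.0.0.1:2181(CONNECTED) 2] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"...","endpoints":["PLAINTEXT://master:9092"],"host":"master","version":2,"port":9092}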
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
# (not effective here; the cluster uses the value from server.properties)
zookeeper.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

# consumer group id; each group.id identifies one consumer group
group.id=test-consumer-group

# consumer timeout
#consumer.timeout.ms=5000
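To make a console consumer actually pick these values up, you can pass the file explicitly; a sketch using the --consumer.config option of the stock kafka-console-consumer.sh:

bin/kafka-console-consumer.sh --zookeeper master:2181 --topic mytopic --consumer.config config/consumer.properties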
producer.properties:

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
# (not effective here; the running setup uses the configuration in server.properties)
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none

# message encoder (byte serialization by default; other encoders are available)
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=

############################# Async Producer #############################

# maximum time, in milliseconds, for buffering data on the producer queue
#queue.buffering.max.ms=

# the maximum size of the blocking queue for buffering on the producer
#queue.buffering.max.messages=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=

# the number of messages batched at the producer
#batch.num.messages=

For explanations of every setting, see https://kafka.apache.org/documentation.html#configuration

---------------------------------------------- end ----------------------------------------------