Kafka Cluster Configuration, Startup, and Programming
1. Overview
This article describes how to set up 3 ZooKeeper nodes and 3 Kafka brokers on a single machine. The walkthrough targets Windows; other operating systems are similar.
2. Preparation
Download the Kafka archive from the Apache website; this article uses kafka_2.10-0.9.0.1.tgz. Extract it and make two copies of the extracted folder, so that you end up with three folders named kafka_2.10-0.9.0.1, kafka_2.10-0.9.0.1-2, and kafka_2.10-0.9.0.1-3.
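If you prefer to do the copying from a command prompt, something like the following should work (a minimal sketch; adjust the paths to wherever you extracted the archive):
xcopy /E /I kafka_2.10-0.9.0.1 kafka_2.10-0.9.0.1-2
xcopy /E /I kafka_2.10-0.9.0.1 kafka_2.10-0.9.0.1-3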
3. ZooKeeper Configuration and Startup
3.1 Node configuration
3.1.1 Node 1
Edit the kafka_2.10-0.9.0.1/config/zookeeper.properties file and add the following:
tickTime=2000
dataDir=./data/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=127.0.0.1:2886:3886
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2887:3887
Create the file kafka_2.10-0.9.0.1/bin/windows/data/zookeeper/myid containing only the ZooKeeper node id (keep comments out of this file, since ZooKeeper parses the first line as an integer). The data directory sits under bin/windows/ because dataDir is a relative path and the start script is run from that directory. Content:
1
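If you prefer to create the file from cmd, a minimal sketch (the parentheses around echo keep cmd from reading 1> as a stream redirect):
cd kafka_2.10-0.9.0.1\bin\windows
mkdir data\zookeeper
(echo 1)> data\zookeeper\myid
Use 2 and 3 respectively for the other two nodes below.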
3.1.2 Node 2
Edit the kafka_2.10-0.9.0.1-2/config/zookeeper.properties file and add the following:
tickTime=2000
dataDir=./data/zookeeper/
clientPort=2182
initLimit=5
syncLimit=2
server.1=127.0.0.1:2886:3886
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2887:3887
Create the file kafka_2.10-0.9.0.1-2/bin/windows/data/zookeeper/myid containing only:
2
3.1.3 Node 3
Edit the kafka_2.10-0.9.0.1-3/config/zookeeper.properties file and add the following:
tickTime=2000
dataDir=./data/zookeeper/
clientPort=2183
initLimit=5
syncLimit=2
server.1=127.0.0.1:2886:3886
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2887:3887
Create the file kafka_2.10-0.9.0.1-3/bin/windows/data/zookeeper/myid containing only:
3
Pay close attention to the port configuration: since all three nodes run on the same machine, the clientPort and the two quorum ports must be distinct for each node, as summarized below.
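For quick reference, the ports used by the three nodes above:
node 1: clientPort 2181, quorum port 2886, leader-election port 3886
node 2: clientPort 2182, quorum port 2889, leader-election port 3889
node 3: clientPort 2183, quorum port 2887, leader-election port 3887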
3.2 Node startup
3.2.1 Starting node 1
Change into the directory kafka_2.10-0.9.0.1/bin/windows/
Run: zookeeper-server-start.bat ../../config/zookeeper.properties
3.2.2 Starting node 2
Change into the directory kafka_2.10-0.9.0.1-2/bin/windows/
Run: zookeeper-server-start.bat ../../config/zookeeper.properties
3.2.3 Starting node 3
Change into the directory kafka_2.10-0.9.0.1-3/bin/windows/
Run: zookeeper-server-start.bat ../../config/zookeeper.properties
Note: errors are printed while the nodes start up. This is normal: ZooKeeper is configured in quorum mode and the nodes come up one after another, so the first node cannot yet reach the other two and logs connection errors. Once all three nodes are up and a leader has been elected, the logs settle down.
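Once they have settled, you can sanity-check the ensemble with the ZooKeeper shell bundled with Kafka (assuming zookeeper-shell.bat is present in bin/windows of your distribution):
zookeeper-shell.bat localhost:2181
At the prompt, ls / should list the /zookeeper znode; after the brokers start (section 4), /brokers and /controller appear as well.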
4. Kafka Configuration and Startup
4.1 Node configuration
4.1.1 Node 1
Edit the kafka_2.10-0.9.0.1/config/server.properties file and set or add the following (adjust log.dirs to a path that exists on your machine; note that each broker needs its own log directory):
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=F:\\work\\scala\\kafka_2.10-0.9.0.1\\log
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
broker.list=localhost:9092,localhost:9093,localhost:9094
4.1.2 Node 2
Edit the kafka_2.10-0.9.0.1-2/config/server.properties file and set or add the following:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=F:\\work\\scala\\kafka_2.10-0.9.0.1-2\\log
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
broker.list=localhost:9092,localhost:9093,localhost:9094
4.1.3 Node 3
Edit the kafka_2.10-0.9.0.1-3/config/server.properties file and set or add the following:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=F:\\work\\scala\\kafka_2.10-0.9.0.1-3\\log
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
broker.list=localhost:9092,localhost:9093,localhost:9094
Make sure each broker gets a unique broker.id, a unique listener port, and its own log.dirs, as summarized below.
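The per-broker values used in this article:
node 1: broker.id=0, listener port 9092, log.dirs F:\work\scala\kafka_2.10-0.9.0.1\log
node 2: broker.id=1, listener port 9093, log.dirs F:\work\scala\kafka_2.10-0.9.0.1-2\log
node 3: broker.id=2, listener port 9094, log.dirs F:\work\scala\kafka_2.10-0.9.0.1-3\log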
4.2 Node startup
4.2.1 Starting node 1
Change into the directory kafka_2.10-0.9.0.1/bin/windows/
Run: kafka-server-start.bat ../../config/server.properties
4.2.2 Starting node 2
Change into the directory kafka_2.10-0.9.0.1-2/bin/windows/
Run: kafka-server-start.bat ../../config/server.properties
4.2.3 Starting node 3
Change into the directory kafka_2.10-0.9.0.1-3/bin/windows/
Run: kafka-server-start.bat ../../config/server.properties
At this point, all three ZooKeeper nodes and all three Kafka brokers are up and running.
5. Testing Kafka from the Command Line
5.1 Create a topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test-replicated-topic
5.2 Describe the topic
kafka-topics.bat --describe --zookeeper localhost:2181 --topic test-replicated-topic
Output:
Topic:test-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Here Leader is the broker currently serving reads and writes for the partition, Replicas lists every broker holding a copy, and Isr is the set of replicas currently in sync with the leader.
5.3 Open a cmd window as a producer
kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-replicated-topic
5.4 Open a cmd window as a consumer
kafka-console-consumer.bat --zookeeper localhost:2181 --from-beginning --topic test-replicated-topic
The producer window now waits for input; whatever you type there shows up in the consumer window.
6. Programming Against Kafka
The test program is built with Maven; add the following dependency to the pom file:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
</dependency>
Note: the code below is adapted from sample code shared by other developers.
Producer program:
package com.migu.tsg;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer extends Thread {
    private final Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public KafkaProducer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092,localhost:9093,localhost:9094");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<Integer, String>(config);
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            System.out.println("Send:" + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000); // pause three seconds between messages
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
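For comparison, Kafka 0.9 also ships the newer Java producer (org.apache.kafka.clients.producer.KafkaProducer), which has since replaced the Scala producer used above. Here is a minimal standalone sketch (the class name NewApiProducer is my own, and topic1 matches KafkaProperties.topic defined below):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int messageNo = 1; messageNo <= 10; messageNo++) {
            // send() is asynchronous; the returned Future completes once the broker acknowledges
            producer.send(new ProducerRecord<String, String>("topic1", "Message_" + messageNo));
        }
        producer.close(); // flushes any outstanding sends
    }
}

Note that the new client is configured with bootstrap.servers and serializer classes rather than serializer.class and metadata.broker.list.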
Consumer program:
package com.migu.tsg;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        // consumers sharing a group.id never consume the same partition at the same time
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        // how many threads should read this topic; one stream is created per thread
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
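Similarly, 0.9 introduced a new consumer client (org.apache.kafka.clients.consumer.KafkaConsumer, still marked beta in this release) that talks to the brokers directly instead of coordinating through ZooKeeper. A rough standalone equivalent of the loop above (NewApiConsumer is my own name; in 0.9, poll() takes a timeout in milliseconds):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("topic1"));
        while (true) {
            // poll returns whatever records arrived within the timeout (in ms)
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("receive:" + record.value());
            }
        }
    }
}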
Configuration constants:
package com.migu.tsg;

public interface KafkaProperties {
    final static String zkConnect = "localhost:2181,localhost:2182,localhost:2183";
    final static String groupId = "group1";
    final static String topic = "topic1";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;
    final static String topic2 = "topic2";
    final static String topic3 = "topic3";
    //final static String clientId = "SimpleConsumerDemoClient";
}
Driver program:
package com.migu.tsg;

/**
 * @author leicuibourne_cui@163.com
 */
public class KafkaConsumerProducerDemo {
    public static void main(String[] args) {
        KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
        producerThread.start();
        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
        consumerThread.start();
    }
}
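To try the demo, compile and run the driver class; one way is via the exec-maven-plugin (a sketch, assuming the plugin resolves from Maven Central):
mvn compile exec:java -Dexec.mainClass=com.migu.tsg.KafkaConsumerProducerDemo
With the cluster from the earlier sections running, the producer thread prints Send:Message_N every three seconds and the consumer thread echoes the matching receive:Message_N lines.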