您的位置:首页 > 编程语言

kafka集群配置启动及编程

2016-07-03 21:27 323 查看
kafka集群配置启动及编程
1.      说明

本文描述在同一台机器上配置3个zookeeper节点,3个kafka节点的方法。本文描述的集群搭建方式,基于windows系统,其他操作系统类似。

 

2.      准备

到apache官网上下载kafka的压缩包,我下载的是:kafka_2.10-0.9.0.1.tgz。解压压缩包,解压文件夹复制两份。三个文件夹分别命名为:kafka_2.10-0.9.0.1,kafka_2.10-0.9.0.1-2,kafka_2.10-0.9.0.1-3。

 

3.      zookeeper配置及启动

3.1   节点配置

3.1.1 节点1配置
编辑kafka_2.10-0.9.0.1/config/ zookeeper.properties文件,添加以下内容:

tickTime=2000

dataDir=./data/zookeeper/

clientPort=2181

initLimit=5

syncLimit=2

server.1=127.0.0.1:2886:3886

server.2=127.0.0.1:2889:3889

server.3=127.0.0.1:2887:3887

 

创建kafka_2.10-0.9.0.1/bin/windows/data/zookeeper/myid文件,添加以下内容:

1  #zookeeper 节点编号

 
3.1.2 节点2配置
编辑kafka_2.10-0.9.0.1-2/config/ zookeeper.properties文件,添加以下内容:

tickTime=2000

dataDir=./data/zookeeper/

clientPort=2182

initLimit=5

syncLimit=2

server.1=127.0.0.1:2886:3886

server.2=127.0.0.1:2889:3889

server.3=127.0.0.1:2887:3887

 

创建kafka_2.10-0.9.0.1-2/bin/windows/data/zookeeper/myid文件,添加以下内容:

2

 

3.1.3 节点3配置
编辑kafka_2.10-0.9.0.1-3/config/ zookeeper.properties文件,添加以下内容:

tickTime=2000

dataDir=./data/zookeeper/

clientPort=2183

initLimit=5

syncLimit=2

server.1=127.0.0.1:2886:3886

server.2=127.0.0.1:2889:3889

server.3=127.0.0.1:2887:3887

 

创建kafka_2.10-0.9.0.1-3/bin/windows/data/zookeeper/myid文件,添加以下内容:

3

 

注意端口号的配置,3个节点运行在同台机器之上,估计端口号必须区别开来。

 

3.2   节点启动

3.2.1 节点1启动
进入目录kafka_2.10-0.9.0.1/bin/windows/
运行:zookeeper-server-start.bat ../../config/zookeeper.properties
 
3.2.2 节点2启动
进入目录kafka_2.10-0.9.0.1-2/bin/windows/
运行:zookeeper-server-start.bat ../../config/zookeeper.properties
 
3.2.3 节点3启动
进入目录kafka_2.10-0.9.0.1-3/bin/windows/
运行:zookeeper-server-start.bat ../../config/zookeeper.properties
 
注:节点启动有错误打印,此乃正常信息,因为配置zookeeper为集群模式,而起来的节点有先后顺序,第一个启动节点无法与其他节点通信,故而报错,等3个节点都起来后,选举出leader,稳定后,各节点打印正常。
 
4.      kafka配置及启动

4.1   节点配置  

4.1.1 节点1配置
编辑kafka_2.10-0.9.0.1/config/ server.properties文件,设置或添加以下内容:

broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=F:\\work\\scala\\kafka_2.10-0.9.0.1\\log# 根据实际情况而定
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
broker.list=localhost:9092,localhost:9093,localhost:9094
 
4.1.2 节点2配置
编辑kafka_2.10-0.9.0.1-2/config/ server.properties文件,设置或添加以下内容:

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=F:\\work\\scala\\kafka_2.10-0.9.0.1\\log# 根据实际情况而定
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
broker.list=localhost:9092,localhost:9093,localhost:9094
 
4.1.3 节点3配置
编辑kafka_2.10-0.9.0.1-3/config/ server.properties文件,设置或添加以下内容:

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=F:\\work\\scala\\kafka_2.10-0.9.0.1\\log# 根据实际情况而定
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
broker.list=localhost:9092,localhost:9093,localhost:9094
 
注意各个id及端口!!!
 
4.2   节点启动

4.2.1 节点1启动
进入目录kafka_2.10-0.9.0.1/bin/windows/
运行:kafka-server-start.bat../../config/server.properties
 
4.2.2 节点2启动
进入目录kafka_2.10-0.9.0.1-2/bin/windows/
运行:kafka-server-start.bat../../config/server.properties
 
4.2.3 节点3启动
进入目录kafka_2.10-0.9.0.1-3/bin/windows/
运行:kafka7-server-start.bat../../config/server.properties
 
至此,3个zookeeper节点以及3个kafka节点启动成功…..
 
5.      Kafka命令行测试
5.1   创建topic

kafka-topics.bat--create --zookeeper localhost:2181 --replication-factor 3 --partitions 1--topic test-replicated-topic
 
5.2   查topic

kafka-topics.bat--describe --zookeeper localhost:2181 --topic test-replicated-topic
## 输出::
Topic:my-replicated-topic        PartitionCount:1      ReplicationFactor:3 Configs:
         Topic: my-replicated-topic       Partition: 0       Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
 
5.3   开一个cmd窗口模拟生产者

kafka-console-producer.bat--broker-list localhost:9092,localhost:9093,localhost:9094 --topictest-replicated-topic
 
5.4   开一个cmd窗口模拟消费者

kafka-console-consumer.bat--zookeeper localhost:2181 --from-beginning --topic test-replicated-topic
 
此时,生成者窗口等待用户输入,用户输入数据之后,在消费者窗口就能看到。
 
6.      Kafka编程

以maven来管理测试程序的构建,在pom文件中添加以下依赖:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
    </dependency>
----------------------------------------------分割线---------------------------------------------- 
-----------------------------------------------------------------------------------------------------
## 代码部分参考网友源码!!!

消费者程序:
packagecom.migu.tsg;
importjava.util.Properties;
 
importkafka.javaapi.producer.Producer;
importkafka.producer.KeyedMessage;
importkafka.producer.ProducerConfig;
 
public
class
KafkaProducer extends Thread {
    private
final
Producer<Integer, String> producer;
    private
final
String topic;
    private
final
Properties props =
new
Properties();
 
    publicKafkaProducer(Stringtopic) {
        props.put("serializer.class","kafka.serializer.StringEncoder");
        props.put("metadata.broker.list","localhost:9092,localhost:9093,localhost:9094");
        ProducerConfig config =new ProducerConfig(props);
        producer = new kafka.javaapi.producer.Producer<Integer,String>(config);
       
        this.topic =topic;
    }
 
    @Override
    public
void
run() {
        int
messageNo = 1;
        while (true) {
            String messageStr =new String("Message_" +messageNo);
            System.out.println("Send:" +messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic,messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedExceptione) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}
 
 
消费者程序:
packagecom.migu.tsg;
 
importjava.util.HashMap;
import java.util.List;
import java.util.Map;
importjava.util.Properties;
 
importkafka.consumer.ConsumerConfig;
importkafka.consumer.ConsumerIterator;
importkafka.consumer.KafkaStream;
importkafka.javaapi.consumer.ConsumerConnector;
 
 
public
class
KafkaConsumer extends Thread {
    private
final
ConsumerConnector consumer;
    private
final
String topic;
 
    publicKafkaConsumer(Stringtopic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic =topic;
    }
 
    private
static
ConsumerConfig createConsumerConfig() {
        Properties props =
new
Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id","KafkaProperties.groupId"); 
// 同一个groupid的线程,保证不会消费同一个分区
        props.put("zookeeper.session.timeout.ms","40000");
        props.put("zookeeper.sync.time.ms","200");
        props.put("auto.commit.interval.ms","1000");
        return
new
ConsumerConfig(props);
    }
 
    @Override
    public
void
run() {
        Map<String, Integer> topicCountMap =new HashMap<String, Integer>();
        topicCountMap.put(topic,new Integer(1));
// 描述读某个topic,需要几个线程读
        Map<String, List<KafkaStream<byte[],byte[]>>>
consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[],byte[]>
stream = consumerMap.get(topic).get(0);//
一个线程对应一个stream
        ConsumerIterator<byte[],byte[]>
it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive:" +new
String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedExceptione) {
                e.printStackTrace();
            }
        }
    }
}
 
 
配置文件:
packagecom.migu.tsg;
 
public
interface
KafkaProperties {
final
static
String zkConnect ="localhost:2181,localhost:2182,localhost:2183";
    final
static
String groupId ="group1";
    final
static
String topic ="topic1";
    final
static
String kafkaServerURL ="localhost";
    final
static int kafkaServerPort
= 9092;
    final
static int kafkaProducerBufferSize
= 64 * 1024;
    final
static int connectionTimeOut
= 20000;
    final
static int reconnectInterval
= 10000;
    final
static
String topic2 ="topic2";
    final
static
String topic3 ="topic3";
    //final static String clientId = "SimpleConsumerDemoClient";
}
 
启动文件:
packagecom.migu.tsg;
 
/**
 * @authorleicuibourne_cui@163.com
 */
public
class
KafkaConsumerProducerDemo
{
    public
static void
main(String[]
args)
    {
        KafkaProducer producerThread =new KafkaProducer(KafkaProperties.topic);
        producerThread.start();
 
        KafkaConsumer consumerThread =new KafkaConsumer(KafkaProperties.topic);
        consumerThread.start();
    }
}
 
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka zookeeper 集群