您的位置:首页 > 其它

Kafka笔记--参数说明及Demo

2015-11-03 20:15 501 查看
参考资料:http://blog.csdn.net/honglei915/article/details/37563647
参数说明http://ju.outofmemory.cn/entry/119243
参数说明/Demo:http://www.aboutyun.com/thread-9906-1-1.html

Kafka+Spark:
http://shiyanjun.cn/archives/1097.html
http://ju.outofmemory.cn/entry/84636

1. Kafka启动:
  1. 先启动所有节点的zookeeper : 进入ZOOKEEPER_HOME/bin 执行./zkServer.sh start
  2. 启动所有节点的kafka:进入 KAFKA_HOME/bin 执行 ./kafka-server-start.sh config/server.properties &

2. 参数说明

2.0 boker参数说明 (配置文件位于config/server.properties)

name默认值描述
broker.idnone每一个boker都有一个唯一的id作为它们的名字。 这就允许boker切换到别的主机/端口上, consumer依然知道
enable.zookeepertrue允许注册到zookeeper
log.flush.interval.messagesLong.MaxValue在数据被写入到硬盘和消费者可用前最大累积的消息的数量
log.flush.interval.msLong.MaxValue在数据被写入到硬盘前的最大时间
log.flush.scheduler.interval.msLong.MaxValue检查数据是否要写入到硬盘的时间间隔。
log.retention.hours168控制一个log保留多长个小时
log.retention.bytes-1控制log文件最大尺寸
log.cleaner.enablefalse是否log cleaning
log.cleanup.policydeletedelete还是compat. 其它控制参数还包括log.cleaner.threads,log.cleaner.io.max.bytes.per.second,
log.cleaner.dedupe.buffer.size,log.cleaner.io.buffer.size,log.cleaner.io.buffer.load.factor,
log.cleaner.backoff.ms,log.cleaner.min.cleanable.ratio,log.cleaner.delete.retention.ms
log.dir/tmp/kafka-logs指定log文件的根目录
log.segment.bytes110241024*1024单一的log segment文件大小
log.roll.hours24 * 7开始一个新的log文件片段的最大时间
message.max.bytes1000000 + MessageSet.LogOverhead一个socket 请求的最大字节数
num.network.threads3处理网络请求的线程数
num.io.threads8处理IO的线程数
background.threads10后台线程序
num.partitions1默认分区数
socket.send.buffer.bytes102400socket SO_SNDBUFF参数
socket.receive.buffer.bytes102400socket SO_RCVBUFF参数
zookeeper.connectlocalhost:2182/kafka指定zookeeper连接字符串, 格式如hostname:port/chroot。chroot是一个namespace
zookeeper.connection.timeout.ms6000指定客户端连接zookeeper的最大超时时间
zookeeper.session.timeout.ms6000连接zk的session超时时间
zookeeper.sync.time.ms2000zk follower落后于zk leader的最长时间

2.1 producer参数说明(配置文件位于config/producer.properties或者在程序内定义)


package com.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class LogConsumer {

private ConsumerConfig config;
private String topic;
private int partitionsNum;
private MessageExecutor executor;
private ConsumerConnector connector;
private ExecutorService threadPool;
public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{
Properties properties = new Properties();
properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
config = new ConsumerConfig(properties);
this.topic = topic;
this.partitionsNum = partitionsNum;
this.executor = executor;
}

public void start() throws Exception{
connector = Consumer.createJavaConsumerConnector(config);
Map<String,Integer> topics = new HashMap<String,Integer>();
topics.put(topic, partitionsNum);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
threadPool = Executors.newFixedThreadPool(partitionsNum);
for(KafkaStream<byte[], byte[]> partition : partitions){
threadPool.execute(new MessageRunner(partition));
}
}

public void close(){
try{
threadPool.shutdownNow();
}catch(Exception e){
//
}finally{
connector.shutdown();
}

}

class MessageRunner implements Runnable{
private KafkaStream<byte[], byte[]> partition;

MessageRunner(KafkaStream<byte[], byte[]> partition) {
this.partition = partition;
}

public void run(){
ConsumerIterator<byte[], byte[]> it = partition.iterator();
while(it.hasNext()){
MessageAndMetadata<byte[],byte[]> item = it.next();
System.out.println("partiton:" + item.partition());
System.out.println("offset:" + item.offset());
executor.execute(new String(item.message()));//UTF-8
}
}
}

interface MessageExecutor {

public void execute(String message);
}

/**
* @param args
*/
public static void main(String[] args) {
LogConsumer consumer = null;
try{
MessageExecutor executor = new MessageExecutor() {

public void execute(String message) {
System.out.println(message);

}
};
consumer = new LogConsumer("test-topic", 2, executor);
consumer.start();
}catch(Exception e){
e.printStackTrace();
}finally{
//            if(consumer != null){
//                consumer.close();
//            }
}

}

}


View Code
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: