kafka实践
2016-03-08 10:59
183 查看
生产者、消费者、重写分区接口
public class KafkaStatleConsumer
{
    /** Connection to the consumer group (legacy Kafka 0.8.x high-level consumer API). */
    private final ConsumerConnector consumer;
    private final String topic = "topictest";
    private final String zk = "xxx:2181";
    // One stream per thread; should match the topic's partition count for full parallelism.
    private final int numThreads = 3;

    public static void main(String[] args)
    {
        KafkaStatleConsumer consumer = new KafkaStatleConsumer();
        consumer.run();
    }

    /**
     * Builds a fresh Properties containing the ZooKeeper connection string.
     * A new instance is returned on every call so settings for different
     * topics/group IDs never share mutable state.
     */
    public Properties getConsumerProps()
    {
        Properties consumerProps = new Properties();
        consumerProps.put("zookeeper.connect", zk);
        return consumerProps;
    }

    public KafkaStatleConsumer()
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    }

    private ConsumerConfig createConsumerConfig()
    {
        // Different topics / group IDs need distinct Properties; never reuse a shared singleton.
        Properties props = getConsumerProps();
        props.put("group.id", "testConsumer");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    /**
     * Opens numThreads message streams for the topic and drains each one on its
     * own pool thread, printing thread name, partition, offset and payload for
     * every message, committing offsets as it goes.
     */
    public void run()
    {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = streamMap.get(topic);
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        // Consume the partitions concurrently: one worker per stream.
        for (final KafkaStream<byte[], byte[]> stream : streams)
        {
            executor.submit(new Runnable()
            {
                public void run()
                {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext())
                    {
                        MessageAndMetadata<byte[], byte[]> mam = it.next();
                        // Decode with an explicit charset; new String(byte[]) would use the
                        // platform default and corrupt non-ASCII payloads on some machines.
                        System.out.println(Thread.currentThread().getName()
                                + ": partition[" + mam.partition() + "],"
                                + "offset[" + mam.offset() + "], "
                                + new String(mam.message(), java.nio.charset.StandardCharsets.UTF_8));
                        // NOTE(review): committing after every single message is expensive
                        // (a ZooKeeper write per message) — consider batching commits.
                        consumer.commitOffsets();
                    }
                }
            });
        }
        // No more tasks will be submitted; lets the pool terminate once the
        // streams end instead of keeping non-daemon threads alive forever.
        executor.shutdown();
    }
}
public class KafkaStatleProducer
{
    private final static String topicName = "topictest";

    /**
     * Returns a fresh Properties holding the broker list for the legacy
     * (Kafka 0.8.x) producer API.
     */
    public static Properties getProductProps()
    {
        Properties producerProps = new Properties();
        producerProps.setProperty("metadata.broker.list", "xxx:9092");
        return producerProps;
    }

    /**
     * Sends 100 string messages to the topic, keyed by the loop index so the
     * configured partitioner spreads them across partitions, then closes the
     * producer.
     */
    public static void main(String[] args)
    {
        Properties props = getProductProps();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.domain.api.kafka.PartitionerExt");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try
        {
            for (int i = 0; i < 100; i++)
            {
                String msg = "sdfsdfsdf: " + i;
                // The key is the loop index, which PartitionerExt hashes to a partition.
                KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                        topicName, i + "", msg);
                producer.send(data);
                System.out.println(data);
            }
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            // Release network resources even if sending threw something
            // that is not an Exception (e.g. an Error).
            producer.close();
        }
    }
}
public class PartitionerExt extends DefaultPartitioner
{
    public PartitionerExt(VerifiableProperties props)
    {
        super(props);
    }

    /**
     * Maps a message key to a partition index in [0, numPartitions).
     *
     * Math.abs(key.hashCode()) % numPartitions is NOT safe: Math.abs(Integer.MIN_VALUE)
     * is still negative, which would return a negative partition and fail the send.
     * Taking abs of the remainder is always safe because |x % n| < n.
     */
    public int partition(Object key, int numPartitions)
    {
        int result = Math.abs(key.hashCode() % numPartitions);
        System.out.println("current DefaultPartitioner :" + result);
        return result;
    }
}
:: Create the topic with 3 partitions and a single replica (ZooKeeper-based tooling, pre-KIP-377 Kafka).
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic BALANCEREMINDERTOPICNAME
:: Show partition / leader / replica / ISR details for the topic.
kafka-topics.bat --describe --zookeeper localhost:2181 --topic BALANCEREMINDERTOPICNAME
:: Interactive console producer: each typed line is published as one message.
kafka-console-producer.bat --broker-list localhost:9092 --topic BALANCEREMINDERTOPICNAME
:: Console consumer reading the topic from the earliest available offset.
kafka-console-consumer.bat --zookeeper localhost:2181 --from-beginning --topic BALANCEREMINDERTOPICNAME
:: Mark the topic for deletion (NOTE: only takes effect if delete.topic.enable=true on the broker — verify config).
kafka-topics.bat --delete --zookeeper localhost:2181 --topic BALANCEREMINDERTOPICNAME
public class KafkaStatleConsumer
{
    /** Connection to the consumer group (legacy Kafka 0.8.x high-level consumer API). */
    private final ConsumerConnector consumer;
    private final String topic = "topictest";
    private final String zk = "xxx:2181";
    // One stream per thread; should match the topic's partition count for full parallelism.
    private final int numThreads = 3;

    public static void main(String[] args)
    {
        KafkaStatleConsumer consumer = new KafkaStatleConsumer();
        consumer.run();
    }

    /**
     * Builds a fresh Properties containing the ZooKeeper connection string.
     * A new instance is returned on every call so settings for different
     * topics/group IDs never share mutable state.
     */
    public Properties getConsumerProps()
    {
        Properties consumerProps = new Properties();
        consumerProps.put("zookeeper.connect", zk);
        return consumerProps;
    }

    public KafkaStatleConsumer()
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    }

    private ConsumerConfig createConsumerConfig()
    {
        // Different topics / group IDs need distinct Properties; never reuse a shared singleton.
        Properties props = getConsumerProps();
        props.put("group.id", "testConsumer");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    /**
     * Opens numThreads message streams for the topic and drains each one on its
     * own pool thread, printing thread name, partition, offset and payload for
     * every message, committing offsets as it goes.
     */
    public void run()
    {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = streamMap.get(topic);
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        // Consume the partitions concurrently: one worker per stream.
        for (final KafkaStream<byte[], byte[]> stream : streams)
        {
            executor.submit(new Runnable()
            {
                public void run()
                {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext())
                    {
                        MessageAndMetadata<byte[], byte[]> mam = it.next();
                        // Decode with an explicit charset; new String(byte[]) would use the
                        // platform default and corrupt non-ASCII payloads on some machines.
                        System.out.println(Thread.currentThread().getName()
                                + ": partition[" + mam.partition() + "],"
                                + "offset[" + mam.offset() + "], "
                                + new String(mam.message(), java.nio.charset.StandardCharsets.UTF_8));
                        // NOTE(review): committing after every single message is expensive
                        // (a ZooKeeper write per message) — consider batching commits.
                        consumer.commitOffsets();
                    }
                }
            });
        }
        // No more tasks will be submitted; lets the pool terminate once the
        // streams end instead of keeping non-daemon threads alive forever.
        executor.shutdown();
    }
}
public class KafkaStatleProducer
{
    private final static String topicName = "topictest";

    /**
     * Returns a fresh Properties holding the broker list for the legacy
     * (Kafka 0.8.x) producer API.
     */
    public static Properties getProductProps()
    {
        Properties producerProps = new Properties();
        producerProps.setProperty("metadata.broker.list", "xxx:9092");
        return producerProps;
    }

    /**
     * Sends 100 string messages to the topic, keyed by the loop index so the
     * configured partitioner spreads them across partitions, then closes the
     * producer.
     */
    public static void main(String[] args)
    {
        Properties props = getProductProps();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.domain.api.kafka.PartitionerExt");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try
        {
            for (int i = 0; i < 100; i++)
            {
                String msg = "sdfsdfsdf: " + i;
                // The key is the loop index, which PartitionerExt hashes to a partition.
                KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                        topicName, i + "", msg);
                producer.send(data);
                System.out.println(data);
            }
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            // Release network resources even if sending threw something
            // that is not an Exception (e.g. an Error).
            producer.close();
        }
    }
}
public class PartitionerExt extends DefaultPartitioner
{
    public PartitionerExt(VerifiableProperties props)
    {
        super(props);
    }

    /**
     * Maps a message key to a partition index in [0, numPartitions).
     *
     * Math.abs(key.hashCode()) % numPartitions is NOT safe: Math.abs(Integer.MIN_VALUE)
     * is still negative, which would return a negative partition and fail the send.
     * Taking abs of the remainder is always safe because |x % n| < n.
     */
    public int partition(Object key, int numPartitions)
    {
        int result = Math.abs(key.hashCode() % numPartitions);
        System.out.println("current DefaultPartitioner :" + result);
        return result;
    }
}
:: Create the topic with 3 partitions and a single replica (ZooKeeper-based tooling, pre-KIP-377 Kafka).
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic BALANCEREMINDERTOPICNAME
:: Show partition / leader / replica / ISR details for the topic.
kafka-topics.bat --describe --zookeeper localhost:2181 --topic BALANCEREMINDERTOPICNAME
:: Interactive console producer: each typed line is published as one message.
kafka-console-producer.bat --broker-list localhost:9092 --topic BALANCEREMINDERTOPICNAME
:: Console consumer reading the topic from the earliest available offset.
kafka-console-consumer.bat --zookeeper localhost:2181 --from-beginning --topic BALANCEREMINDERTOPICNAME
:: Mark the topic for deletion (NOTE: only takes effect if delete.topic.enable=true on the broker — verify config).
kafka-topics.bat --delete --zookeeper localhost:2181 --topic BALANCEREMINDERTOPICNAME
相关文章推荐
- 纯CSS修改浏览器的默认滚动条样式
- CSS 属性 - 伪类和伪元素
- SocketServer model_use
- Codeforces Round #345 (Div. 2) E. Table Compression 并查集
- JavaScript笔记
- Ambiguous column name
- ClamAV源码配置安装测试
- Leetcode:102. Binary Tree Level Order Traversal(JAVA)
- MapGIS6.7_学习中遇到的问题(2):去除飞点
- Android中使用Vectors(2)绘制优美的路径动画
- linux中more与less的区别
- php学习笔记之:环境配置(一)
- RHEL4 安装yum
- wordpress发布文章404
- CocoaPods详解之----使用篇
- asp.net(C#)页面事件顺序
- Android安全–ELF文件格式解析
- GPU Powered DeepLearning with NVIDIA DIGITS on EC2
- 利用KissXML解析xml数据,并对XML节点属性进行修改
- SAP模块常用增强总结