您的位置:首页 > 其它

Kafka 实践

2016-03-08 10:59 183 查看
生产者、消费者、重写分区接口

/**
 * Demo consumer built on the ZooKeeper-based Kafka consumer connector.
 *
 * <p>It asks the connector for {@code numThreads} streams of the topic
 * {@code "topictest"} and drains each stream on its own pool thread, printing
 * every message together with its partition and offset.
 */
public class KafkaStatleConsumer
{
    /**
     * Charset used to decode message payloads. The producer in this example
     * uses kafka.serializer.StringEncoder, which encodes as UTF-8 unless told
     * otherwise, so decoding must not depend on the platform default charset.
     */
    private static final java.nio.charset.Charset MESSAGE_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    private final ConsumerConnector consumer;

    private final String topic = "topictest";

    private final String zk = "xxx:2181";

    private final int numThreads = 3;

    /**
     * Creates a consumer and starts consuming.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        KafkaStatleConsumer consumer = new KafkaStatleConsumer();
        consumer.run();
    }

    /**
     * Builds a fresh set of base consumer properties.
     *
     * @return a new {@link Properties} holding only {@code zookeeper.connect}
     */
    public Properties getConsumerProps()
    {
        Properties consumerProps = new Properties();
        consumerProps.put("zookeeper.connect", zk);
        return consumerProps;
    }

    /**
     * Creates the underlying consumer connector from {@link #createConsumerConfig()}.
     */
    public KafkaStatleConsumer()
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    }

    /**
     * Builds the consumer configuration for this instance.
     *
     * @return a config carrying the ZooKeeper address, the group id
     *         {@code "testConsumer"} and a 10000 ms ZooKeeper session timeout
     */
    private ConsumerConfig createConsumerConfig()
    {
        // Different topics / group ids need their own settings, so a fresh
        // Properties object is built each time instead of sharing a singleton.
        Properties props = getConsumerProps();
        props.put("group.id", "testConsumer");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    /**
     * Requests {@code numThreads} streams for the topic and hands each stream
     * to a worker thread. Returns once all workers have been queued; the
     * workers keep running for as long as their stream yields messages.
     */
    public void run()
    {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, numThreads);

        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = streamMap.get(topic);

        ExecutorService executor = Executors.newFixedThreadPool(numThreads);

        // Consume the streams in parallel, one worker per stream.
        for (final KafkaStream<byte[], byte[]> stream : streams)
        {
            // execute() rather than submit(): nobody inspects the Future, and
            // submit() would silently swallow any exception thrown by a worker.
            executor.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    consumeStream(stream);
                }
            });
        }

        // No further tasks will be added. Already-queued workers still run,
        // and the pool threads can exit once those workers finish.
        executor.shutdown();
    }

    /**
     * Prints every message of one stream and commits offsets after each message.
     *
     * @param stream the stream to drain on the calling thread
     */
    private void consumeStream(KafkaStream<byte[], byte[]> stream)
    {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext())
        {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println(Thread.currentThread().getName()
                    + ": partition[" + mam.partition() + "],"
                    + "offset[" + mam.offset() + "], "
                    + new String(mam.message(), MESSAGE_CHARSET));

            // NOTE(review): this commits on the connector shared by all workers
            // after every single message; presumably costly - consider batching.
            consumer.commitOffsets();
        }
    }
}

/**
 * Demo producer: sends 100 keyed string messages to the topic
 * {@code "topictest"}, using a custom partitioner class.
 */
public class KafkaStatleProducer
{
    private final static String topicName = "topictest";

    /**
     * Builds a fresh set of base producer properties.
     *
     * @return a new {@link Properties} holding only {@code metadata.broker.list}
     */
    public static Properties getProductProps()
    {
        Properties producterProps = new Properties();
        producterProps.setProperty("metadata.broker.list", "xxx:9092");
        return producterProps;
    }

    /**
     * Configures a string-serializing producer with the custom partitioner,
     * sends messages keyed {@code "0"} to {@code "99"}, echoes each one to
     * standard output, and closes the producer.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        Properties props = getProductProps();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.domain.api.kafka.PartitionerExt");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        try
        {
            for (int i = 0; i < 100; i++)
            {
                String msg = "sdfsdfsdf: " + i;
                // The loop counter doubles as the message key, which is what
                // the partitioner receives.
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>(topicName, i + "", msg);
                producer.send(data);
                System.out.println(data);
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            // Close on every exit path, not only after normal completion or a
            // caught Exception.
            producer.close();
        }
    }
}

/**
 * Custom partitioner that maps a message key to a partition by the absolute
 * value of the key's hash code modulo the partition count, logging each choice.
 */
public class PartitionerExt extends DefaultPartitioner
{
    /**
     * @param props properties passed straight to the superclass constructor
     */
    public PartitionerExt(VerifiableProperties props)
    {
        super(props);
    }

    /**
     * Chooses the partition for a key.
     *
     * @param arg0 the message key; must not be {@code null}
     * @param arg1 the number of partitions; must be positive
     * @return a partition index in the range {@code [0, arg1)}
     */
    public int partition(Object arg0, int arg1)
    {
        int hash = arg0.hashCode();

        // Math.abs(Integer.MIN_VALUE) is still negative, which would produce a
        // negative partition index. Only that one value is special-cased, so
        // every other key keeps the partition it was assigned before.
        int nonNegativeHash = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);

        int result = nonNegativeHash % arg1;
        System.out.println("current DefaultPartitioner :" + result);
        return result;
    }
}

REM Create the topic with 3 partitions and a replication factor of 1.
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic BALANCEREMINDERTOPICNAME

REM Describe the topic.
kafka-topics.bat --describe --zookeeper localhost:2181 --topic BALANCEREMINDERTOPICNAME

REM Start a console producer for the topic against the broker at localhost:9092.
kafka-console-producer.bat --broker-list localhost:9092 --topic BALANCEREMINDERTOPICNAME

REM Start a console consumer for the topic, reading from the beginning.
kafka-console-consumer.bat --zookeeper localhost:2181 --from-beginning --topic BALANCEREMINDERTOPICNAME

REM Delete the topic.
kafka-topics.bat --delete --zookeeper localhost:2181 --topic BALANCEREMINDERTOPICNAME
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: