您的位置:首页 > 其它

Kafka简单入门程序

2016-07-21 14:43 274 查看
基本都是从官网那学来的....抄?


官网 : http://kafka.apache.org/documentation.html#quickstart

一.kafka安装测试


首先下载最新版的kafka http://kafka.apache.org/downloads.html 

[假装这是截图.....⊙﹏⊙b汗]

  Binary downloads:

Scala 2.10  -
kafka_2.10-0.10.0.0.tgz (asc,

md5)
Scala 2.11  -
kafka_2.11-0.10.0.0.tgz (asc,

md5)

然后解压 tar -zxf kafka<version>.tgz

根据官网的 quickstart 那几个step 启动kafka服务:

一行一个单独的命令行窗口

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

现在命令行测试:
创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
生产消息(单独窗口):

二. Kafka Hello World Program Of Java
1.自己新建Java工程
2.导入前面解压的kafka目录下libs(哪个包是啥啥啥不要问我,我没做jar的筛选,自己去筛)
3.新建KafkaProducer(生产消息)
抄袭:  http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.htmlpublic static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for(int i = 10; i < 30; i++)
producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
producer.close();
System.out.println("End");
}
运行后会在"消费消息"窗口看到打印出来的消息.
4.新建KafkaConsumer(消费消息)
抄袭:http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
public static void main(String arg[]){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", true);
props.put("auto.commit.interval.ms", 1000);
props.put("session.timeout.ms", 30000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
System.out.println();
}
}
}
运行后,在"生产消息"窗口输入消息,程序中会打印出消息(消费消息)

结束语:好了,到这里,Kafka Hello World 入门成功?
哈哈哈哈......我也是菜鸟,有什么说得不好的地方,请多指教!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka