Kafka学习(一)——0.10.2.0版本基础知识,java简单的demo
2017-03-24 22:38
615 查看
最近开始学习Kafka。简单的记录一下学习过程。
首先本地虚拟机安装centos 7。
请参考另一篇文章。(稍后补上)
安装jdk
这里下载了 jdk-7u71-linux-x64.rpm
安装后java -version已经成功,不过还是习惯性的配置了一下
JAVA_HOME=/usr/java/jdk1.7.0_71
JRE_HOME=/usr/java/jdk1.7.0_71/jre
PATH=JAVAHOME/bin:JRE_HOME/bin
CLASSPATH=.:JAVAHOME/lib/dt.jar:JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
安装kafka
wget http://mirrors.hust.edu.cn/apache/kafka/0.10.2.0/kafka-0.10.2.0-src.tgz
tar –xzvr kafka-0.10.2.0-src.tgz
查看官方Quickstart
启动kafka自带zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
创建topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
查看topic
bin/kafka-topics.sh –list –zookeeper localhost:2181
启动producer发送消息
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
This is a message
This is another message
启动consumer接收消息
bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning
This is a message
This is another message
官方示例完成~
Jar包常用类源码学习
生产者:
kafka.javaapi.producer.Producer较早的版本都会用这个,新版的注解
@deprecated(“This class has been deprecated and will be removed in a future release. ” +
“Please use org.apache.kafka.clients.producer.KafkaProducer instead.”, “0.10.0.0”)
去看KafkaProducer类,最终的构造方法如下:
Key和Value都需要实现org.apache.kafka.common.serialization.Serializer这个接口:
消费者:
同理,KafkaConsumer类,构造方法如下:
Key和Value都需要实现org.apache.kafka.common.serialization.Deserializer这个接口:
消息对象序列化工具类BeanUtils:
第一个java示例
添加maven依赖:
定义消息对象:
以一个支付消息对象为例
定义序列化和反序列化对象类:
生产者demo:
消费者demo:
开启消费者demo持续等待生产者发送消息。
运行生产者demo。
在消费者控制台打印出如下结果:
一个简单的单点kafka示例就完成啦~
未完待续。。。。。。
首先本地虚拟机安装centos 7。
请参考另一篇文章。(稍后补上)
安装jdk
这里下载了 jdk-7u71-linux-x64.rpm
安装后java -version已经成功,不过还是习惯性的配置了一下
JAVA_HOME=/usr/java/jdk1.7.0_71
JRE_HOME=/usr/java/jdk1.7.0_71/jre
PATH=JAVAHOME/bin:JRE_HOME/bin
CLASSPATH=.:JAVAHOME/lib/dt.jar:JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
安装kafka
wget http://mirrors.hust.edu.cn/apache/kafka/0.10.2.0/kafka-0.10.2.0-src.tgz
tar –xzvr kafka-0.10.2.0-src.tgz
查看官方Quickstart
启动kafka自带zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
创建topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
查看topic
bin/kafka-topics.sh –list –zookeeper localhost:2181
启动producer发送消息
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
This is a message
This is another message
启动consumer接收消息
bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning
This is a message
This is another message
官方示例完成~
Jar包常用类源码学习
生产者:
kafka.javaapi.producer.Producer较早的版本都会用这个,新版的注解
@deprecated(“This class has been deprecated and will be removed in a future release. ” +
“Please use org.apache.kafka.clients.producer.KafkaProducer instead.”, “0.10.0.0”)
去看KafkaProducer类,最终的构造方法如下:
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { ... }
Key和Value都需要实现org.apache.kafka.common.serialization.Serializer这个接口:
public abstract interface Serializer<T> extends Closeable { public abstract void configure(Map<String d11b , ?> paramMap, boolean paramBoolean); public abstract byte[] serialize(String paramString, T paramT); public abstract void close(); }
消费者:
同理,KafkaConsumer类,构造方法如下:
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer){ ... }
Key和Value都需要实现org.apache.kafka.common.serialization.Deserializer这个接口:
public abstract interface Deserializer<T> extends Closeable { public abstract void configure(Map<String, ?> paramMap, boolean paramBoolean); public abstract T deserialize(String paramString, byte[] paramArrayOfByte); public abstract void close(); }
消息对象序列化工具类BeanUtils:
public class BeanUtils { private BeanUtils(){} /** * 对象转字节数组 * @param obj * @return */ public static byte[] ObjectToBytes(Object obj){ ... } /** * 字节数组转对象 * @param bytes * @return */ public static Object BytesToObject(byte[] bytes){ ... } }
第一个java示例
添加maven依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.2.0</version> </dependency>
定义消息对象:
以一个支付消息对象为例
public class PayInfo implements java.io.Serializable{ public PayInfo(String orderNo,Long amount,String outTradeDate){ this.orderNo = orderNo; this.amount = amount; this.outTradeDate = outTradeDate; } private static final long serialVersionUID = 6235710215970026320L; private String orderNo;//订单号 private Long amount;//金额 private String outTradeDate;//交易日 public String getOrderNo() { return orderNo; } public void setOrderNo(String orderNo) { this.orderNo = orderNo; } public Long getAmount() { return amount; } public void setAmount(Long amount) { this.amount = amount; } public String getOutTradeDate() { return outTradeDate; } public void setOutTradeDate(String outTradeDate) { this.outTradeDate = outTradeDate; } public String toString(){ return new StringBuilder("order=").append(orderNo).append("|amount=").append(amount).append("|outTradeDate=").append(outTradeDate).toString(); } }
定义序列化和反序列化对象类:
public class PayInfoSerializer implements Serializer<PayInfo>{ public PayInfoSerializer(){ } public void configure(Map paramMap, boolean paramBoolean) { } public byte[] serialize(String paramString, PayInfo pay) { return BeanUtils.ObjectToBytes(pay); } public void close() { } }
public class PayInfoDeSerializer implements Deserializer<PayInfo>{ public void configure(Map<String, ?> paramMap, boolean paramBoolean) { } public PayInfo deserialize(String paramString, byte[] paramArrayOfByte) { return (PayInfo) BeanUtils.BytesToObject(paramArrayOfByte); } public void close() { } }
生产者demo:
import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class ProducerDemo { private static String topic = "pay"; // 定义要操作的主题 public static void main(String[] args) { try{ newKafka(); }catch(Exception e){ System.out.println(e); } } /** *新版kafka生产者示例 */ private static void newKafka(){ Properties pro = new Properties(); pro.setProperty("bootstrap.servers", "192.168.18.132:9092"); pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); pro.setProperty("value.serializer", PayInfoSerializer.class.getName()); KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(pro); Future<RecordMetadata> future = producer.send(new ProducerRecord<String, Object>(topic,new PayInfo("201703200001",101L,"20170320"))); System.out.println(future.toString()); } }
消费者demo:
package kafka; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class ConsumerDemo { private static Logger log = LoggerFactory.getLogger(ConsumerDemo.class); private static String topic = "pay"; // 定义要操作的主题 public static void main(String[] args) { newConsumer(); } private static void newConsumer(){ Properties pro = new Properties(); pro.setProperty("bootstrap.servers", "192.168.18.132:9092"); pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); pro.setProperty("value.deserializer", PayInfoDeSerializer.class.getName()); pro.setProperty("group.id", "group1"); KafkaConsumer<String,Object> consumer = new KafkaConsumer<String,Object>(pro); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, Object> records = consumer.poll(100); try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } for (ConsumerRecord<String, Object> record : records) { System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } } } }
开启消费者demo持续等待生产者发送消息。
运行生产者demo。
在消费者控制台打印出如下结果:
offset = 0, key = null, value = order=201703200001|amount=100|outTradeDate=20170320
一个简单的单点kafka示例就完成啦~
未完待续。。。。。。
相关文章推荐
- kafka学习(二)---- Kafka简单的Java版本的Hello World实例
- java基础学习---java对象序列化基础知识
- JAVA第一周学习周记:java基础及面向对象基础知识
- 学习Spring必学的Java基础知识(1)----反射
- 黑马程序员_java学习日记_Java基础小知识学习整理
- 黑马程序员_java基础知识学习总结一
- java学习笔记,关于java的一些基础知识,适用于初学者,第一节
- JAVA学习笔记(基础知识)
- 学习Spring必学的Java基础知识(1)----反射
- 学习Spring必学的Java基础知识----反射
- 学习Spring必学的Java基础知识(6)----ThreadLocal
- java基础知识学习2
- java中一些简单的基础知识,温故而知新
- 学习Spring必学的Java基础知识(2)----动态代理
- 学习Spring必学的Java基础知识----反射
- Java学习基础知识
- Java 网络编程 学习笔记一 基础知识
- 黑马程序员 java学习笔记-基础知识盘点
- 黑马程序员_java基础知识学习总结二
- 学习Spring必学的Java基础知识(6)----ThreadLocal