Kafka系列之-自定义Producer
2016-08-15 23:37
363 查看
前面已经讲到了,在Kafka中,Message是由Producer产生的,Producer产生的Message会发送到Topic的指定Partition中。Producer可以有多种形式,也可以由用户通过Java,C以及Python语言来自定义。
Kafka中Producer的主要作用和地位如下图所示,Producer通过获取某个Topic指定Partition的Leader节点连接到Kafka集群中,
1. kafka.javaapi.producer.Producer
类定义:
UML图:
2. kafka.producer.ProducerConfig
类定义:
UML图:
3. kafka.producer.KeyedMessage
类定义:
具体代码和过程在代码和注释中。
1、Java代码
2、运行
(1)启动ZooKeeper
(2)启动Kafka集群
(3)创建测试Topic
(4)运行SimpleKafkaProducer 代码
运行该代码,向simple-kafka-producer Topic发送10条Message
(5)查看simple-kafka-producer中的Message
在这里,将会模拟一个网页访问日志生成的过程,每条随机生成的日志Message中包含三个部分的信息:
- 页面访问时间戳
- 页面名称
- 访问页面的IP地址
1、Java代码
(1)Producer
(2)Partitioner
2、运行
由于前面已经启动了ZooKeeper以及Kafka,这里直接从创建Topic开始
(1)创建Topic
创建一个partition为3,replication为3的topic。
如何使用list命令查看该Topic,可以参考前面的示例
(2)运行Java代码
往partition-kafka-producer Topic中写入100条随机生成的Message。
(3)查看这些Message
经过封装后,producer有关的参数都写在properties文件中。
第二步中的Producer的调用方法为:
两行代码就可以将该message发送到配置的Kafka集群指定的topic中。
第三步中的自定义Partitioner的Producer的调用方法为:
具体代码可以参考KafkaProducerTool。
欢迎提出宝贵意见。
Kafka中Producer的主要作用和地位如下图所示,Producer通过获取某个Topic指定Partition的Leader节点连接到Kafka集群中,
一、Java Producer API
用户可以基于Kafka提供的API自定义Producer,在这些API中有几个主要的类:1. kafka.javaapi.producer.Producer
类定义:
class Producer[ K,V ](private val underlying: kafka.producer.Producer[K ,V])
UML图:
2. kafka.producer.ProducerConfig
类定义:
class ProducerConfig private (val props: VerifiableProperties) extends AsyncProducerConfig with SyncProducerConfigShared
UML图:
3. kafka.producer.KeyedMessage
类定义:
case class KeyedMessage[ K, V ](val topic: String, val key: K, val partKey: Any , val message: V)
二、自定义简单的Producer
接下来根据上面的三个类,使用Java代码实现一个简单的Producer向Broker发送Message。这个Producer会为特定的Topic生成Message并发送到默认的Partition中。具体代码和过程在代码和注释中。
1、Java代码
package ckm.kafka.producer; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Date; import java.util.Properties; /** * 一个简单的Kafka Producer类,传入两个参数: * topic num * 设置主题和message条数 * * 执行过程: * 1、创建一个topic * kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic xxxx * 2、运行本类中的代码 * 3、查看message * kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx * kafka */ public class SimpleKafkaProducer { /** * Producer的两个泛型,第一个指定Key的类型,第二个指定value的类型 */ private static Producer<String, String> producer; public SimpleKafkaProducer() { Properties props = new Properties(); /** * 指定producer连接的broker列表 */ props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092"); /** * 指定message的序列化方法,用户可以通过实现kafka.serializer.Encoder接口自定义该类 * 默认情况下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化 */ props.put("serializer.class", "kafka.serializer.StringEncoder"); /** * 这个参数用于通知broker接收到message后是否向producer发送确认信号 * 0 - 表示producer不用等待任何确认信号,会一直发送消息, * 否则producer进入等待状态 * -1 - 表示leader状态的replica需要等待所有in-sync状态的replica都接收到消息后才会向producer发送确认信号, * 再次之前producer一直处于等待状态 */ props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); producer = new Producer<String, String>(config); } public static void main(String[] args) { if (args.length < 2) { System.out.println("Please Input Topic and Message Numbers"); } String topic = (String) args[0]; int count = Integer.parseInt((String) args[1]); System.out.println("Topic = " + topic); System.out.println("Message Nums = " + count); SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer(); simpleProducer.publishMessage(topic, count); } /** * 根据topic和消息条数发送消息 * @param topic * @param count */ private void publishMessage(String topic, int count) { for (int i = 0; i < count; i ++) { String runtime = new Date().toString(); String msg = "Message published time - " + runtime; System.out.println("msg = " + msg); /** * 第一个泛型指定用于分区的key的类型,第二个泛型指message的类型 * topic只能为String类型 */ KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg); producer.send(data); } producer.close(); } }
2、运行
(1)启动ZooKeeper
$ZK_HOME/bin/zkServer.sh start
(2)启动Kafka集群
cd $KAFKA_HOME nohup bin/kafka-server-start.sh config/server.properties &
(3)创建测试Topic
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 1 --partition 3 --topic simple-kafka-producer
(4)运行SimpleKafkaProducer 代码
运行该代码,向simple-kafka-producer Topic发送10条Message
java -cp KafkaTestProgram.jar ckm.kafka.producer.SimpleKafkaProducer simple-kafka-producer 10
(5)查看simple-kafka-producer中的Message
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic simple-kafka-producer
三、自定义Partition的Producer
这一节中除了实现Producer之外,还自定义了Message的Partition划分过程。在这里,将会模拟一个网页访问日志生成的过程,每条随机生成的日志Message中包含三个部分的信息:
- 页面访问时间戳
- 页面名称
- 访问页面的IP地址
1、Java代码
(1)Producer
package ckm.kafka.producer; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Date; import java.util.Properties; import java.util.Random; /** * 一个自定义分区的Kafka Producer类,传入两个参数: * topic num * 设置主题和message条数 * * 模拟用户点击日志,日志格式为:“时间,网址,IP地址"格式 * * 自定义分区,通过IP地址最后一位与分区数求余,message分散到0~partition - 1这些分区中 * * 执行过程: * 1、创建一个topic * kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic xxxx * 2、运行本类中的代码 * 3、查看message * kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx * kafka */ public class KafkaProducerWithPartition { /** * Producer的两个泛型,第一个指定Key的类型,第二个指定value的类型 */ private static Producer<String, String> producer; public KafkaProducerWithPartition() { Properties props = new Properties(); /** * 指定producer连接的broker列表 */ props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092"); /** * 指定message的序列化方法,用户可以通过实现kafka.serializer.Encoder接口自定义该类 * 默认情况下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化 */ props.put("serializer.class", "kafka.serializer.StringEncoder"); /** * 这个参数用于通知broker接收到message后是否向producer发送确认信号 * 0 - 表示producer不用等待任何确认信号,会一直发送消息 * 1 - 表示leader状态的replica在接收到message后需要向producer发送一个确认信号,否则producer进入等待状态 * -1 - 表示leader状态的replica需要等待所有in-sync状态的replica都接收到消息后才会向producer发送确认信号,再次之前producer一直处于等待状态 */ props.put("request.required.acks", "1"); /** * 指定partition类,自定义的分区类,继承自kafka.producer.Partitioner接口 */ props.put("partitioner.class", "ckm.kafka.producer.SimplePartitioner"); ProducerConfig config = new ProducerConfig(props); producer = new Producer<String, String>(config); } public static void main(String[] args) { if (args.length < 2) { System.out.println("Please Input Topic and Message Numbers"); } String topic = (String) args[0]; int count = Integer.parseInt((String) args[1]); System.out.println("Topic = " + topic); System.out.println("Message Nums = " + count); KafkaProducerWithPartition simpleProducer = new KafkaProducerWithPartition(); simpleProducer.publishMessage(topic, count); } /** * 根据topic和消息条数发送消息 * @param topic * @param count */ private void publishMessage(String topic, int count) { Random random = new Random(); for (int i = 0; i < count; i ++) { String runtime = new Date().toString(); // 访问的IP地址 String clientIP = "192.168.1." + random.nextInt(255); String msg = runtime + ",kafka.apache.org," + clientIP; System.out.println("msg = " + msg); /** * 第一个泛型指定用于分区的key的类型,第二个泛型指message的类型 * topic只能为String类型 * 和上一个Producer相比,多了一个用于分区的key */ KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, clientIP, msg); producer.send(data); } producer.close(); } }
(2)Partitioner
package ckm.kafka.producer; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; /** * Created by ckm on 2016/8/3. */ public class SimplePartitioner implements Partitioner { /** * 不写这个方法,会报错 * Exception in thread "main" java.lang.NoSuchMethodException: ckm.kafka.producer.SimplePartitioner.<init>(kafka.utils.VerifiableProperties) * at java.lang.Class.getConstructor0(Class.java:2892) * at java.lang.Class.getConstructor(Class.java:1723) * at kafka.utils.Utils$.createObject(Utils.scala:436) * at kafka.producer.Producer.<init>(Producer.scala:61) * at kafka.javaapi.producer.Producer.<init>(Producer.scala:26) * at ckm.kafka.producer.KafkaProducerWithPartition.<init>(KafkaProducerWithPartition.java:58) * at ckm.kafka.producer.KafkaProducerWithPartition.main(KafkaProducerWithPartition.java:70) * @param verifiableProperties */ public SimplePartitioner(VerifiableProperties verifiableProperties) { } public int partition(Object key, int numPartitions) { int partition = 0; String partitionKey = (String) key; int offset = partitionKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt(partitionKey.substring(offset + 1)) % numPartitions; } return partition; } }
2、运行
由于前面已经启动了ZooKeeper以及Kafka,这里直接从创建Topic开始
(1)创建Topic
创建一个partition为3,replication为3的topic。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 3 --partitions 3 --topic partition-kafka-producer
如何使用list命令查看该Topic,可以参考前面的示例
(2)运行Java代码
java -cp KafkaTestProgram.jar ckm.kafka.producer.KafkaProducerWithPartition partition-kafka-producer 100
往partition-kafka-producer Topic中写入100条随机生成的Message。
(3)查看这些Message
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic partition-kafka-producer
四、自定义Producer的封装
上面两种自定义的Producer中,其实有很多代码是重复性的。接下来对Kafka自定义Producer进行一定的封装,使其使用和配置更加简便。经过封装后,producer有关的参数都写在properties文件中。
第二步中的Producer的调用方法为:
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); kafkaProducerTool.publishMessage("test message");
两行代码就可以将该message发送到配置的Kafka集群指定的topic中。
第三步中的自定义Partitioner的Producer的调用方法为:
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); Properties producerProperties = kafkaProducerTool.getProducerProperties(); // 如果properties配置文件中没有配置该参数的话,手动设置 producerProperties.put("partitioner.class", "SimplePartitioner"); kafkaProducerTool.publishPartitionedMessage("partition-key", "test messate");
具体代码可以参考KafkaProducerTool。
欢迎提出宝贵意见。
相关文章推荐
- Kafka系列之-自定义Producer
- Kafka系列之-自定义Producer
- Kafka系列2-producer和consumer报错
- Kafka Producer端自定义消息
- Kafka系列3-python版本producer生产者和consumer消费者实例
- Kafka系列2-producer和consumer报错
- 深入Atlas系列:探究Application Services(2) - 自定义服务器端Profile Service支持
- EMF介绍系列(六、自定义命令)
- 深入Atlas系列:探究Application Services(2) - 自定义服务器端Profile Service支持
- ASP.NET AJAX入门系列(8):自定义异常处理
- NHibernate考察系列 04 枚举 自定义类型 组件类型
- ASP.NET AJAX入门系列(8):自定义异常处理
- ASP.NET AJAX入门系列(8):自定义异常处理
- 深入Atlas系列:探究Application Services(3) - 自定义客户端Profile Service支持
- NLog文章系列——如何写自定义布局生成器(Layout Renderer)
- ASP.NET AJAX入门系列(8):自定义异常处理
- 深入Atlas系列:客户端网络访问基础结构示例(1) - 编写并使用自定义的WebRequestExecutor
- 深入Atlas系列:Web Sevices Access in Atlas示例(5) - 自定义TypeConverter把基础类型转换为复杂类型
- SharePoint Web Service系列:编写自定义SharePoint Web Services之一
- 深入Atlas系列:Web Sevices Access in Atlas示例(5) - 自定义TypeConverter把基础类型转换为复杂类型