java kafka消息的发送与接收
2017-07-28 19:24
429 查看
java kafka消息的发送与接收
消息队列在java EE级开发是很常用到的工具之一,在众多消息队列当中,active mq与kafka相对比较受开发者的喜爱,那么kafka是怎样实现消息的发送与接收呢?这里我们的消息通过一个实体类对象来进行封装,前提是你服务器上已经搭建好kafka环境,整个代码如下:
1、kafka工具类,实现了生产者消费者消息的封装,以及发送消息到kafka上和从kafka上获取消息。
2、实体类是IpranAlarm,可以用其对象封装消息,调用发送与接收方法就可以实现相应的功能了。
消息队列在java EE级开发是很常用到的工具之一,在众多消息队列当中,active mq与kafka相对比较受开发者的喜爱,那么kafka是怎样实现消息的发送与接收呢?这里我们的消息通过一个实体类对象来进行封装,前提是你服务器上已经搭建好kafka环境,整个代码如下:
1、kafka工具类,实现了生产者消费者消息的封装,以及发送消息到kafka上和从kafka上获取消息。
package com.starit.ipran.kafka; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.starit.ipran.load.Constants; import com.starit.ipran.model.IpranAlarm; public class KafkaUtils { private final static Logger LOGGER = LoggerFactory.getLogger(KafkaUtils.class); private static Producer<String, String> producer; private static Consumer<String, String> consumer; private KafkaUtils() { } /** * 生产者,注意kafka生产者不能够从代码上生成主题,只有在服务器上用命令生成 */ static { Properties props = new Properties(); props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);//服务器ip:端口号,集群用逗号分隔 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(props); } /** * 消费者 */ static { Properties props = new Properties(); props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);//服务器ip:端口号,集群用逗号分隔 props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(Constants.TOPIC_NAME)); } /** * 发送对象消息 至kafka上,调用json转化为json字符串,应为kafka存储的是String。 * @param msg */ public static void sendMsgToKafka(IpranAlarm msg) { producer.send(new ProducerRecord<String, String>(Constants.TOPIC_NAME, String.valueOf(new Date().getTime()), JSON.toJSONString(msg))); } /** * 从kafka上接收对象消息,将json字符串转化为对象,便于获取消息的时候可以使用get方法获取。 */ public static void getMsgFromKafka(){ while(true){ ConsumerRecords<String, String> records = KafkaUtils.getKafkaConsumer().poll(100); if (records.count() > 0) { for (ConsumerRecord<String, String> record : records) { JSONObject jsonAlarmMsg = JSON.parseObject(record.value()); IpranAlarm alarmMsg = JSONObject.toJavaObject(jsonAlarmMsg, IpranAlarm.class); LOGGER.info("从kafka接收到的消息是:" + alarmMsg.toString()); } } } } public static Consumer<String, String> getKafkaConsumer() { return consumer; } public static void closeKafkaProducer() { producer.close(); } public static void closeKafkaConsumer() { consumer.close(); } }
2、实体类是IpranAlarm,可以用其对象封装消息,调用发送与接收方法就可以实现相应的功能了。
相关文章推荐
- java向linux的kafka发送消息 并接收消息实例
- 关于Java连接虚拟机中的Kafka时,无法发送、接收消息的问题
- WebSphere MQ安装配置,java发送接收消息
- Java Socket发送与接收HTTP消息简单实现
- 求助在java(java web)中发送接收soap消息。
- Web Service学习笔记(webservice、soap、wsdl、jws详细分析) Webservice的wsdl文件解析与Soap消息的发送、接收(不生成java客户端代码)
- Java Socket发送与接收HTTP消息简单实现
- Activemq 消息发送、接收java代码实现队列模式
- java 实现jms的客户端(发送接收消息)
- Java实现XMPP发送接收消息和文件功能
- php rdkafka扩展发送和接收消息
- Java与微信不得不说的故事——消息的接收与发送
- kafka(java客户端)消费者取不到消息,生产者消息也没发送成功
- RabbitMQ学习笔记二:rabbitmq发送接收消息Helloworld(Java版)
- JAVA开发微信公众号-接收和发送消息
- java activeMQ消息的发送与接收
- Java Socket发送与接收HTTP消息简单实现
- java DategramSocker发送消息和接收消息
- Android非UI主线程中,若干普通Java线程使用Handler发送接收消息
- Java-TCP/IP 编程-01 主线程:从控制台录入消息,发送到服务器 子线程:接收服务器转发来的消息