spring boot与kafka集成(spring boot 1.5.1版本)
2017-04-11 18:39
896 查看
spring boot与kafka集成(spring boot 1.5.1版本)
作者 SamHxm
2017.03.05 20:33
字数 321 阅读 345评论 0喜欢 19
随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便。
引入依赖<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
具体spring-kafka的版本由spring boot的当前版本决定。
application.properties配置文件
spring.kafka.bootstrap-servers=192.168.1.107:9092 spring.kafka.consumer.group-id=myGroup spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
最简化的配置仅需指定kafka主机和消息者组名即可。这里使用的是单节点kafka,集群环境中配置多个kafka主机地址即可。例如:
spring.kafka.bootstrap-servers=192.168.1.107:9092,192.168.1.108:9092,192.168.1.109:9092
以下4项配置指定消息key和消息体的编解码方式。
spring.kafka.consumer.key-deserializer spring.kafka.consumer.value-deserializer spring.kafka.producer.key-serializer spring.kafka.producer.value-serializer
消息对象
import java.util.Date; public class Message { private Long id; private String msg; private Date sendTime; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public Date getSendTime() { return sendTime; } public void setSendTime(Date sendTime) { this.sendTime = sendTime; } }
消息生产者
import java.util.Date; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @Component public class Sender { @Autowired private KafkaTemplate kafkaTemplate; private Gson gson = new GsonBuilder().create(); public void sendMessage(){ Message m = new Message(); m.setId(System.currentTimeMillis()); m.setMsg(UUID.randomUUID().toString()); m.setSendTime(new Date()); kafkaTemplate.send("test1", gson.toJson(m)); } }
消息消费者
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @Component public class Receiver { private Gson gson = new GsonBuilder().create(); @KafkaListener(topics = "test1") public void processMessage(String content) { Message m = gson.fromJson(content, Message.class); } }
运行
@SpringBootApplication public class AppStart { public static void main(String[] args) throws InterruptedException { ApplicationContext app = SpringApplication.run(AppStart.class, args); while(true){ Sender sender = app.getBean(Sender.class); sender.sendMessage(); Thread.sleep(500); } } }
通过上面的示例可以发现,相对于spring boot 1.4.x版本,1.5集成kafka主要是将以前需要手工编码进行设置的kafka配置改由spring配置文件定义。
注意
我使用的spring boot版本是1.5.1,spring-kafka版本1.1.2,jdk1.8,该组合似乎不支持低版本的kafka。之前我使用kafka版本为2.11-0.10.0.0,向kafka发送消息时一直产生异常,后来升级kafka版本至2.11-0.10.2.0故障消失。由于测试时间有限,未作进一步分析。希望查明原因的同学能私信我。谢谢。
相关文章推荐
- 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便。
- spring boot 学习(六)spring boot 各版本中使用 log4j2 记录日志
- springboot 1.5.2 集成kafka 简单例子
- spring boot 与kafka集成
- Spring Boot(四):Spring Boot 集成 Thymeleaf
- Spring Boot教程 - Spring Boot集成Mybatis(XML方式)
- Spring Boot系列一 spring boot 集成 slf4j 和 logback
- springboot和kafka集成
- spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)
- springboot - 集成kafka完整代码实现
- [Spring Boot 系列] 集成maven和Spring boot的profile功能
- spring-boot集成kafka
- springboot kafka集成
- Kafka 安装-配置-监控 与集成springboot
- [Spring Boot 系列] 集成maven和Spring boot的profile功能
- Spring Boot系列教程十:Spring boot集成MyBatis
- Spring Boot集成kafka笔记
- Spring Boot集成kafka笔记
- Apache Kafka 入门 - Spring Boot 集成 Kafka
- Spring Boot教程 - Spring Boot集成Mybatis(注解配置)