Learning Kafka (3): A Kafka and Spring MVC Integration Example
2016-08-22 00:00
Summary: an example of integrating Kafka with Spring MVC
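I. Overview
Kafka is a high-throughput distributed publish-subscribe messaging system. For background, see: Kafka basic concepts and environment setup. Integrating Kafka with Spring MVC relies on an open-source framework, spring-integration-kafka. I will not introduce this official framework here; please look it up yourself.
II. Example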
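1. Install Zookeeper
Zookeeper download and basic usage
2. Install Kafka
Kafka basic concepts and environment setup
3. Create a Spring project (creating it with Maven is recommended)
Project screenshot (the small red X markers do not prevent the project from starting)
pom.xml configuration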
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.kafka.demo</groupId>
  <artifactId>SpringMvcWithKafka</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>SpringMvcWithKafka Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <properties>
    <spring.version>4.2.5.RELEASE</spring.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-kafka</artifactId>
      <version>1.3.0.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>javax</groupId>
      <artifactId>javaee-api</artifactId>
      <version>7.0</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.7.6</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.7.6</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.7.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.7</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>SpringMvcWithKafka</finalName>
  </build>
</project>
spring-kafka-consumer.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

  <!-- topic test conf -->
  <int:channel id="inputFromKafka">
    <int:dispatcher task-executor="kafkaMessageExecutor" />
  </int:channel>

  <!-- Zookeeper settings; more than one can be configured -->
  <int-kafka:zookeeper-connect id="zookeeperConnect"
      zk-connect="localhost:2181" zk-connection-timeout="6000"
      zk-session-timeout="6000" zk-sync-time="2000" />

  <!-- Channel adapter; auto-startup must be "true", otherwise no data is received -->
  <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
      kafka-consumer-context-ref="consumerContext" auto-startup="true"
      channel="inputFromKafka">
    <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
  </int-kafka:inbound-channel-adapter>

  <task:executor id="kafkaMessageExecutor" pool-size="8"
      keep-alive="120" queue-capacity="500" />

  <bean id="kafkaDecoder"
      class="org.springframework.integration.kafka.serializer.common.StringDecoder" />

  <bean id="consumerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
      <props>
        <prop key="auto.offset.reset">smallest</prop>
        <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
        <prop key="fetch.message.max.bytes">5242880</prop>
        <prop key="auto.commit.interval.ms">1000</prop>
      </props>
    </property>
  </bean>

  <!-- Bean that receives the messages -->
  <bean id="kafkaConsumerService"
      class="com.kafka.demo.service.impl.KafkaConsumerService" />

  <!-- Method that handles received messages -->
  <int:outbound-channel-adapter channel="inputFromKafka"
      ref="kafkaConsumerService" method="processMessage" />

  <int-kafka:consumer-context id="consumerContext"
      consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
      consumer-properties="consumerProperties">
    <int-kafka:consumer-configurations>
      <int-kafka:consumer-configuration group-id="default1"
          value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
          max-messages="5000">
        <!-- Two topics are configured -->
        <int-kafka:topic id="myTopic" streams="4" />
        <int-kafka:topic id="testTopic" streams="4" />
      </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
  </int-kafka:consumer-context>
</beans>
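The consumer configuration above routes every message from inputFromKafka to kafkaConsumerService.processMessage, but that class is only available in the downloadable source. The following is a minimal sketch of what such a handler might look like, not the author's actual code. It assumes that the 1.x high-level consumer adapter delivers a nested map of topic, then partition, then the list of decoded messages, and that StringDecoder makes each message a String; check both against the version you use. The describe helper is a hypothetical name added only to keep the flattening logic easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the real class ships with the example source.
// Declare it public in its own file (package com.kafka.demo.service.impl)
// when wiring it as the kafkaConsumerService bean.
class KafkaConsumerService {

    // Assumed payload shape: topic -> (partition -> decoded messages).
    public void processMessage(Map<String, Map<Integer, List<String>>> payload) {
        for (String line : describe(payload)) {
            System.out.println(line);
        }
    }

    // Flattens the nested payload into one printable line per message.
    static List<String> describe(Map<String, Map<Integer, List<String>>> payload) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, List<String>>> byTopic : payload.entrySet()) {
            for (Map.Entry<Integer, List<String>> byPartition : byTopic.getValue().entrySet()) {
                for (String message : byPartition.getValue()) {
                    lines.add(byTopic.getKey() + "[" + byPartition.getKey() + "]: " + message);
                }
            }
        }
        return lines;
    }
}
```

Keeping the flattening in a separate static method means the handler can be exercised with a plain HashMap, without starting Kafka or a Spring context.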
spring-kafka-producer.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

  <!-- commons config -->
  <bean id="stringSerializer"
      class="org.apache.kafka.common.serialization.StringSerializer" />
  <bean id="kafkaEncoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
    <constructor-arg value="java.lang.String" />
  </bean>
  <bean id="producerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
      <props>
        <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
        <prop key="message.send.max.retries">5</prop>
        <prop key="serializer.class">kafka.serializer.StringEncoder</prop>
        <prop key="request.required.acks">1</prop>
      </props>
    </property>
  </bean>

  <!-- topic test config -->
  <int:channel id="kafkaTopicTest">
    <int:queue />
  </int:channel>

  <int-kafka:outbound-channel-adapter
      id="kafkaOutboundChannelAdapterTopicTest"
      kafka-producer-context-ref="producerContextTopicTest"
      auto-startup="true" channel="kafkaTopicTest" order="3">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS"
        receive-timeout="1" task-executor="taskExecutor" />
  </int-kafka:outbound-channel-adapter>

  <task:executor id="taskExecutor" pool-size="5"
      keep-alive="120" queue-capacity="500" />

  <int-kafka:producer-context id="producerContextTopicTest"
      producer-properties="producerProperties">
    <int-kafka:producer-configurations>
      <!-- Configuration for multiple topics -->
      <int-kafka:producer-configuration
          broker-list="localhost:9092" key-serializer="stringSerializer"
          value-class-type="java.lang.String"
          value-serializer="stringSerializer" topic="testTopic" />
      <int-kafka:producer-configuration
          broker-list="localhost:9092" key-serializer="stringSerializer"
          value-class-type="java.lang.String"
          value-serializer="stringSerializer" topic="myTopic" />
    </int-kafka:producer-configurations>
  </int-kafka:producer-context>
</beans>
For the rest of the code, see the example source: source download
GitHub download link
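III. Demo
a. First, following the configuration files, create the two topics (myTopic and testTopic) from the command line, as in the screenshot below:
b. Run the project and visit the test URL: http://localhost:8080/SpringMvcWithKafka/kafka/test
The result is shown in the figure:
c. Check the message output in the Kafka console, as shown below:
Note: with the latest Kafka release, the example above may fail to run. My guess is that this is a Kafka version problem, so I recommend using a stable version.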