您的位置:首页 > 编程语言 > Java开发

使用spring集成的kafka收发消息

2017-09-06 14:09 561 查看
原文地址:http://blog.csdn.net/u011734144/article/details/52351289

1. 引入maven依赖

[html] view
plain copy

<!-- Spring Integration Kafka module; the version is taken from the
     spring-integration-kafka.version build property, which must be defined elsewhere in the POM. -->
<dependency>  

    <groupId>org.springframework.integration</groupId>  

    <artifactId>spring-integration-kafka</artifactId>  

    <version>${spring-integration-kafka.version}</version>  

</dependency>  

2. 生产者的xml配置

[html] view
plain copy

<?xml version="1.0" encoding="UTF-8"?>  

<!-- Producer-side Spring context: declares the message channel, the Kafka outbound
     channel adapter, the KafkaTemplate and the producer service bean. -->
<beans xmlns="http://www.springframework.org/schema/beans"  

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  

       xmlns:int="http://www.springframework.org/schema/integration"  

       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"  

       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd  

        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd  

        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">  

  

    <!-- Channel consumed by the outbound adapter declared below. -->
    <int:channel id="outWriteBackLemmaRecordChannel" />  

  

    <!-- Outbound adapter: messages arriving on outWriteBackLemmaRecordChannel are sent
         through the kafkaTemplate bean to the topic writeBackLemmaRecordTopic. -->
    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"  

                                        kafka-template="kafkaTemplate"  

                                        auto-startup="true"  

                                        channel="outWriteBackLemmaRecordChannel"  

                                        order="3"  

                                        topic="writeBackLemmaRecordTopic">  

        <!-- Wraps the send handler in a circuit-breaker advice; no properties are set,
             so the advice runs with its defaults. -->
        <int-kafka:request-handler-advice-chain>  

            <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />  

        </int-kafka:request-handler-advice-chain>  

    </int-kafka:outbound-channel-adapter>  

  

    <!-- KafkaTemplate built from an inline DefaultKafkaProducerFactory. It is created with
         autoFlush=true and has writeBackLemmaRecordTopic as its default topic. -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">  

        <constructor-arg>  

            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">  

                <constructor-arg>  

                    <!-- Kafka producer properties. Integer keys and String values match the
                         KafkaTemplate<Integer, String> injected into KafkaProducerServiceImpl.
                         NOTE(review): the broker addresses look like placeholders; replace
                         them with real hosts. -->
                    <map>  

                        <entry key="bootstrap.servers" value="1.1.1.1:9092,2.2.2.2:9092"/>  

                        <entry key="retries" value="10"/>  

                        <entry key="batch.size" value="16384"/>  

                        <entry key="linger.ms" value="1"/>  

                        <entry key="buffer.memory" value="33554432"/>  

                        <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>  

                        <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>  

                    </map>  

                </constructor-arg>  

            </bean>  

        </constructor-arg>  

        <constructor-arg name="autoFlush" value="true"/>  

        <property name="defaultTopic" value="writeBackLemmaRecordTopic"/>  

    </bean>  

  

    <!-- Producer service bean; its kafkaTemplate field is annotated @Autowired, so annotation
         processing must be enabled in the application context for the injection to happen. -->
    <bean id="kafkaProducerService"  

          class="com.soso.baike.admin.service.kafka.producer.impl.KafkaProducerServiceImpl"/>  

</beans>  

DefaultKafkaProducerFactory 的构造参数在本公司实际上是注册在 ZooKeeper 上的。由于开发环境、预发环境和线上环境的配置各不相同,ZooKeeper 上针对这三种环境分别注册了一套配置文件;发布时会根据目标环境从 ZooKeeper 拉取对应的配置文件,用来填充 DefaultKafkaProducerFactory 的构造参数。

3. 发送消息

发送消息使用的是上述配置文件中配置的KafkaProducerServiceImpl类,代码如下:

[java] view
plain copy

package com.soso.baike.admin.service.kafka.producer.impl;  

  

import com.soso.baike.admin.service.kafka.producer.IKafkaProducerService;  

import org.slf4j.Logger;  

import org.slf4j.LoggerFactory;  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.kafka.core.KafkaTemplate;  

  

  

public class KafkaProducerServiceImpl implements IKafkaProducerService {  

    private Logger logger = LoggerFactory.getLogger("kafka");  

  

    @Autowired  

    private KafkaTemplate<Integer, String> kafkaTemplate;<span style="white-space:pre"> </span>//这个已经在上述xml文件中配置  

  

    @Override  

    public void sendMessage(String topic, String data) {  

        logger.info("the message is to be send by kafka is : topic = {}, data = {}", topic, data);  

        kafkaTemplate.setDefaultTopic(topic);  

        kafkaTemplate.sendDefault(data);  

    }  

  

    @Override  

    public void sendMessage(String topic, int key, String data) {  

        logger.info("the message is to be send by kafka is : topic = {}, data = {}", topic, data);  

        kafkaTemplate.setDefaultTopic(topic);  

        kafkaTemplate.sendDefault(key, data);  

    }  

}  

4.  消费者xml配置

[html] view
plain copy

<?xml version="1.0" encoding="UTF-8"?>  

<!-- Consumer-side Spring context: declares the message-driven channel adapter, the
     message converter, the Kafka listener container and the consumer service bean. -->
<beans xmlns="http://www.springframework.org/schema/beans"  

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  

       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"  

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  

        http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">  

  

    <!-- Message-driven adapter bound to kafkaMessageListenerContainer; its output channel is
         nullChannel and errors go to errorChannel.
         NOTE(review): KafkaConsumerServiceImpl also sets itself as the message listener on the
         same container in afterPropertiesSet(); confirm which listener is meant to be active. -->
    <int-kafka:message-driven-channel-adapter  

            id="kafkaMessageDrivenChannelAdapter"  

            listener-container="kafkaMessageListenerContainer"  

            auto-startup="true"  

            phase="100"  

            send-timeout="5000"  

            channel="nullChannel"  

            message-converter="messagingMessageConverter"  

            error-channel="errorChannel"/>  

  

    <!-- Converter referenced by the adapter's message-converter attribute. -->
    <bean id="messagingMessageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>  

  

    <!-- Listener container: first constructor argument is the consumer factory, second is the
         container properties naming the subscribed topic. -->
    <bean id="kafkaMessageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">  

        <constructor-arg>  

            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">  

                <constructor-arg>  

                    <!-- Kafka consumer properties; every value is a ${...} placeholder, so a
                         property source defining the kafka.consumer.* keys must be available. -->
                    <map>  

                        <entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}"/>  

                        <entry key="group.id" value="${kafka.consumer.group.id}"/>  

                        <entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}"/>  

                        <entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}"/>  

                        <entry key="session.timeout.ms" value="${kafka.consumer.session.timeout.ms}"/>  

                        <entry key="key.deserializer" value="${kafka.consumer.key.deserializer}"/>  

                        <entry key="value.deserializer" value="${kafka.consumer.value.deserializer}"/>  

                    </map>  

                </constructor-arg>  

            </bean>  

        </constructor-arg>  

        <constructor-arg>  

            <bean class="org.springframework.kafka.listener.config.ContainerProperties">  

                <constructor-arg name="topics" value="writeBackLemmaRecordTopic"/>  

            </bean>  

        </constructor-arg>  

    </bean>  

  

    <!-- The class that actually consumes the messages -->  

    <bean id="kafkaConsumerService"  

          class="com.soso.baike.admin.service.kafka.consumer.impl.KafkaConsumerServiceImpl"/>  

</beans>  

上述DefaultKafkaConsumerFactory的构造参数取自配置文件(即 ${...} 占位符);你也可以不使用配置文件,直接把这些占位符替换成实际的参数值。

5. 接收消息类是上述配置文件中配置的KafkaConsumerServiceImpl类,代码如下:

[java] view
plain copy

package com.soso.baike.admin.service.kafka.consumer.impl;  

  

import com.soso.baike.admin.constant.KafkaConstants;  

import com.soso.baike.admin.lmaimp.DummyUser;  

import com.soso.baike.admin.service.kafka.consumer.IKafkaConsumerService;  

import com.soso.baike.audit.Auditors;  

import com.soso.baike.audit.db.LemmaAuditDao;  

import com.soso.baike.audit.lemma.LemmaRecord;  

import com.soso.baike.audit.lemma.LemmaWriteBackOp;  

import com.soso.baike.domain.IdConvert;  

import org.apache.commons.lang.math.NumberUtils;  

import org.apache.kafka.clients.consumer.ConsumerRecord;  

import org.slf4j.Logger;  

import org.slf4j.LoggerFactory;  

import org.springframework.beans.factory.InitializingBean;  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.kafka.listener.KafkaMessageListenerContainer;  

import org.springframework.kafka.listener.config.ContainerProperties;  

  

import java.util.concurrent.ArrayBlockingQueue;  

import java.util.concurrent.ExecutorService;  

import java.util.concurrent.ThreadPoolExecutor;  

import java.util.concurrent.TimeUnit;  

  

/**  

 * Created by zhangyongguang on 2016/6/30.  

 */  

public class KafkaConsumerServiceImpl implements IKafkaConsumerService, InitializingBean {  

    private Logger logger = LoggerFactory.getLogger("kafka");  

  

    @Autowired  

    private KafkaMessageListenerContainer kafkaMessageListenerContainer;  

    @Autowired  

    private LemmaWriteBackOp lemmaWriteBackOp;  

  

    private int threadNum = 8;  

    private int maxQueueSize = 2000;  

    private ExecutorService executorService = new ThreadPoolExecutor(threadNum,  

            threadNum, 0L, TimeUnit.MILLISECONDS,  

            new ArrayBlockingQueue<Runnable>(maxQueueSize),  

            new ThreadPoolExecutor.CallerRunsPolicy());  

  

    @Override  

    public void onMessage(ConsumerRecord<Integer, String> record) {  

        logger.info("===============processMessage===============");  

        logger.info("the kafka message is arriving with topic = {}, partition = {}, key = {}, value = {}",  

                new Object[]{record.topic(), record.partition(), record.key(), record.value()});  

<span style="white-space:pre">    </span>//这里收到消息后,开启了一个线程来处理<span style="white-space:pre">   </span>  

        executorService.execute(new Runnable() {  

            @Override  

            public void run() {  

                String msg = record.value();  

                  

            }  

        });  

    }  

  

    @Override<span style="white-space:pre">   </span>//设置监听  

    public void afterPropertiesSet() throws Exception {  

        ContainerProperties containerProperties = kafkaMessageListenerContainer.getContainerProperties();  

  

        if (null != containerProperties) {  

            containerProperties.setMessageListener(this);  

        }  

    }  

}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: