Spring注解方式集成Kafka(spring-kafka的使用)
2017-07-11 15:48
881 查看
import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation; /** * * Description:Kafka生产者 * Date: 2017年7月11日 * * 依赖: <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.0.5.RELEASE</version> </dependency> * * 使用案例: @Resource private KafkaTemplate kafkaTemplate; 调用方法发送数据: kafkaTemplate.send(topic, msg); * */ @Configuration @EnableKafka public class KafkaProducer { /** * * Description:获取配置 * Date: 2017年7月11日 * @author shaqf */ public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); // kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ReadConfigation.getConfigItem("kafka.metadata.broker.list")); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } /** 获取工厂 */ public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /** 注册实例 */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation; /** * * Description:Kafka消费者 * Date: 2017年7月11日 * * 依赖: <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.0.5.RELEASE</version> </dependency> * * 使用案例: @KafkaListener(topics = { "taskCmd" }) public void taskCmd(ConsumerRecord<?, ?> record) { Object message = record.value(); logger.info("收到管理平台命令:" + message); } * */ @Configuration @EnableKafka public class KafkaConsumer { /** * * Description:获取配置 * Date: 2017年7月11日 * @author shaqf */ public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ReadConfigation.getConfigItem("kafka.metadata.broker.list")); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "Agent-Server-1.0.2"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } /** 获取工厂 */ public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } /** 获取实例 */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } }
还有一种方式,直接在配置文件里面添加Kafka地址即可,以上代码都可以省略
相关文章推荐
- Spring注解方式集成Kafka(spring-kafka的使用)
- spring 和 redis的集成(使用注解方式)
- Mybatis3.2.1使用例二:Mapper方式集成Spring、注解提供SQL
- dubbox在异构系统中的使用-集成mybatis全注解方式到spring
- 关于spring与struts2使用Annotion注解方式的集成
- 从零学spring boot--集成mybatis--项目--使用注解方式
- Spring集成Hibernate,使用JPA注解方式,新增数据无法提交
- spring 注解方式下使用commons-validator 验证表单
- Spring 使用Spring注解方式管理事务与传播行为
- 使用Spring的注解方式实现AOP
- 使用Spring的注解方式实现AOP
- DWR高级主题之DWR与spring集成(使用springMVC,并使用注解的实例)
- Spring学习笔记(14)----使用Spring的注解方式实现AOP
- spring 3 中使用注解的方式来进行任务调度。
- spring aop 功能初次使用(注解方式)
- aop(使用xml方式)(spring团队建议我们使用注解的方式)
- DWR高级主题之DWR与spring集成(使用springMVC,非注解的实例)
- 使用Spring注解方式管理事务与传播行为详解
- DWR高级主题之DWR与spring集成(不使用springMVC,不使用注解的实例)
- 使用Spring的注解方式实现AOP的细节