Spring注解方式集成Kafka(spring-kafka的使用)
2018-03-08 15:39
561 查看
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation;
/**
*
* Description:Kafka生产者
* Date: 2017年7月11日
*
* 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.5.RELEASE</version>
</dependency>
*
* 使用案例:
@Resource
private KafkaTemplate kafkaTemplate;
调用方法发送数据:
kafkaTemplate.send(topic, msg);
*
*/
@Configuration
@EnableKafka
public class KafkaProducer {
/**
*
* Description:获取配置
* Date: 2017年7月11日
* @author shaqf
*/
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ReadConfigation.getConfigItem("kafka.metadata.broker.list"));
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/** 获取工厂 */
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/** 注册实例 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}</pre><br>
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation;
/**
 * Kafka consumer configuration.
 *
 * <p>Registers a listener container factory so that {@code @KafkaListener}-annotated
 * methods receive messages. Broker addresses are read from the
 * "kafka.metadata.broker.list" config item.
 *
 * <p>Dependency:
 * <pre>
 * &lt;dependency&gt;
 *   &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
 *   &lt;artifactId&gt;spring-kafka&lt;/artifactId&gt;
 *   &lt;version&gt;1.0.5.RELEASE&lt;/version&gt;
 * &lt;/dependency&gt;
 * </pre>
 *
 * <p>Usage:
 * <pre>
 * &#64;KafkaListener(topics = { "taskCmd" })
 * public void taskCmd(ConsumerRecord&lt;?, ?&gt; record) {
 *     Object message = record.value();
 * }
 * </pre>
 *
 * Date: 2017-07-11
 */
@Configuration
@EnableKafka
public class KafkaConsumer {

    /**
     * Builds the consumer properties map.
     *
     * @return consumer configuration keyed by {@link ConsumerConfig} constants
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // e.g. kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                ReadConfigation.getConfigItem("kafka.metadata.broker.list"));
        // Client-side auto-commit is disabled; the listener container manages offsets.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // NOTE(review): the original also set auto.commit.interval.ms="100", which the
        // Kafka client ignores when enable.auto.commit=false, so it has been removed.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Agent-Server-1.0.2");
        // When the group has no committed offset, start from the earliest available record.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    /** Creates the consumer factory from {@link #consumerConfigs()}. */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Registers the container factory used by {@code @KafkaListener} methods. */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);                             // 3 concurrent listener threads
        factory.getContainerProperties().setPollTimeout(3000); // poll timeout in ms
        return factory;
    }
}
还有一种方式,直接在配置文件里面添加Kafka地址即可,以上代码都可以省略
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation;
/**
*
* Description:Kafka生产者
* Date: 2017年7月11日
*
* 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.5.RELEASE</version>
</dependency>
*
* 使用案例:
@Resource
private KafkaTemplate kafkaTemplate;
调用方法发送数据:
kafkaTemplate.send(topic, msg);
*
*/
@Configuration
@EnableKafka
public class KafkaProducer {
/**
*
* Description:获取配置
* Date: 2017年7月11日
* @author shaqf
*/
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ReadConfigation.getConfigItem("kafka.metadata.broker.list"));
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/** 获取工厂 */
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/** 注册实例 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}</pre><br>
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation;
/**
 * Kafka consumer configuration.
 *
 * <p>Registers a listener container factory so that {@code @KafkaListener}-annotated
 * methods receive messages. Broker addresses are read from the
 * "kafka.metadata.broker.list" config item.
 *
 * <p>Dependency:
 * <pre>
 * &lt;dependency&gt;
 *   &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
 *   &lt;artifactId&gt;spring-kafka&lt;/artifactId&gt;
 *   &lt;version&gt;1.0.5.RELEASE&lt;/version&gt;
 * &lt;/dependency&gt;
 * </pre>
 *
 * <p>Usage:
 * <pre>
 * &#64;KafkaListener(topics = { "taskCmd" })
 * public void taskCmd(ConsumerRecord&lt;?, ?&gt; record) {
 *     Object message = record.value();
 * }
 * </pre>
 *
 * Date: 2017-07-11
 */
@Configuration
@EnableKafka
public class KafkaConsumer {

    /**
     * Builds the consumer properties map.
     *
     * @return consumer configuration keyed by {@link ConsumerConfig} constants
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // e.g. kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                ReadConfigation.getConfigItem("kafka.metadata.broker.list"));
        // Client-side auto-commit is disabled; the listener container manages offsets.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // NOTE(review): the original also set auto.commit.interval.ms="100", which the
        // Kafka client ignores when enable.auto.commit=false, so it has been removed.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Agent-Server-1.0.2");
        // When the group has no committed offset, start from the earliest available record.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    /** Creates the consumer factory from {@link #consumerConfigs()}. */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Registers the container factory used by {@code @KafkaListener} methods. */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);                             // 3 concurrent listener threads
        factory.getContainerProperties().setPollTimeout(3000); // poll timeout in ms
        return factory;
    }
}
还有一种方式,直接在配置文件里面添加Kafka地址即可,以上代码都可以省略
相关文章推荐
- Spring注解方式集成Kafka(spring-kafka的使用)
- 从零学spring boot--集成mybatis--项目--使用注解方式
- dubbox在异构系统中的使用-集成mybatis全注解方式到spring
- 关于spring与struts2使用Annotion注解方式的集成
- spring 和 redis的集成(使用注解方式)
- Mybatis3.2.1使用例二:Mapper方式集成Spring、注解提供SQL
- Spring集成Hibernate,使用JPA注解方式,新增数据无法提交
- 集成Struts2 spring hibernate使用注解
- spring 3 中使用注解的方式来进行任务调度。
- Struts1.2&Spring2.5&Hibernate3.2集成---全注解方式
- DWR高级主题之DWR与spring集成(不使用springMVC,不使用注解的实例)
- spring 3 中使用注解的方式来进行任务调度。
- Spring学习笔记(15)----使用Spring注解方式管理事务
- 使用Spring的注解方式实现AOP
- DWR高级主题之DWR与spring集成(不使用springMVC,但使用注解的实例)
- action控制层,使用spring注解方式注入业务bean报空指向错误
- 使用Spring注解方式管理事务与传播行为详解
- 注解方式使用hibernate与spring
- Struts1.2&Spring2.5&Hibernate3.2集成---非注解方式
- DWR高级主题之DWR与spring集成(使用springMVC,非注解的实例)