您的位置:首页 > 编程语言 > Java开发

Spring注解方式集成Kafka(spring-kafka的使用)

2018-03-08 15:39 561 查看
import java.util.HashMap;  
import java.util.Map;  
  
import org.apache.kafka.clients.producer.ProducerConfig;  
import org.apache.kafka.common.serialization.StringSerializer;  
import org.springframework.context.annotation.Bea
4000
n;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.annotation.EnableKafka;  
import org.springframework.kafka.core.DefaultKafkaProducerFactory;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.core.ProducerFactory;  
  
import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation;  
  
/** 
 *  
 * Description:Kafka生产者 
 * Date:        2017年7月11日 
 *  
 * 依赖: 
        <dependency> 
            <groupId>org.springframework.kafka</groupId> 
            <artifactId>spring-kafka</artifactId> 
            <version>1.0.5.RELEASE</version> 
        </dependency> 
 *  
 * 使用案例: 
    @Resource 
    private KafkaTemplate kafkaTemplate; 
     
    调用方法发送数据: 
    kafkaTemplate.send(topic, msg); 
 *  
 */  
@Configuration  
@EnableKafka  
public class KafkaProducer {  
  
    /** 
     *  
     * Description:获取配置 
     * Date:        2017年7月11日 
     * @author      shaqf 
     */  
    public Map<String, Object> producerConfigs() {  
        Map<String, Object> props = new HashMap<>();  
        // kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092  
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ReadConfigation.getConfigItem("kafka.metadata.broker.list"));  
        props.put(ProducerConfig.RETRIES_CONFIG, 0);  
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);  
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);  
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);  
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  
        return props;  
    }  
  
    /** 获取工厂 */  
    public ProducerFactory<String, String> producerFactory() {  
        return new DefaultKafkaProducerFactory<>(producerConfigs());  
    }  
  
    /** 注册实例 */  
    @Bean  
    public KafkaTemplate<String, String> kafkaTemplate() {  
        return new KafkaTemplate<>(producerFactory());  
    }  
}</pre><br>  
<br>  
<pre></pre>  
<p></p>  
<pre></pre>  
<pre code_snippet_id="2478280" snippet_file_name="blog_20170711_2_9410437" name="code" class="java"></pre><pre code_snippet_id="2478280" snippet_file_name="blog_20170711_2_9410437" name="code" class="java">import java.util.HashMap;  
import java.util.Map;  
  
import org.apache.kafka.clients.consumer.ConsumerConfig;  
import org.apache.kafka.common.serialization.StringDeserializer;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.annotation.EnableKafka;  
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;  
import org.springframework.kafka.config.KafkaListenerContainerFactory;  
import org.springframework.kafka.core.ConsumerFactory;  
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;  
  
import cn.disruptive.bigDataPlatform.agentServer.config.ReadConfigation;  
  
  
/** 
 *  
 * Description:Kafka消费者 
 * Date:        2017年7月11日 
 *  
 * 依赖: 
        <dependency> 
            <groupId>org.springframework.kafka</groupId> 
            <artifactId>spring-kafka</artifactId> 
            <version>1.0.5.RELEASE</version> 
        </dependency> 
 *  
 * 使用案例: 
    @KafkaListener(topics = { "taskCmd" }) 
    public void taskCmd(ConsumerRecord<?, ?> record) { 
        Object message = record.value(); 
        logger.info("收到管理平台命令:" + message); 
    } 
 *  
 */  
  
@Configuration  
@EnableKafka  
public class KafkaConsumer {  
  
    /** 
     *  
     * Description:获取配置 
     * Date:        2017年7月11日 
     * @author      shaqf 
     */  
    public Map<String, Object> consumerConfigs() {  
        Map<String, Object> props = new HashMap<>();  
        // kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092  
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ReadConfigation.getConfigItem("kafka.metadata.broker.list"));  
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");  
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");  
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Agent-Server-1.0.2");  
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  
        return props;  
    }  
  
    /** 获取工厂 */  
    public ConsumerFactory<String, String> consumerFactory() {  
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());  
    }  
  
    /** 获取实例 */  
    @Bean  
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {  
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();  
        factory.setConsumerFactory(consumerFactory());  
        factory.setConcurrency(3);  
        factory.getContainerProperties().setPollTimeout(3000);  
        return factory;  
    }  
}
还有一种方式,直接在配置文件里面添加Kafka地址即可,以上代码都可以省略  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: