zoukankan      html  css  js  c++  java
  • springboot 1.5.2 集成kafka 简单例子

    添加依赖

    compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE")

    添加application.properties

    #kafka
    # 指定kafka 代理地址,可以多个
    spring.kafka.bootstrap-servers=192.168.59.130:9092,192.168.59.131:9092,192.168.59.132:9092
    # 指定默认消费者group id
    spring.kafka.consumer.group-id=myGroup
    # 指定默认topic id
    spring.kafka.template.default-topic= my-replicated-topic
    # 指定listener 容器中的线程数,用于提高并发量
    spring.kafka.listener.concurrency= 3
    # 每次批量发送消息的数量
    spring.kafka.producer.batch-size= 1000

    configuration 启用kafka

    package cn.xiaojf.today.data.kafka.configuration;
    
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    
    /**
     * kafka 配置
     * @author xiaojf 2017/3/24 14:09
     */
    @Configuration
    @EnableKafka
    public class KafkaConfiguration {
    
    
    }

    消息生产者

    package cn.xiaojf.today.data.kafka.producer;
    
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaOperations;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.ProducerListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消息生产者
     * @author xiaojf 2017/3/24 14:36
     */
    @Component
    public class MsgProducer {
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        public void send() {
            kafkaTemplate.send("my-replicated-topic","xiaojf");
            kafkaTemplate.send("my-replicated-topic","xiaojf");
    
            kafkaTemplate.metrics();
    
            kafkaTemplate.execute(new KafkaOperations.ProducerCallback<String, String, Object>() {
                @Override
                public Object doInKafka(Producer<String, String> producer) {
                    //这里可以编写kafka原生的api操作
                    return null;
                }
            });
    
            //消息发送的监听器,用于回调返回信息
            kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
                @Override
                public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
    
                }
    
                @Override
                public void onError(String topic, Integer partition, String key, String value, Exception exception) {
    
                }
    
                @Override
                public boolean isInterestedInSuccess() {
                    return false;
                }
            });
        }
    }

    消息消费者

    package cn.xiaojf.today.data.kafka.consumer;
    
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消息消费者
     * @author xiaojf 2017/3/24 14:36
     */
    @Component
    public class MsgConsumer {
        @KafkaListener(topics = {"my-replicated-topic","my-replicated-topic2"})
        public void processMessage(String content) {
            System.out.println(content);
        }
    
    
    }
  • 相关阅读:
    maven的安装和配置以及搭建项目应用
    Spring MVC与Struts2的区别(仅本人浅薄的理解)?
    记录学习PYTHON
    Zookeeper可以干什么
    Redis与Memcache的区别
    Redis持久化的两种方式和区别
    Scala 柯里化
    Redis与Memcached的区别
    高并发的处理策略
    序列化
  • 原文地址:https://www.cnblogs.com/xiaojf/p/6613559.html
Copyright © 2011-2022 走看看