Spring Boot integration with Kafka

    1. Download

      Reference documentation

      a. Chinese community: http://kafka.apachecn.org

      b. Download page: http://kafka.apache.org/downloads

       

          

    2. Install Kafka

      Extract the Kafka archive: tar -zxvf kafka_2.12-2.2.0.tgz

     

      ZooKeeper download mirror: http://ftp.man.poznan.pl/apache/

      Rename the sample ZooKeeper configuration file (e.g. copy conf/zoo_sample.cfg to conf/zoo.cfg).

      Add ZooKeeper to the environment variables: vi /etc/profile

      Set the ZooKeeper data and log directories (dataDir and dataLogDir in zoo.cfg).

      Start ZooKeeper (e.g. ./zkServer.sh start).

      Check whether ZooKeeper started successfully (e.g. ./zkServer.sh status),

      or use zkCli.sh to confirm that you can connect to the ZooKeeper server.

      Edit the Kafka configuration file server.properties; only the ZooKeeper address needs to change:

    broker.id=0

    listeners=PLAINTEXT://172.22.64.45:9092

    log.dirs=/opt/kafka/logDir/

    zookeeper.connect=172.22.64.45:2181

      

      Add Kafka to the environment variables as well.

      Start Kafka: ./kafka-server-start.sh ../config/server.properties

      or run it in the background, e.g. ./kafka-server-start.sh -daemon ../config/server.properties

    3. Integrate Kafka with Spring Boot

    (1) pom.xml dependency

    <dependency>

       <groupId>org.springframework.kafka</groupId>

       <artifactId>spring-kafka</artifactId>

       <version>2.2.0.RELEASE</version>

    </dependency>

                         

    (2) Add Kafka settings to application.properties. These are custom keys read through @Value in the configuration classes below, not the standard spring.kafka.* auto-configuration keys.

    kafka.consumer.zookeeper.connect=172.22.64.45:2181

    kafka.consumer.servers=172.22.64.45:9092
    kafka.consumer.enable.auto.commit=true
    kafka.consumer.session.timeout=6000
    kafka.consumer.auto.commit.interval=100
    kafka.consumer.auto.offset.reset=latest
    kafka.consumer.topic=test
    kafka.consumer.group.id=test
    kafka.consumer.concurrency=10

    kafka.producer.servers=172.22.64.45:9092
    kafka.producer.retries=0
    kafka.producer.batch.size=4096
    kafka.producer.linger=1
    kafka.producer.buffer.memory=40960

          

    (3) Producer configuration

    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {

        @Value("${kafka.producer.servers}")
        private String servers;
        @Value("${kafka.producer.retries}")
        private int retries;
        @Value("${kafka.producer.batch.size}")
        private int batchSize;
        @Value("${kafka.producer.linger}")
        private int linger;
        @Value("${kafka.producer.buffer.memory}")
        private int bufferMemory;

        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }

        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    }
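
    The configuration above only wires up the producer; KafkaTemplate.send() itself is asynchronous and returns a ListenableFuture, so failures go unnoticed unless you check the result. Below is a minimal sketch of sending with a callback, assuming the KafkaTemplate bean defined above; the MessageSender class name and the topic passed in are illustrative, not part of the original post.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;

    @Service
    public class MessageSender {

        private static final Logger logger = LoggerFactory.getLogger(MessageSender.class);

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        public void send(String topic, String key, String value) {
            // send() is asynchronous; the callback reports the outcome of each record
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, value);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    logger.info("sent to partition {} at offset {}",
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                }

                @Override
                public void onFailure(Throwable ex) {
                    logger.error("send failed", ex);
                }
            });
        }
    }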


    (4) Consumer configuration

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        @Value("${kafka.consumer.servers}")
        private String servers;
        @Value("${kafka.consumer.enable.auto.commit}")
        private boolean enableAutoCommit;
        @Value("${kafka.consumer.session.timeout}")
        private String sessionTimeout;
        @Value("${kafka.consumer.auto.commit.interval}")
        private String autoCommitInterval;
        @Value("${kafka.consumer.group.id}")
        private String groupId;
        @Value("${kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;
        @Value("${kafka.consumer.concurrency}")
        private int concurrency;

        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(concurrency);
            factory.getContainerProperties().setPollTimeout(1500);
            return factory;
        }

        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }


        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            return propsMap;
        }

        @Bean
        public Listener getListener(){
            return new Listener();
        }
    }


    (5) Write the listener

    // registered as a bean by getListener() in KafkaConsumerConfig, so no @Component is needed
    public class Listener {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());

        @KafkaListener(topics = {"test"})
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            logger.info(">>>>>>>>>> record = {}", kafkaMessage);
            if (kafkaMessage.isPresent()) {
                // unwrap the value held by the Optional
                Object message = kafkaMessage.get();
                logger.info("Kafka key: {}", record.key());
                logger.info("Kafka value: {}", message);
            }
        }
    }
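
    Note that the kafka.consumer.topic key declared in application.properties is never referenced: the listener above hard-codes the "test" topic. @KafkaListener resolves property placeholders, so a variant that reads the topic and group id from configuration could look like the sketch below; the ConfigurableListener name is illustrative, and like Listener it still has to be registered as a bean (e.g. from KafkaConsumerConfig).

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;

    public class ConfigurableListener {

        private static final Logger logger = LoggerFactory.getLogger(ConfigurableListener.class);

        // topic and group id come from application.properties instead of being hard-coded
        @KafkaListener(topics = "${kafka.consumer.topic}", groupId = "${kafka.consumer.group.id}")
        public void listen(ConsumerRecord<String, String> record) {
            logger.info("received key={} value={} partition={} offset={}",
                    record.key(), record.value(), record.partition(), record.offset());
        }
    }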


    (6) Controller endpoint

    @RestController
    public class ProducerController {

        protected static final Logger logger = LoggerFactory.getLogger(ProducerController.class);

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        @RequestMapping(value = "send", method = {RequestMethod.GET})
        public String sendMsg(HttpServletRequest request, HttpServletResponse response) {
            String message = request.getParameter("message");
            logger.info("Kafka message={}", message);
            // asynchronous send to topic "test" with the fixed key "key"
            kafkaTemplate.send("test", "key", message);
            logger.info("Message sent to Kafka.");
            return "success";
        }
    }
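
    The post does not show the application entry point; here is a minimal sketch of one, assuming a standard Spring Boot 2.x layout in which the classes above live in or below the same base package (the KafkaDemoApplication name is illustrative):

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class KafkaDemoApplication {
        public static void main(String[] args) {
            SpringApplication.run(KafkaDemoApplication.class, args);
        }
    }

    With the application running, a GET request such as http://localhost:8080/send?message=hello (8080 being the Spring Boot default port) should return "success", and the message should show up in the listener's log.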

     

    Reference: https://gitee.com/pikapi/springboot.git
