zoukankan      html  css  js  c++  java
  • Kafka及Spring Cloud Stream

    安装

    下载kafka http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz 

    kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect

    在kafka server端 config/server.properties中设置

    必须要配置:

    # 公布访问地址和端口(.properties 文件不支持行尾注释,注释必须单独成行)
    advertised.listeners=PLAINTEXT://192.168.3.221:9092

    启动kafka 

     ./kafka-server-start.sh ../config/server.properties    # 在 kafka 的 bin 目录下执行

    检测是否启动 

    netstat -tunlp | egrep " (2181|9092)"

    或 lsof -i:9092

    测试发送信息和消费消息

    创建主题

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    生产者 

     ./kafka-console-producer.sh --broker-list localhost:9092 --topic test 

    消费者 

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    如果想在外部访问kafka,必须将 9092 端口加入到防火墙放行列表

    firewall-cmd --list-ports # 查询所有放行端口
    firewall-cmd --add-port=9092/tcp # 临时端口放行
    firewall-cmd --add-port=9092/tcp --permanent # 永久放行
    firewall-cmd --reload # 重新载入放行列表

    简单API的应用 

    引入依赖

            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>

    编写生产者

    package com.example.springkafka.api;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * Minimal Kafka producer demo: builds one String/String record for the
     * {@code message} topic, sends it, and closes the producer.
     *
     * <p>Date: 2018/11/6 20:25
     */
    public class KafkaProducerDemo {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.3.221:9092");
            properties.setProperty("key.serializer", StringSerializer.class.getName());
            properties.setProperty("value.serializer", StringSerializer.class.getName());

            String topic = "message"; // destination topic
            Integer partition = 0; // explicit target partition
            long timeMillis = System.currentTimeMillis(); // record timestamp: current time in epoch milliseconds
            String key = "key-message"; // record key
            String value = "value-message"; // record value
            // Build the record to publish.
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>(topic, partition, timeMillis, key, value);

            // try-with-resources ensures close() is called even when send() throws,
            // so the producer is not leaked on the failure path.
            try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
                // Publish the record.
                kafkaProducer.send(producerRecord);
            }
        }
    }

    编写消费者

    package com.example.springkafka.api;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * @Date: 2018/11/6 20:25
     * @Description: 消费者
     */
    public class KafkaConsumerDemo {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.3.221:9092");
            properties.setProperty("group.id", "group-1");
            properties.setProperty("key.deserializer", StringDeserializer.class.getName());
            properties.setProperty("value.deserializer", StringDeserializer.class.getName());
            // 创建kafka的消费者对象
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
            // 订阅kafka主题
            kafkaConsumer.subscribe(Arrays.asList("message"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("========offset = %d, key = %s, value = %s
    ", record.offset(), record.key(), record.value());
            }
        }
    }

     spring kafka

    依赖

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>

    生产者与消费者配置

    # 生产者配置
    spring:
      kafka:
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        bootstrap-servers: 192.168.3.221:9092
        consumer: # 消费者
          group-id: gerry-1
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    kafka:
      topic: gerry

    生产者代码

    package com.example.springcloudkafka.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * REST controller that forwards the {@code message} request parameter to Kafka
     * through the injected {@link KafkaTemplate}.
     *
     * <p>Date: 2018/11/6 21:03
     */
    @RestController
    public class KafkaProducerController {
        // NOTE(review): this field is public while topic is private; that looks
        // accidental. Confirm nothing reads it externally before narrowing it.
        public final KafkaTemplate<String, String> kafkaTemplate;
        private final String topic;

        /**
         * @param kafkaTemplate template used to send records
         * @param topic         destination topic, taken from the {@code kafka.topic} property
         */
        public KafkaProducerController(
                KafkaTemplate<String, String> kafkaTemplate,
                @Value("${kafka.topic}") String topic) {
            this.topic = topic;
            this.kafkaTemplate = kafkaTemplate;
        }

        /**
         * Hands {@code message} to the template for the configured topic.
         *
         * @param message payload taken from the request parameter of the same name
         * @return always {@code true}; the value returned by the send call is not inspected
         */
        @PostMapping("message/send") // mapped for POST requests only
        public boolean sendMessage(@RequestParam String message) {
            this.kafkaTemplate.send(this.topic, message);
            return true;
        }
    }

    消费者代码

    package com.example.springcloudkafka.listener;
    
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Listener bean that prints every message received on the topic named by the
     * {@code kafka.topic} property.
     *
     * <p>Date: 2018/11/6 21:20
     */
    @Component
    public class KafkaConsumerListener {

        /**
         * Prints the received payload to standard output, prefixed with a fixed label.
         *
         * @param message the message payload passed to the listener
         */
        @KafkaListener(topics = "${kafka.topic}")
        public void getMessage(String message) {
            String line = "kafka 消费者监听,接收到消息:" + message;
            System.out.println(line);
        }
    }

    Spring Cloud Stream 

    官方定义三个接口
    Source=> 发送者 Producer、Publisher
    Sink=> 接收器 Consumer、Subscriber
    Processor=> 对上游而言是Sink、对下游而言是Source

    Spring Cloud Stream Binder: Kafka 

    引入依赖:

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>

    配置:

    # 生产者与消费者配置
    spring:
      kafka:
        bootstrap-servers: 192.168.3.221:9092
      cloud:
        stream:
          bindings:
            output:
              destination: ${kafka.topic}
            input:
              destination: ${kafka.topic}
    kafka:
      topic: cloud-stream

    生产者:

    package com.example.springcloudstreamkafkademo.producer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    /**
     * Bean that publishes String payloads to the output channel of the {@link Source}
     * binding enabled on this class.
     */
    @Component
    @EnableBinding(Source.class)
    public class MessageProducerBean {
        /**
         * The channel qualified as {@code Source.OUTPUT}, injected directly.
         * {@link #send(String)} does not use it; it is the alternative to going
         * through {@link #source}.
         */
        @Autowired
        @Qualifier(Source.OUTPUT)
        private MessageChannel messageChannel;

        /** Binding interface whose {@code output()} channel is used by {@link #send(String)}. */
        @Autowired
        private Source source;

        /**
         * Wraps {@code message} as a message payload and sends it on the output channel.
         *
         * @param message the payload to send
         */
        public void send(String message) {
            // Alternative: call send(...) on the injected messageChannel field
            // instead of resolving the channel from source.
            MessageChannel outbound = source.output();
            outbound.send(MessageBuilder.withPayload(message).build());
        }
    }

    消费者

    package com.example.springcloudstreamkafkademo.consumer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.SubscribableChannel;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    /**
     * Shows three ways of receiving messages from the {@code Sink.INPUT} channel:
     * a programmatic subscription, {@code @ServiceActivator}, and {@code @StreamListener}.
     *
     * <p>NOTE(review): all three handlers are attached to the same channel; how incoming
     * messages are distributed between them is not visible in this class — confirm.
     */
    @Component
    @EnableBinding(Sink.class)
    public class MessageConsumerBean {
        @Autowired
        @Qualifier(Sink.INPUT)
        private SubscribableChannel subscribableChannel;

        /**
         * Option 1: once {@code subscribableChannel} has been injected, register a
         * handler that prints each message's payload.
         */
        @PostConstruct
        public void init() {
            subscribableChannel.subscribe(
                    received -> System.out.println(received.getPayload()));
        }

        /**
         * Option 2: handler declared with {@code @ServiceActivator}.
         *
         * @param message the received payload
         */
        @ServiceActivator(inputChannel = Sink.INPUT)
        public void message(String message) {
            String line = "@ServiceActivator:" + message;
            System.out.println(line);
        }

        /**
         * Option 3: handler declared with {@code @StreamListener}.
         *
         * @param message the received payload
         */
        @StreamListener(Sink.INPUT)
        public void onMessage(String message) {
            String line = "@StreamListener:" + message;
            System.out.println(line);
        }
    }

    Spring Cloud Stream Binder: RabbitMQ 

     引入依赖

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>

    配置

    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: ${rabbit.queue}
            input:
              destination: ${rabbit.queue}
      rabbitmq:
        host: 192.168.3.221
        port: 5672
        username: rabbit
        password: rabbit
    rabbit:
      queue: cloud-stream-queue

    代码同kafka

    完整代码详见:https://gitee.com/lm970585581/cloud-config/tree/master/Spring%20Cloud%20Stream%20

  • 相关阅读:
    markdown常用语法
    利用 js-xlsx 实现选择 Excel 文件在页面显示
    HTML中meta标签
    wxpy模块
    Python基础(3)
    Python基础(2)
    Python基础(1)
    Python之递归锁与互斥锁
    Python进程与线程
    Docker
  • 原文地址:https://www.cnblogs.com/lm970585581/p/9920978.html
Copyright © 2011-2022 走看看