zoukankan      html  css  js  c++  java
  • SpringCloudStream集成kafka

    Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。

     

    一、引入依赖包

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    

    二、自定义信息通道

    官方提供了Sink(输入通道)、Source(输出通道)、Processor(集成Sink和Source通道),我们也可以自定义我们自己的信息通道。
    @Input注解标识一个输入通道
    @Output注解标识一个输出通道
    通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。
    如下我们自定义信息通道EsChannel

    /**
     * 自定义信息通道
     * @author 47Gamer
     * @date 2019/9/26 14:54
     */
    public interface EsChannel {
        /**
         * 缺省发送消息通道名称
         */
        String ES_DEFAULT_OUTPUT = "es_default_output";
    
        /**
         * 缺省接收消息通道名称
         */
        String ES_DEFAULT_INPUT = "es_default_input";
    
        /**
         * 告警发送消息通道名称
         */
        String ES_ALARM_OUTPUT = "es_alarm_output";
    
        /**
         * 告警接收消息通道名称
         */
        String ES_ALARM_INPUT = "es_alarm_input";
    
        /**
         * 缺省发送消息通道
         * @return channel 返回缺省信息发送通道
         */
        @Output(ES_DEFAULT_OUTPUT)
        MessageChannel sendEsDefaultMessage();
    
        /**
         * 告警发送消息通道
         * @return channel 返回告警信息发送通道
         */
        @Output(ES_ALARM_OUTPUT)
        MessageChannel sendEsAlarmMessage();
    
        /**
         * 缺省接收消息通道
         * @return channel 返回缺省信息接收通道
         */
        @Input(ES_DEFAULT_INPUT)
        MessageChannel recieveEsDefaultMessage();
    
        /**
         * 告警接收消息通道
         * @return channel 返回告警信息接收通道
         */
        @Input(ES_ALARM_INPUT)
        MessageChannel recieveEsAlarmMessage();
    }
    

    三、@EnableBinding使应用程序连接到消息代理

    @EnableDiscoveryClient
    @SpringBootApplication
    @EnableFeignClients
    @EnableHystrix
    @MapperScan(basePackages = "com.es.mapper")
    @EnableBinding(EsChannel.class)
    public class EsOnenetApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(EsOnenetApplication.class, args);
        }
    }
    

    四、SpringCloudStream及kafka配置

    #==============================================================
    #spring-cloud-stream-Kafka配置 开始
    #==============================================================
    #是否开启kafka(非spring-cloud-stream配置)
    spring.kafka.enabled=false
    #缺省的输入、输出通道
    spring.cloud.stream.bindings.es_default_input.destination=es_default_topic
    spring.cloud.stream.bindings.es_default_input.binder=kafka
    spring.cloud.stream.bindings.es_default_input.group=es_default_group
    
    spring.cloud.stream.bindings.es_default_output.destination=es_default_topic
    spring.cloud.stream.bindings.es_default_output.binder=kafka
    
    #入站消费者的并发性
    spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2
    
    #告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义)
    spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic
    spring.cloud.stream.bindings.es_alarm_input.binder=kafka
    spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group
    
    spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic
    spring.cloud.stream.bindings.es_alarm_output.binder=kafka
    
    #kafka配置
    spring.cloud.stream.kafka.binder.brokers=172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092
    spring.cloud.stream.kafka.binder.zkNodes=172.*.*.6:2181,172.*.*.7:2181,172.*.*.8:2181
    spring.cloud.stream.kafka.binder.requiredAcks=1
    #==============================================================
    #spring-cloud-stream-Kafka配置 结束
    #==============================================================
    从上面配置可以看出
    1、定义了通道名称及分组,binder代表绑定实现的标识名称(如kafka或者rabbit),与3中的定义名称相对应。
    2、定义了入站消费者的并发性,指在一个实例内的并发性,不同实例之间本身就是并发的,默认值为1
    spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2
    3、定义了kafka连接信息
    如果未配置autoCommitOffset,默认自动提交偏移量
    详细参数配置可参考官网

    五、发送消息到输出通道

    /**
     * kafka消息发送器
     * @author 47Gamer
     * @date 2019/9/26 17:50
     */
    @Component
    public class EsKafkaMessageSender {
        @Autowired
        private EsChannel channel;
    
        /**
         * 消息发送到默认通道:缺省通道对应缺省主题
         * @param message
         */
        public void sendToDefaultChannel(String message){
            channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
        }
    
        /**
         * 消息发送到告警通道:告警通道对应告警主题
         * @param message
         */
        public void sendToAlarmChannel(String message){
            channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
        }
    }
    注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。

    六、从输入通道订阅消息

    @EnableBinding(value = EsChannel.class)
    public class EsStreamListener {
    
        /**
         * 从缺省通道接收消息
         * @param message
         */
        @StreamListener(EsChannel.ES_DEFAULT_INPUT)
        public void receive(Message<String> message){
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
            System.out.println(sdf.format(new Date())+"------start--------安全用电默认消息:" + message);
            try {
                Thread.sleep(1000*10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(sdf.format(new Date())+"------end--------安全用电默认消息");
        }
    
        /**
         * 从告警通道接收消息
         * @param message
         */
        @StreamListener(EsChannel.ES_ALARM_INPUT)
        public void receiveAlarm(Message<String> message){
            System.out.println("订阅告警消息:" + message);
        }
    }

    从不同的通道实现消息的订阅。

    七、这样完整的消息系统就搭建好了,定义Controller发送消息测试

    @ApiOperation(value = "test1", httpMethod = "POST")
        @PostMapping(value = "/test1", produces = "application/json;charset=UTF-8")
        public void test1(String message, HttpServletRequest request,
                                 HttpServletResponse response) {
            sender.sendToDefaultChannel(message);
            sender.sendToDefaultChannel(message);
            sender.sendToDefaultChannel(message);
            sender.sendToDefaultChannel(message);
        }
    
        @ApiOperation(value = "test", httpMethod = "POST")
        @PostMapping(value = "/test2", produces = "application/json;charset=UTF-8")
        public void test2(String message, HttpServletRequest request,
                          HttpServletResponse response) {
            sender.sendToAlarmChannel(message);
        }
    

    test1:发送消息的缺省消息通道
    test2:发送消息到告警消息通道

    八、并发性测试

    如七中所示,一次发送4条消息到缺省消息通道中,并启动两个实例(即两个微服务组成一个小型集群),在并发性配置为1的情况下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1

    实例1:
    2019-09-30 11:13:14------start--------默认消息...
    2019-09-30 11:13:24------end--------默认消息
    

    实例2:

    2019-09-30 11:13:14------start--------默认消息:...
    2019-09-30 11:13:24------end--------默认消息
    
    2019-09-30 11:13:24------start--------默认消息:...
    2019-09-30 11:13:34------end--------默认消息
    
    2019-09-30 11:13:34------start--------默认消息:...
    2019-09-30 11:13:44------end--------默认消息
    

    通过打印日志(日志做了简化处理)可以看出,两个实例之间是做到了并发消费,但是在1个实例内部,并没有并发消费。
    如果将concurrency修改为2.
    日志如下
    实例1:

    2019-09-30 11:31:13------start--------:...
    2019-09-30 11:31:13------start--------默认消息:...
    2019-09-30 11:31:23------end--------默认消息
    2019-09-30 11:31:23------end--------默认消息
    2019-09-30 11:31:23------start--------默认消息:...
    2019-09-30 11:31:33------end--------默认消息
    

    实例2:

    2019-09-30 11:31:13------start--------默认消息:...
    2019-09-30 11:31:23------end--------
    

    从日志可以看出,实例1中实现了两个线程的并发消费。

  • 相关阅读:
    LaZagne — 一键抓取目标机器上的所有明文密码,todo,自己手动试试效果
    jsch文件复制(拷贝)
    使用Apache commons-pool2实现高效的FTPClient连接池的方法
    室内定位会议和期刊(2020-2021)
    Axure RP 9.0 Team Edition
    新电脑 windows 10 登录 执行此操作需要Internet
    maven配置,以及项目 Dependency xxxx not found 解决过程
    MySQL 下载网站
    Tomcat8.5和Tomcat9安装SSL证书的教程
    Python id() 函数
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13751035.html
Copyright © 2011-2022 走看看