zoukankan      html  css  js  c++  java
  • Spring Cloud Alibaba RocketMQ 消息驱动

    比如你的订单系统,平时每秒最多能处理一万单请求,但促销活动的时候可能会有五万个请求,不限制会导致系统崩溃,限制,导致四万个订单失败。可以用消息队列来做请求缓冲,异步平缓的处理请求,实现流量削峰。

    SpringClud 已经为我们提供了消息驱动框架 SpringCloud Stream。Stream定义了一个消息模型,对消息中间件进行一步封装,可以做到代码层面对中间件的无感知,使得微服务开发高度解耦。

    1.消息系统通用模型

    2.RocketMQ 架构

    3.RocketMQ 环境搭建

    RocketMQ 部署结构中主要包括

    NameServer - Producer 和 Consumer 通过 NameServer 查找 Topic 所在的 Broker。
    Broker - 负责消息的存储、转发。
    部署完 NameServer、Broker 之后,RocketMQ 就可以正常工作了,但所有操作都是通过命令行,不太方便,所以我们还需要部署一个扩展项目 rocketmq-console,可以通过web界面来管理 RocketMQ。

    下载地址:http://rocketmq.apache.org/dowloading/releases/

    解压编译

    unzip rocketmq-all-4.7.0-source-release.zip
    cd rocketmq-all-4.7.0-source-release
    mvn -Prelease-all -DskipTests clean install -U

    创建配置文件: conf/broker.properties

     brokerIP1=【你的IP】

    启动 NameServer,nohup后台启动,>> nameserver.log 2> 指定日志生成文件

    > cd distribution/target/rocketmq-4.6.0/rocketmq-4.6.0
    > nohup sh bin/mqnamesrv >> nameserver.log 2>&1 &

    查看日志文件

    tail -f nameserver.log

    启动 Broker

    nohup sh bin/mqbroker -n IP:9876 -c conf/broker.properties >>broker.log 2>&1 &

    查看日志文件,有可能会没有足够内存而报错。

    tail -f broker.log

    解决内存不足的问题,修改 bin/runbroker.sh ,把内存参数改小一点。

    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

    进行测试,新建两个终端窗口。

    //进行生产消息
    export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    //用于消费消息
    export NAMESRV_ADDR=localhost:9876
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    一个 rocketmq 的扩展项目,其中的 rocketmq-console 是控制台,下载项目:

    https://github.com/apache/rocketmq-externals

    配置

    cd rocketmq-console
    vim src/main/resources/application.properties

    vim src/main/resources/application.properties

    设置 console 的端口

    找到 rocketmq.config.namesrvAddr ,填上自己的地址端口

    或者直接运行jar

    java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 -rocketmq.config.namesrvAddr=192.168.31.113.9876

    启动成功 访问页面地址:http://192.168.31.113:8080

    4.RocketMQ 生产者与消费者开发

    开发步骤 - Producer(生产者)

    添加 RocketMQ 依赖

    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
    </dependency>

    RocketMQ 配置

    rocketmq:
     name-server: 192.168.31.113:9876
     producer:
       group: test-group

    创建消息实体

    public class User {
     Long id;
     String name;
     public User(){}
     public User(Long id, String name) {
     this.id = id; this.name = name;
     }
     // setter/getter
     @Override
     public String toString() {
     return "User{id=" + id +", name='" + name + "'}";
     }
    }

    使用 RocketMQTemplate 发送消息

        @RestController
        public class TestController {
            @Autowired
            RocketMQTemplate rocketMQTemplate;
            @GetMapping("/sendmsg")
            public String sendmsg(Long id, String name){
                rocketMQTemplate.convertAndSend("topic-test", new User(id, name));
                return "ok";
            }
        }

    开发步骤- Consumer(消费者)

    添加 RocketMQ 依赖

    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
    </dependency>

    使用 RocketMQListener 接收消息

        @Service
        @RocketMQMessageListener(consumerGroup = "group-consumer", topic = "topic-test")
        public class MyMQConsumer implements RocketMQListener<User> {
            @Override
            public void onMessage(User user) {
                // consume logic
                System.out.println(user);
            }
        }

     5.RocketMQ 实现分布式事务

     分布式事务的解决方案中,有一个可靠消息模式,就是使用消息队列来实现的。这个方案的关键点:怎么保证本地事务与发送消息保持一直,本地成功 & 发送成功 || 本地失败 & 发送失败

     事务型的生产者

     代码实现第一步,发送事务消息。

        @GetMapping("/tx/test")
        public String sendTxMsg() {
            rocketMQTemplate.sendMessageInTransaction("topic-tx",
                    MessageBuilder.withPayload("hi").build(), null);
            return "ok";
        }

    Producer 事务消息监听器

    package com.example.demo;
    
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    @Component
    @RocketMQTransactionListener
    public class TxMsgListener implements RocketMQLocalTransactionListener {
    
         //本地事务的方法
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
            System.out.println("executeLocalTransaction ...");
    
            RocketMQLocalTransactionState state = RocketMQLocalTransactionState.ROLLBACK;
    
            try{
                Thread.sleep(60* 1000);
                state = RocketMQLocalTransactionState.COMMIT;
            }catch (Exception e){
                e.printStackTrace();
            }
    
            System.out.println("executeLocalTransaction return : " + state);
    
            return state;
        }
    
        //本地事务检查执行结果的方法
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            System.out.println("checkLocalTransaction ...");
            return RocketMQLocalTransactionState.COMMIT;
        }
    }

    实验场景
    1. 本地事务正常,提交事务消息,Consumer 接收
    2. 本地事务失败,回滚事务消息,Consumer 未接收
    3. 本地事务没返回,mq 回查,Consumer 接收
    上面测试第 3 个场景的时候,Consumer 会收到 2 次消息,可能导致重复增加积分。保证消息不被重复处理 ,就是“幂等”幂等是一个数学概念,可以理解为:同一个函数,参数相同的情况下,多次执行后的结果一致
    解决方法:Consumer 端建立一个判重表,每次收到消息后先,先到判重表中看一下,看这条消息是否处理过。


    6.SpringCloud Stream 开发模型

     

     SpringCloud Stream 生产与消费开发实践

    1. 创建一个 stream-producer,集成 SpringCloud Stream,绑定 RocketMQ,发送消息
    2. 创建一个 stream-Consumer,集成 SpringCloud Stream,绑定 RocketMQ,接收消息

     stream-producer

     添加 stream-rocketmq 依赖:

    <dependency>
     <groupId>com.alibaba.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>

    添加 stream-rocketmq 依赖:

    spring:
     cloud:
       stream:
         rocketmq:
           binder:
             name-server: 49.235.54.12:9876
    bindings:
      output:
         destination: topic-test-stream
         group: stream-consumer-group

    开启 Binding:@EnableBinding(Source.class)

    @SpringBootApplication
    @EnableBinding(Source.class)
    public class StreamproducerApplication {
     public static void main(String[] args) {
       SpringApplication.run(StreamproducerApplication.class, args);
     }
    }

     发送消息

    @Autowired
    Source source;
    
    @GetMapping("teststream")
    public String testStream(){
     source.output().send(MessageBuilder.withPayload("msg").build());
     return "ok";
    }

    stream-consumer

    添加 stream-rocketmq 依赖

    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>

    rocketmq binder、binding destination 属性配置:

    spring:
     cloud:
       stream:
         rocketmq:
           binder:
             name-server: 49.235.54.12:9876
    bindings:
      input:
        destination: topic-test-stream
        group: stream-consumer-group

    开启 Binding:

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class StreamconsumerApplication {
    public static void main(String[] args) {
      SpringApplication.run(StreamconsumerApplication.class, args);
     }
    }

    接收消息

    @Service
    public class MyStreamConsumer {
      @StreamListener(Sink.INPUT)
      public void receive(String msg){
      // consume logic
      System.out.println("receive: " + msg);
     }
    }

    消息过滤
    Consumer 可能希望处理具有某些特征的消息,这就需要对消息进行过滤。最简单的方法就是收到消息后自己判断一下 if ... else ...为了简化开发,Stream 提供了消息过滤的方式,在 Listener 注解中加一个判断条件即可:

    @Service
    public class MyStreamConsumer {
     @StreamListener(value = Sink.INPUT,
      condition = "headers['test-header']=='my test'")
     public void receive(String msg){
       System.out.println("receive: " + msg);
     }
    }

    消息监控

    收发消息不正常时怎么办?可以查看监控信息actuator 中有 binding 信息、健康检查信息,为我们提供排错依据

    /actuator/bindings
    /actuator/health
    /actuator/channels

    7.SpringCloud Stream 自定义接口

    上节通过 Stream 发送消息的方式:配置文件中指定了“bindings.output”,使用注解开启了 binding“@EnableBinding(Source.class)”就可以使用“Source”发送消息了。这种默认的自动化方式非常便利,但是,如果想再加一个“output”通道怎么办?

     producer 添加 output 配置

    server:
      port: 8081
    spring:
      application:
        name: stream-producer
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.31.113:9876
          bindings:
            output:
              destination: topic-test-stream
            my-output:
              destination: topic-test-stream-myoutput

    producer 创建 source 接口

    package com.example.demo;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    public interface MySource {
        @Output("my-output")
        MessageChannel output();
    }

    producer 启用自定义 source

    package com.example.demo;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @EnableBinding({Source.class, MySource.class})
    public class StreamproducerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(StreamproducerApplication.class, args);
        }
    
    }

    producer 发送消息

    package com.example.demo;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class TestController {
    
        @Autowired
        Source source;
    
        @Autowired
        MySource mySource;
    
        @GetMapping("testmysource")
        public String testmysource(String msg){
            mySource.output().send(MessageBuilder.withPayload(msg).build());
            return "ok";
        }
    
        @GetMapping("teststream")
        public String teststream(String msg){
            source.output().send(MessageBuilder.withPayload(msg)
                    .setHeader("test-header", "my test").build());
            return "ok";
        }
    
        @GetMapping("/hi")
        public String hi() {
            return "hi";
        }
    
        @GetMapping("/hello")
        public String hello(@RequestParam String name) {
            return "hello " + name + "!";
        }
    }

    自定义 sink

    consumer 添加 iutput 配置

    server:
      port: 8082
    spring:
      application:
        name: stream-consumer
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.31.113:9876
          bindings:
            input:
              destination: topic-test-stream
              group: stream-consumer-group
            my-input:
              destination: topic-test-stream-myoutput
              group: my-group

    consumer  创建 sink 接口

    package com.example.demo;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    public interface MySink {
        String INPUT = "my-input";
    
        @Input(INPUT)
        SubscribableChannel input();
    }

    consumer  启用自定义 sink

    package com.example.demo;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @EnableBinding({Sink.class,MySink.class})
    public class StreamconsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(StreamconsumerApplication.class, args);
        }
    
    }

    consumer 接收消息

    package com.example.demo;
    
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MyStreamConsumer {
        @StreamListener(value= Sink.INPUT,
            condition = "headers['test-header']=='my test'")
        public void receive(String msg){
            System.out.println("receive: " + msg);
        }
    
        @StreamListener(value= MySink.INPUT)
        public void receive_myinput(String msg){
            System.out.println("receive: " + msg);
        }
    }

    8.SpringCloud Stream 消费异常处理

    消费者在接收消息时,可能会发生异常,如果我们想处理这些异常,需要采取一些处理策略,可以分为:
    1. 应用级处理 - 通用,与底层 MQ 无关
    2. 系统级处理 - 根据不同的 MQ 特性进行处理,例如 RabbitMQ 可以放入死信队列
    3. 重试 RetryTemplate - 配置消费失败后如何重试

    本节我们学习最通用的“应用级处理”策略,此方式又分为:局部处理方式(某个消息组),全局处理方式。

    配置文件

    server:
      port: 8080
    spring:
      application:
        name: consumer-exception
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.31.113:9876
          bindings:
            input:
              destination: stream-exception
              group: group-exception

    开启注解添加绑定

    package com.example.demo;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class ConsumerexceptionApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ConsumerexceptionApplication.class, args);
        }
    
    }

    创建消息监听器

    package com.example.demo;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.support.ErrorMessage;
    import org.springframework.stereotype.Service;
    
    @Service
    @Slf4j
    public class MyStreamConsumer {
        @StreamListener(Sink.INPUT)
        public void receive(String msg){
            log.info("msg: {}", msg);
            throw new IllegalArgumentException("param error");
        }
    
        @StreamListener("errorChannel")
        public void handleError(ErrorMessage errorMessage){
            log.error("全局异常. errorMsg: {}", errorMessage);
        }
    
    //    @ServiceActivator(
    //            inputChannel = "stream-exception.group-exception.errors"
    //    )
    //    public void handleError(ErrorMessage errorMessage){
    //        log.error("局部异常. errorMsg: {}", errorMessage);
    //    }
    
    
    }

    9.SpringCloud Stream 消费组

    线上环境中,一个服务通常都会运行多个实例,以保证高可靠,对于消费服务,运行多个实例的时候,每个实例就都会去消费消息,造成重复消费,设置 Consumer Group(消费组)可以实现组内消费者均衡消费。本节我们就学习消费组的设置,体验其效果。

    consumer-group 配置文件

    server:
      port: 8082
    spring:
      application:
        name: consumer-group
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.31.113:9876
          bindings:
            input:
              destination: topic-consumer-group
              group: test-group

    添加注解

    package com.example.demo;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class ConsumergroupApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ConsumergroupApplication.class, args);
        }
    
    }

    消息监听器

    package com.example.demo;
    
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MyConsumer {
        @StreamListener(Sink.INPUT)
        public void receive(String msg)
        {
            System.out.println("msg: " + msg);
        }
    }

    10.SpringCloud Stream 消息分区

    消息被哪个实例消费是不一定的,但如果我们希望同一类的消息被同一个实例消费怎么办?例如同一个用户的订单消息希望被同一个示例处理,这样更便于统计。SpringCloud Stream 提供了消息分区的功能,可以满足这个场景的需求,本节我们就学习如何使用。

    创建1个 Producer 一直发送消息,设置消息如何分区
    创建1个 Consumer 接收消息,设置按分区接收消息
    启动4个 Consumer 实例,指定分区标识,同一分区的消息应被相同的 Consumer 实例接收

    producer 使用 rabbitmq

    producer 配置文件

    server:
      port: 8081
    spring:
      application:
        name: partition-producer
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            output:
              destination: topic-test-stream-partition
              producer:
                partition-key-expression: headers['partitionKey'] - 1
                partition-count: 4
      rabbitmq:
        addresses: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /

    producer TestController 

    package com.example.demo;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Random;
    
    @RestController
    public class TestController {
    
        @Autowired
        Source source;
    
        // 消息内容
        private final String[] data = new String[]{
                "f", "g", "h",
                "fo", "go", "ho",
                "foo", "goo", "hoo",
                "fooz", "gooz", "hooz"
        };
    
        @GetMapping("/produce")
        public String produce() {
            for (int i = 0; i < 100; i++) {
    
                try {
                    // 随机从 data 数组中获取一个字符串,作为消息内容
                    Random RANDOM = new Random(System.currentTimeMillis());
                    String value = data[RANDOM.nextInt(data.length)];
                    System.out.println("Sending: " + value);
    
                    // 发送消息
                    source.output().send(
                            MessageBuilder.withPayload(value)
                                    // 设置头信息 partitionKey,值为字符串的长度
                                    .setHeader("partitionKey", value.length())
                                    .build());
    
    
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return "ok";
        }
    }

    consumer 配置文件

    server:
      port: 9003
    spring:
      application:
        name: partition-consumer
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            input:
              destination: topic-test-stream-partition
              group: stream-test-partition
              consumer:
                partitioned: true
          instance-index: 3
          instance-count: 4
      rabbitmq:
        addresses: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /
  • 相关阅读:
    bzoj3110
    idea 设置系列 各种乱码
    vim 系列
    idea 神键
    简单工厂,工厂方法,抽象工厂
    log4 按包进行日志输出
    maven依赖本地宝
    kafka 理论学习
    kafka windows环境搭建 测试
    linux 查找文件的命令
  • 原文地址:https://www.cnblogs.com/lilb/p/14421639.html
Copyright © 2011-2022 走看看