zoukankan      html  css  js  c++  java
  • 5.【Spring Cloud Alibaba】消息驱动的微服务-SpringCloudAlibabaRocketMQ

    Spring实现异步的方式

    image

    引入MQ后的架构演进

    image

    MQ的使用场景

    • 异步处理
    • 流量削峰填谷
    • 解耦微服务

    MQ的选择

    image

    mq对比详情

    mq对比详情

    image

    搭建MQ

    搭建教程

    搭建RocketMQ控制台

    RocketMQ控制台安装教程

    RocketMQ的术语与概念

    image

    image

    RocketMQ进阶

    参见RocketMQ官方指南

    消息编程模型01-编写生产者

    image

    pom.xml
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
    
    application.yml

    image

    代码实现
    private final RocketMQTemplate rocketMQTemplate;
    

    image

    消息编程模型02-编写消费者

    image

    pom.xml
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
    </dependency>
    
    application.yml

    image

    代码实现

    image

    分布式事务01-流程剖析,概念术语,事务消息状态

    image

    image

    image

    分布式事务02-编码实现

    表创建
    -- Transaction log table: one row is inserted together with the business update,
    -- and the transaction check-back looks rows up by transaction id.
    CREATE TABLE `rocketmq_transaction_log` (
    `id`  int(11) NOT NULL AUTO_INCREMENT COMMENT '主键' ,
    `transaction_id`  varchar(45) NOT NULL COMMENT '事务' ,
    `log`  varchar(45) NOT NULL COMMENT '日志' ,
    PRIMARY KEY (`id`),
    -- The check-back query filters on transaction_id, so index that column.
    KEY `idx_transaction_id` (`transaction_id`)
    );
    
    ShareService
    /**
     * Audits a share and, for a PASS verdict, delegates the follow-up work to a RocketMQ
     * transactional message.
     *
     * <p>The share must exist and still be in the {@code NOT_YET} state. For a PASS verdict a
     * transactional message carrying a 50-point bonus for the share's author is sent to topic
     * {@code add-bonus} through producer group {@code tx-add-bonus-group}; on that path this
     * method does not update the share row itself. For any other verdict the row is updated
     * directly through {@code auditByIdInDB}.
     *
     * @param id       primary key of the share to audit
     * @param auditDTO audit verdict and reason
     * @return the share exactly as it was loaded at the start of the call
     * @throws IllegalArgumentException if the share does not exist or is no longer {@code NOT_YET}
     */
    public Share auditById(Integer id, ShareAuditDTO auditDTO) {
        // Step 1: the share must exist and must not have been audited yet.
        Share share = this.shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("参数非法!该分享不存在!");
        }
        boolean pendingAudit = Objects.equals("NOT_YET", share.getAuditStatus());
        if (!pendingAudit) {
            throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");
        }

        // Any verdict other than PASS only needs the local database update.
        boolean passed = AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum());
        if (!passed) {
            this.auditByIdInDB(id, auditDTO);
            return share;
        }

        // Step 3: PASS -> send a half (transactional) message so the user center can
        // consume it and credit the author's bonus.
        String transactionId = UUID.randomUUID().toString();
        UserAddBonusMsgDTO bonusPayload = UserAddBonusMsgDTO.builder()
            .userId(share.getUserId())
            .bonus(50)
            .build();
        String auditJson = JSON.toJSONString(auditDTO);

        this.rocketMQTemplate.sendMessageInTransaction(
            "tx-add-bonus-group",
            "add-bonus",
            MessageBuilder
                .withPayload(bonusPayload)
                // Headers carry extra context alongside the payload.
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .setHeader("share_id", id)
                .setHeader("dto", auditJson)
                .build(),
            // The DTO is also handed over as the transaction argument.
            auditDTO
        );
        return share;
    }
    
    /**
     * Persists the audit verdict for one share.
     *
     * <p>Builds a partial {@code Share} holding only the id, the new audit status and the
     * reason, and passes it to {@code updateByPrimaryKeySelective}.
     *
     * @param id       primary key of the share to update
     * @param auditDTO source of the new audit status and reason
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
        this.shareMapper.updateByPrimaryKeySelective(
            Share.builder()
                .id(id)
                .auditStatus(auditDTO.getAuditStatusEnum().toString())
                .reason(auditDTO.getReason())
                .build()
        );

        // Step 4 (not implemented here): write the share to the cache.
    }
    
    /**
     * Applies the audit verdict and records a transaction-log row in one transactional method.
     *
     * @param id            primary key of the share to update
     * @param auditDTO      audit verdict and reason
     * @param transactionId transaction id stored on the log row
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
        this.auditByIdInDB(id, auditDTO);

        // The log row is what the transaction check-back later searches for.
        RocketmqTransactionLog logEntry = RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("审核分享...")
            .build();
        this.rocketmqTransactionLogMapper.insertSelective(logEntry);
    }
    
    AddBonusTransactionListener
    /**
     * Local-transaction callbacks for messages sent through producer group
     * {@code tx-add-bonus-group}.
     */
    @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
    @RequiredArgsConstructor(onConstructor = @__(@Autowired))
    public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
        private final ShareService shareService;
        private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

        /**
         * Runs the local transaction: updates the share and writes the transaction log.
         *
         * <p>The transaction id and share id are read from the message headers (both cast to
         * {@code String}); the audit DTO is taken from {@code arg}.
         *
         * @param msg message whose headers carry the transaction id and {@code share_id}
         * @param arg the {@code ShareAuditDTO} supplied by the sender
         * @return {@code COMMIT} if the service call completes, {@code ROLLBACK} if it throws;
         *         the exception itself is neither logged nor rethrown
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            MessageHeaders headers = msg.getHeaders();
            String txId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            String rawShareId = (String) headers.get("share_id");
            Integer shareId = Integer.valueOf(rawShareId);
            ShareAuditDTO auditDTO = (ShareAuditDTO) arg;

            RocketMQLocalTransactionState outcome;
            try {
                this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, txId);
                outcome = RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                outcome = RocketMQLocalTransactionState.ROLLBACK;
            }
            return outcome;
        }

        /**
         * Check-back: decides the transaction state from the presence of a log row.
         *
         * @param msg message whose headers carry the transaction id
         * @return {@code COMMIT} if a log row with that transaction id exists, otherwise
         *         {@code ROLLBACK}
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String txId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

            // Query by example: select * from xxx where transaction_id = xxx
            RocketmqTransactionLog probe = RocketmqTransactionLog.builder()
                .transactionId(txId)
                .build();
            boolean logged = this.rocketmqTransactionLogMapper.selectOne(probe) != null;

            return logged
                ? RocketMQLocalTransactionState.COMMIT
                : RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    

    SpringCloudStream

    image

    image

    image

    SpringCloudStream-编写生产者

    pom.xml
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
    
    application.yml

    image

    代码编写

    @EnableBinding({Source.class})

    // 扫描mybatis哪些包里面的接口
    @MapperScan("com.itmuch.contentcenter.dao")
    @SpringBootApplication
    @EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
    @EnableBinding({Source.class})
    public class ContentCenterApplication {
    

    image

    修改com.alibaba.nacos日志级别

    logging:
      level:
        # Debug-level logging for the user-center Feign client interface.
        com.itmuch.contentcenter.feignclient.UserCenterFeignClient: debug
        # Only log errors from the com.alibaba.nacos packages.
        com.alibaba.nacos: error
    

    SpringCloudStream-编写消费者

    pom.xml
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
    
    application.yml

    image

    代码编写

    @EnableBinding({Sink.class})

    /**
     * Bootstrap class of the user-center application; binds the {@code Sink} channel interface.
     */
    @SpringBootApplication
    // Package scanned for MyBatis mapper interfaces.
    @MapperScan("com.itmuch.usercenter.dao")
    //@EnableDiscoveryClient
    @EnableBinding({Sink.class})
    public class UserCenterApplication {

        /**
         * Starts the Spring application.
         *
         * @param args command-line arguments forwarded to {@code SpringApplication.run}
         */
        public static void main(final String[] args) {
            SpringApplication.run(UserCenterApplication.class, args);
        }

    }
    

    image

    SpringCloudStream自定义接口-发送消息

    image

    image

    image

    image

    SpringCloudStream自定义接口-消费消息

    image

    image

    image

    image

    修改com.alibaba.nacos日志级别

    logging:
      level:
        # Only log errors from the com.alibaba.nacos packages.
        com.alibaba.nacos: error
    

    消息过滤

    Spring Cloud Stream实现消息过滤消费

    • condition
    • Tags
    • SQL92

    SpringCloudStream的监控

    可通过如下三个链接查看SpringCloudStream的监控:
    image

    application.yml
    management:
      endpoints:
        web:
          exposure:
            # Expose every actuator endpoint over HTTP.
            include: '*'
      endpoint:
        health:
          # Always include details in the health endpoint's response.
          show-details: always
    

    SpringCloudStream的异常处理

    SpringCloudStream的异常处理

    全局处理【通用】
    /**
     * Listener on {@code Processor.INPUT} that always fails with a {@code RuntimeException},
     * regardless of the message body.
     *
     * @param body message payload (unused)
     */
    @StreamListener(value = Processor.INPUT)
    public void handle(String body) {
        throw new RuntimeException("x");
    }
    
    /**
     * Listener on the {@code errorChannel} destination; prints each received message to stdout.
     *
     * @param message received message; it is cast to {@code ErrorMessage} before printing
     */
    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        ErrorMessage failure = (ErrorMessage) message;
        String line = "Handling ERROR: " + failure;
        System.out.println(line);
    }
    

    SpringCloudStream+RocketMQ实现分布式事务01-重构生产者

    application.yml
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              # Address of the RocketMQ name server.
              name-server: 127.0.0.1:9876
            bindings:
              output:
                producer:
                  # Mark the producer of the output binding as transactional; the group
                  # is the same value the transaction listener's txProducerGroup uses.
                  transactional: true
                  group: tx-add-bonus-group
          bindings:
            output:
              # Specifies the topic.
              destination: add-bonus
    

    @EnableBinding({Source.class})

    @MapperScan("com.itmuch.contentcenter.dao")
    @SpringBootApplication
    @EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
    @EnableBinding({Source.class})
    public class ContentCenterApplication {
    
    ShareService
    /**
     * Audits a share and, for a PASS verdict, publishes a bonus message on the stream output
     * channel.
     *
     * <p>The share must exist and still be in the {@code NOT_YET} state. For a PASS verdict a
     * message carrying a 50-point bonus for the share's author is sent through
     * {@code source.output()}, with the transaction id, share id and JSON-serialized audit DTO
     * as headers; on that path this method does not update the share row itself and the send
     * result is ignored. For any other verdict the row is updated directly through
     * {@code auditByIdInDB}.
     *
     * @param id       primary key of the share to audit
     * @param auditDTO audit verdict and reason
     * @return the share exactly as it was loaded at the start of the call
     * @throws IllegalArgumentException if the share does not exist or is no longer {@code NOT_YET}
     */
    public Share auditById(Integer id, ShareAuditDTO auditDTO) {
        // Step 1: the share must exist and must not have been audited yet.
        Share share = this.shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("参数非法!该分享不存在!");
        }
        boolean pendingAudit = Objects.equals("NOT_YET", share.getAuditStatus());
        if (!pendingAudit) {
            throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");
        }

        // Any verdict other than PASS only needs the local database update.
        boolean passed = AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum());
        if (!passed) {
            this.auditByIdInDB(id, auditDTO);
            return share;
        }

        // Step 3: PASS -> send a half (transactional) message so the user center can
        // consume it and credit the author's bonus.
        String transactionId = UUID.randomUUID().toString();
        UserAddBonusMsgDTO bonusPayload = UserAddBonusMsgDTO.builder()
            .userId(share.getUserId())
            .bonus(50)
            .build();
        String auditJson = JSON.toJSONString(auditDTO);

        this.source.output().send(
            MessageBuilder
                .withPayload(bonusPayload)
                // Headers carry everything the transaction listener reads back.
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .setHeader("share_id", id)
                .setHeader("dto", auditJson)
                .build()
        );
        return share;
    }
    
    /**
     * Writes the audit verdict for a single share to the database.
     *
     * <p>Only the id, audit status and reason are set on the {@code Share} handed to
     * {@code updateByPrimaryKeySelective}.
     *
     * @param id       primary key of the share to update
     * @param auditDTO source of the new audit status and reason
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
        this.shareMapper.updateByPrimaryKeySelective(
            Share.builder()
                .id(id)
                .auditStatus(auditDTO.getAuditStatusEnum().toString())
                .reason(auditDTO.getReason())
                .build()
        );

        // Step 4 (not implemented here): write the share to the cache.
    }
    
    /**
     * Applies the audit verdict, then inserts a transaction-log row carrying
     * {@code transactionId}, all inside one transactional method.
     *
     * @param id            primary key of the share to update
     * @param auditDTO      audit verdict and reason
     * @param transactionId transaction id stored on the log row
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
        this.auditByIdInDB(id, auditDTO);

        // The log row is what the transaction check-back later searches for.
        RocketmqTransactionLog logEntry = RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("审核分享...")
            .build();
        this.rocketmqTransactionLogMapper.insertSelective(logEntry);
    }
    
    AddBonusTransactionListener
    /**
     * Local-transaction callbacks for messages sent through producer group
     * {@code tx-add-bonus-group}; the audit DTO travels in the {@code dto} message header.
     */
    @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
    @RequiredArgsConstructor(onConstructor = @__(@Autowired))
    public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
        private final ShareService shareService;
        private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

        /**
         * Runs the local transaction: updates the share and writes the transaction log.
         *
         * <p>The transaction id, share id and JSON-encoded audit DTO are all read from the
         * message headers (each cast to {@code String}); {@code arg} is not used.
         *
         * @param msg message whose headers carry the transaction id, {@code share_id} and
         *            {@code dto}
         * @param arg unused
         * @return {@code COMMIT} if the service call completes, {@code ROLLBACK} if it throws;
         *         the exception itself is neither logged nor rethrown
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            MessageHeaders headers = msg.getHeaders();
            String txId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            String rawShareId = (String) headers.get("share_id");
            Integer shareId = Integer.valueOf(rawShareId);
            String auditJson = (String) headers.get("dto");
            ShareAuditDTO auditDTO = JSON.parseObject(auditJson, ShareAuditDTO.class);

            RocketMQLocalTransactionState outcome;
            try {
                this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, txId);
                outcome = RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                outcome = RocketMQLocalTransactionState.ROLLBACK;
            }
            return outcome;
        }

        /**
         * Check-back: decides the transaction state from the presence of a log row.
         *
         * @param msg message whose headers carry the transaction id
         * @return {@code COMMIT} if a log row with that transaction id exists, otherwise
         *         {@code ROLLBACK}
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String txId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

            // Query by example: select * from xxx where transaction_id = xxx
            RocketmqTransactionLog probe = RocketmqTransactionLog.builder()
                .transactionId(txId)
                .build();
            boolean logged = this.rocketmqTransactionLogMapper.selectOne(probe) != null;

            return logged
                ? RocketMQLocalTransactionState.COMMIT
                : RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    

    SpringCloudStream+RocketMQ实现分布式事务02-重构消费者

    application.yml
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              # Address of the RocketMQ name server.
              name-server: 127.0.0.1:9876
          bindings:
            input:
              # Topic the input binding reads from.
              destination: add-bonus
              # Consumer group used by the input binding.
              group: binder-group
    

    @EnableBinding({Sink.class})

    /**
     * Bootstrap class of the user-center application; binds the {@code Sink} channel interface.
     */
    @SpringBootApplication
    // Package scanned for MyBatis mapper interfaces.
    @MapperScan("com.itmuch.usercenter.dao")
    //@EnableDiscoveryClient
    @EnableBinding({Sink.class})
    public class UserCenterApplication {

        /**
         * Starts the Spring application.
         *
         * @param args command-line arguments forwarded to {@code SpringApplication.run}
         */
        public static void main(final String[] args) {
            SpringApplication.run(UserCenterApplication.class, args);
        }

    }
    
    AddBonusStreamConsumer
    /**
     * Stream consumer for the {@code Sink.INPUT} channel: stamps each incoming bonus message
     * with a fixed event and description, then delegates to {@code UserService#addBonus}.
     */
    @Slf4j
    @Service
    @RequiredArgsConstructor(onConstructor = @__(@Autowired))
    public class AddBonusStreamConsumer {
        private final UserService userService;

        /**
         * Handles one bonus message.
         *
         * @param bonusMsg incoming message; its event and description are overwritten before
         *                 it is passed on
         */
        @StreamListener(Sink.INPUT)
        public void receive(UserAddBonusMsgDTO bonusMsg) {
            bonusMsg.setEvent("CONTRIBUTE");
            bonusMsg.setDescription("投稿加积分..");
            this.userService.addBonus(bonusMsg);
        }
    }
    
    UserService
    /**
     * Adds the bonus carried by {@code msgDTO} to the user's balance and records the change in
     * the bonus event log.
     *
     * <p>NOTE(review): the balance is read and then written back; whether concurrent messages
     * for the same user can interleave here depends on the caller and isolation level —
     * confirm if an atomic update is needed.
     *
     * @param msgDTO message holding the user id, bonus amount, event and description
     * @throws IllegalArgumentException if no user exists for the message's user id
     */
    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO msgDTO) {
        // 1. Add the bonus to the user.
        Integer userId = msgDTO.getUserId();
        Integer bonus = msgDTO.getBonus();
        User user = this.userMapper.selectByPrimaryKey(userId);
        if (user == null) {
            // Fail with a descriptive error instead of an opaque NullPointerException below.
            throw new IllegalArgumentException("User not found, userId=" + userId);
        }

        user.setBonus(user.getBonus() + bonus);
        this.userMapper.updateByPrimaryKeySelective(user);

        // 2. Record the change in the bonus_event_log table.
        this.bonusEventLogMapper.insert(
            BonusEventLog.builder()
                .userId(userId)
                .value(bonus)
                .event(msgDTO.getEvent())
                .createTime(new Date())
                .description(msgDTO.getDescription())
                .build()
        );
        log.info("积分添加完毕...");
    }
    

    SpringCloudStream知识盘点

    SpringCloudStream知识盘点

  • 相关阅读:
    Linux 添加环境变量
    postgresql 获取修改列的值
    5月30日周一上午
    周日5月29日
    2016年5月26日
    如何使用Gson(添加到项目里去)
    linux内核分析课程总结()待完善
    5月5日离散课笔记
    4月28日的离散课(还少了一部分)
    2016年4月29日
  • 原文地址:https://www.cnblogs.com/xjknight/p/12349104.html
Copyright © 2011-2022 走看看