zoukankan      html  css  js  c++  java
  • 电商系统中消息中间件的应用

    一、使用场景

    用户注册后,需要发注册邮件和注册短信

    传统的做法:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端

    这里的问题是,邮件和短信它只是一个通知,它们的执行结果不需要同步响应给用户,这种做法让客户端多等待了发送邮件和短信的时间

     

    使用消息中间件:将注册信息写入数据库后,发送注册邮件和注册短信的异步消息

    可以看到,响应给用户的时间就等于写入数据库的时间+写入消息队列的时间(可 以忽略不计),发送邮件和短信的任务异步的执行了,提高了响应速度

    二、消息中间件的选择

    持久化消息比较:zeroMq不支持,activeMq和rabbitMq都支持。持久化消息主要是指:MQ down或者MQ所在的服务器down了,消息不会丢失的机制。

    可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统、社区—RabbitMq最好,ActiveMq次之,ZeroMq最差。

    高并发:从实现语言来看,RabbitMQ最高,原因是它的实现语言是天生具备高并发高可用的erlang语言。

    综上所述:RabbitMQ的性能相对来说更好更全面,是消息中间件的首选。

    三、RabbitMq简介

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)

    RabbitMQ消息推送到接收的流程图:

    黄色的圈圈就是我们的消息推送服务,例如用户注册时发送短信和邮件的异步消息,然后将消息推送到中间方框里面也就是 rabbitMq的服务器,然后经过服务器里面的交换机、队列等将数据处理入列后,最终右边的蓝色圈圈就是消费者监听到的消息,监听到消息后可以处理我们的业务逻辑,例如发送短信和邮件

    四、RabbitMq的使用

    接下来,以b2b2c电商系统Javashop为例,具体说明RabbitMq的使用

     

    1.pom.xml文件中引入依赖

    <!--rabbitmq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2.application.yml加入RabbitMq配置

    spring:
      rabbitmq:
        host: rabbitmq服务的ip
        port: 5672 #rabbitmq默认端口
        username: guest
        password: guest
        virtual-host: / #默认的vhost,用户也可以自己创建

    3.定义消息发送的实体类

    public class MqMessage {
        private String exchange;
        private String routingKey;
        private Object message;
        public MqMessage(String exchange, String routingKey, Object message) {
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.message = message;
        }
        public String getExchange() {
            return exchange;
        }
    
        public void setExchange(String exchange) {
            this.exchange = exchange;
        }
    
        public String getRoutingKey() {
            return routingKey;
        }
    
        public void setRoutingKey(String routingKey) {
            this.routingKey = routingKey;
        }
    
        public Object getMessage() {
            return message;
        }
    
        public void setMessage(Object message) {
            this.message = message;
        }
    }

    4.定义发送异步事件的方法。我们需要先使用Spring的ApplicationEventPublisher来发送一个异步事件,原因是我们需要在事务提交后再发送rabbitmq消息

    @Service
    public class MessageSenderImpl implements MessageSender {
        @Autowired
        private ApplicationEventPublisher publisher;
    
        @Override
        public void send(MqMessage message) {
            publisher.publishEvent(message);
        }
    }

    5.定义监听异步事件的方法。使用Spring的@TransactionalEventListener监听异步事件,然后发送rabbitmq消息

    @Component
    public class TransactionalMessageListener {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        /**
         * 默认在事务提交后执行
         * @param message
         */
        @TransactionalEventListener(fallbackExecution = true)
        public void handleSupplierBillPush(MqMessage message){
    
            this.amqpTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(), message.getMessage());
    
        }
    }

    配置好以上内容后可以使用rabbitmq发送消息了,下面以用户注册为例展示发送和接收消息的过程

    1. 用户注册,发送注册消息   

    //会员注册业务
     ...
    //组织数据结构发送会员注册消息
    MemberRegisterMsg memberRegisterMsg = new MemberRegisterMsg();
    memberRegisterMsg.setMember(member);
    memberRegisterMsg.setUuid(ThreadContextHolder.getHttpRequest().getHeader("uuid"));
    this.messageSender.send(new MqMessage(AmqpExchange.MEMEBER_REGISTER, AmqpExchange.MEMEBER_REGISTER + "_ROUTING", memberRegisterMsg));
    1. 消费者客户端接收消息,进行发送短信等操作

    @Component
    public class MemberRegisterReceiver {
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        @Autowired(required = false)
        private List<MemberRegisterEvent> events;
    
        /**
         * 会员注册
         *
         * @param vo
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = AmqpExchange.MEMEBER_REGISTER + "REGISTER_QUEUE"),
                exchange = @Exchange(value = AmqpExchange.MEMEBER_REGISTER, type = ExchangeTypes.FANOUT)
        ))
        public void memberRegister(MemberRegisterMsg vo) {
    
            if (events != null) {
                for (MemberRegisterEvent event : events) {
                    try {
                        //执行发送短信,邮件等操作
                        event.memberRegister(vo);
                    } catch (Exception e) {
                        logger.error("会员注册消息出错", e);
                    }
                }
            }
    
        }
    }
  • 相关阅读:
    数字音乐均衡器
    移植x264到vs2008之二
    无线连接频繁掉线,解决方法之telnet命令突破ddwrt端口最大数连接限制分析
    最新开发的消费平台开发过程 持续更新(二)
    .net 4.0 下请求验证模式变化 应对方法
    DDWRT无线参数解读
    利用 Application_Error 捕获所有异常
    location.reload() 和 location.replace()的区别和应用
    纯CSS 实现组织架构图,学习
    Syslog架设windows日志服务器
  • 原文地址:https://www.cnblogs.com/javashop-docs/p/13954055.html
Copyright © 2011-2022 走看看