zoukankan      html  css  js  c++  java
  • 从头开始搭建一个Spring boot+RabbitMQ环境

    消息队列在目前分布式系统下具备非常重要的地位,如下的场景是比较适合消息队列的:

    • 跨系统的调用,异步性质的调用最佳。
    • 高并发问题,利用队列串行特点。
    • 订阅模式,数据被未知数量的消费者订阅,比如某种数据的变更会影响多个系统的数据,订单数据就是比较好理解的。

    之前有一个场景是商品数据在修改后需要推送到elasticsearch中,由于修改产品的并发量以及数据量均不大,所以对于消息未做持久化,而且为了快速上线简化系统,生产者与消费者更是部署在一个应用中,自生产自消费。这篇将从头搭建RabbitMQ环境,并且将之集成在Spring boot中。

    搭建RabbitMQ环境

    erlang

    由于RabbitMQ是基于erlang开发的,所以要安装RabbitMQ先必须安装erlang。

    更换软件源

    使用apt-get时默认的软件源是us.archive.ubuntu.com,这会经常发生安装问题,比如速度特别慢或者由于下载不了造成不能安装。

    可以更换成国内的数据源cn.archive.ubuntu.com,速度那是不用说的了(这里感谢我的同事的提醒)。找到下面这个文件然后进行替换。

    /etc/apt/sources.list
    
    :%s/us.archive/cn.archive/g 

    在没有更新软件源时,我采取的是源码编译安装方法,参考这篇文章。我安装的是最新19.2版本,安装过程中还遇到各种问题就不一一记录了。

    http://erlang.org/doc/installation_guide/INSTALL.html

    测试erlang安装是否正确,输入erl,如果看到如下图所示就说明安装成功了。

    安装RabbitMQ

    在未更换软件源之前我也是选择了源码编译安装方法,安装的最新的3.6.6,但手动启动时总是不成功,错误信息如下:

    版本问题

    RabbitMQ 3.6.6+ erlang 19.2 启动失败的问题暂时未解决,有谁知道的可以告诉我。

    由于启动不成功,最后在更新成国内软件源之后,再次通过 apt-get 安装RabbitMQ,默认的版本是3.5.7,好像也可以选版本,以后再尝试。可喜的是通过apt-get安装的RabbitMQ成功的启动起来了。可以通过如下命令查看RabbitMQ状态。

    ./rabbitmqctl stauts
    RabbitMQ管理工具

    这是自带的一个web插件,可以用来管理消息队列,启动它的方法比较简单:

    rabbitmq-plugins enable rabbitmq_management

    然后重启RabbitMQ即可生效。默认生成了guest用户,但这个guest用户只能在RabbitMQ所在主机才能访问,所以要想远程访问就需要重新分配一个用户,有两种办法:

    • 通过网页,以guest登录然后在页面上完成操作。
    • 通过命令,创建用户,授权也可以。

    创建用户,指定用户名以及密码

    ./rabbitmqctl add_user root root //用户名密码都是root

    分配角色,administrator是可以操作和guest本地用户一样的功能,当登录上rabbitmq_management之后,里面的所有功能都可以使用。

    rabbitmqctl set_user_tags root administrator

    授权,队列的操作管理权限。如果不配置,那么客户端在连接消息队列时会出问题。

    rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

    上面语句我没有执行成功,后续再研究下是不是写法问题

    Spring boot集成RabbitMQ

    我们在rabbitmq_management上面可以正常访问操作后,就可以放心的写demo了,这里采用spring boot。先看简单看下RabbitMQ的简易架构图,容易理解下面提到的一些组件。

      • 生产者,消息,消费者

     

      • 消息内部:Exchange,Binding,Queues

    引用amqp的starter

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    增加配置信息

    这里没有采用自动配置

    mq.rabbit.host=192.168.21.128
    mq.rabbit.port=5672
    mq.rabbit.virtualHost=/
    mq.rabbit.username=root
    mq.rabbit.password=root

    创建RabbitMQConfig

    • ConnectionFactory,类似于数据库连接等。
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.mqRabbitHost,this.mqRabbitPort);
    
        connectionFactory.setUsername(this.mqRabbitUserName);
        connectionFactory.setPassword(this.mqRabbitPassword);
        connectionFactory.setVirtualHost(this.mqRabbitVirtualHost);
        connectionFactory.setPublisherConfirms(true);
    
        return connectionFactory;
    }
    • RabbitTemplate,用来发送消息。
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
    • DirectExchange
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }
    • Queue,构建队列,名称,是否持久化之类
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }
    • Binding,将DirectExchange与Queue进行绑定
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTING_KEY);
    }
    • SimpleMessageListenerContainer,消费者

    需要将ACK修改为手动确认,避免消息在处理过程中发生异常造成被误认为已经成功消费的假象。

    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
    
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                logger.info("消费端接收到消息 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }

    服务端,业务逻辑,调用消息队列。

    为了让客户端知道消息是否已经成功,消息队列提供了回调机制(需要实现ConfirmCallback),当消息服务器接收到消息之后会给客户端一个通知,此时客户端根据消息应答来决定后续的流程。

    @Service
    public class ProductServiceImpl extends BaseService implements ProductService, RabbitTemplate.ConfirmCallback {
    
        @Autowired
        private ProductMapper productMapper;
    
        private RabbitTemplate rabbitTemplate;
    
        public ProductServiceImpl(RabbitTemplate rabbitTemplate){
            this.rabbitTemplate=rabbitTemplate;
            this.rabbitTemplate.setConfirmCallback(this);
        }
    
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            this.logger.info(" 消息id:" + correlationData);
            if (ack) {
                this.logger.info("消息发送确认成功");
            } else {
                this.logger.info("消息发送确认失败:" + cause);
    
            }
        }
    
        @Override
        public void save(Product product) {
    
            //执行保存
            String uuid = UUID.randomUUID().toString();
            CorrelationData correlationId = new CorrelationData(uuid);
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, product.getName(),correlationId);
        }
    }

    执行结果

    可以清晰的看到RabbitMQ发给生产者的信息收到的确认信息,也能看到消息被消费端消费后的信息。

    RabbitMQ的其它方面

    高可用方案

    与常见的数据库类似,都是主从模式来保证高可用,可以利用HAProxy来实现主从备份方案。

    水平扩展方案

    主要是为了解决垂直优化的瓶颈问题,主要有这三种:

    • clustering,这是默认内置的一种集群模式,与下面两种不同的是clustering一般应用于同一局域网。
    • federation,有待后续学习
    • shovel,有待后续学习

    不丢消息特性

    这个不是RabbitMQ的专利,将消息持久化可以确保RabbitMQ重启或者死机过程中不至于丢掉没有消费的消息。

    消息不被重复消费

    这点要靠消费端来完成,尽管消费端可以通过ACK来通知消息队列消息已经被消费,但如果消费端消费了消息,此时ACK过程中的通知出现异常,消息队列会认为消息未被消费会继续发给消费端。

    总结

    初次安装可能会出现一堆问题,特别是需要安装所依赖的众多包。RabbitMQ与Erlang可能存在版本依赖问题待后续确认。spring boot下集成RabbitMQ异常简单,可以根据需求部署集群来实现可扩展高可用的消息系统。

    引用

  • 相关阅读:
    Windows SDK编程(Delphi版) 之 应用基础,楔子
    一个小问题引发的论证思考
    Delphi 组件开发教程指南(7)继续模拟动画显示控件
    用PyInstaller将python转成可执行文件exe笔记
    使用 .Net Memory Profiler 诊断 .NET 应用内存泄漏(方法与实践)
    Microsof Office SharePoint 2007 工作流开发环境搭建
    How to monitor Web server performance by using counter logs in System Monitor in IIS
    LINQ之Order By
    window 性能监视器
    内存泄露检测工具
  • 原文地址:https://www.cnblogs.com/ASPNET2008/p/6414145.html
Copyright © 2011-2022 走看看