zoukankan      html  css  js  c++  java
  • Spring boot+RabbitMQ环境

    Spring boot+RabbitMQ环境

    消息队列在目前分布式系统下具备非常重要的地位,如下的场景是比较适合消息队列的:

    • 跨系统的调用,异步性质的调用最佳。
    • 高并发问题,利用队列串行特点。
    • 订阅模式,数据被未知数量的消费者订阅,比如某种数据的变更会影响多个系统的数据,订单数据就是比较好理解的。

    之前有一个场景是商品数据在修改后需要推送到elasticsearch中,由于修改产品的并发量以及数据量均不大,所以对于消息未做持久化,而且为了快速上线简化系统,生产者与消费者更是部署在一个应用中,自生产自消费。这篇将从头搭建RabbitMQ环境,并且将之集成在Spring boot中。

    搭建RabbitMQ环境

    erlang

    由于RabbitMQ是基于erlang开发的,所以要安装RabbitMQ先必须安装erlang。

    更换软件源

    使用apt-get时默认的软件源是us.archive.ubuntu.com,这会经常发生安装问题,比如速度特别慢或者由于下载不了造成不能安装。

    可以更换成国内的数据源cn.archive.ubuntu.com,速度那是不用说的了(这里感谢我的同事的提醒)。找到下面这个文件然后进行替换。

    /etc/apt/sources.list
    
    :%s/us.archive/cn.archive/g 

    在没有更新软件源时,我采取的是源码编译安装方法,参考这篇文章。我安装的是最新19.2版本,安装过程中还遇到各种问题就不一一记录了。

    http://erlang.org/doc/installation_guide/INSTALL.html

    测试erlang安装是否正确,输入erl,如果看到如下图所示就说明安装成功了。

    安装RabbitMQ

    在未更换软件源之前我也是选择了源码编译安装方法,安装的最新的3.6.6,但手动启动时总是不成功,错误信息如下:

    版本问题

    RabbitMQ 3.6.6+ erlang 19.2 启动失败的问题暂时未解决,有谁知道的可以告诉我。

    由于启动不成功,最后在更新成国内软件源之后,再次通过 apt-get 安装RabbitMQ,默认的版本是3.5.7,好像也可以选版本,以后再尝试。可喜的是通过apt-get安装的RabbitMQ成功的启动起来了。可以通过如下命令查看RabbitMQ状态。

    ./rabbitmqctl stauts
    RabbitMQ管理工具

    这是自带的一个web插件,可以用来管理消息队列,启动它的方法比较简单:

    rabbitmq-plugins enable rabbitmq_management

    然后重启RabbitMQ即可生效。默认生成了guest用户,但这个guest用户只能在RabbitMQ所在主机才能访问,所以要想远程访问就需要重新分配一个用户,有两种办法:

    • 通过网页,以guest登录然后在页面上完成操作。
    • 通过命令,创建用户,授权也可以。

    创建用户,指定用户名以及密码

    ./rabbitmqctl add_user root root //用户名密码都是root

    分配角色,administrator是可以操作和guest本地用户一样的功能,当登录上rabbitmq_management之后,里面的所有功能都可以使用。

    rabbitmqctl set_user_tags root administrator

    授权,队列的操作管理权限。如果不配置,那么客户端在连接消息队列时会出问题。

    rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

    上面语句我没有执行成功,后续再研究下是不是写法问题

    Spring boot集成RabbitMQ

    我们在rabbitmq_management上面可以正常访问操作后,就可以放心的写demo了,这里采用spring boot。先看简单看下RabbitMQ的简易架构图,容易理解下面提到的一些组件。

      • 生产者,消息,消费者

     

      • 消息内部:Exchange,Binding,Queues

    引用amqp的starter

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    增加配置信息

    这里没有采用自动配置

    mq.rabbit.host=192.168.21.128
    mq.rabbit.port=5672
    mq.rabbit.virtualHost=/
    mq.rabbit.username=root
    mq.rabbit.password=root

    创建RabbitMQConfig

    • ConnectionFactory,类似于数据库连接等。
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.mqRabbitHost,this.mqRabbitPort);
    
        connectionFactory.setUsername(this.mqRabbitUserName);
        connectionFactory.setPassword(this.mqRabbitPassword);
        connectionFactory.setVirtualHost(this.mqRabbitVirtualHost);
        connectionFactory.setPublisherConfirms(true);
    
        return connectionFactory;
    }
    • RabbitTemplate,用来发送消息。
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
    • DirectExchange
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }
    • Queue,构建队列,名称,是否持久化之类
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }
    • Binding,将DirectExchange与Queue进行绑定
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTING_KEY);
    }
    • SimpleMessageListenerContainer,消费者

    需要将ACK修改为手动确认,避免消息在处理过程中发生异常造成被误认为已经成功消费的假象。

    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
    
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                logger.info("消费端接收到消息 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }

    服务端,业务逻辑,调用消息队列。

    为了让客户端知道消息是否已经成功,消息队列提供了回调机制(需要实现ConfirmCallback),当消息服务器接收到消息之后会给客户端一个通知,此时客户端根据消息应答来决定后续的流程。

    @Service
    public class ProductServiceImpl extends BaseService implements ProductService, RabbitTemplate.ConfirmCallback {
    
        @Autowired
        private ProductMapper productMapper;
    
        private RabbitTemplate rabbitTemplate;
    
        public ProductServiceImpl(RabbitTemplate rabbitTemplate){
            this.rabbitTemplate=rabbitTemplate;
            this.rabbitTemplate.setConfirmCallback(this);
        }
    
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            this.logger.info(" 消息id:" + correlationData);
            if (ack) {
                this.logger.info("消息发送确认成功");
            } else {
                this.logger.info("消息发送确认失败:" + cause);
    
            }
        }
    
        @Override
        public void save(Product product) {
    
            //执行保存
            String uuid = UUID.randomUUID().toString();
            CorrelationData correlationId = new CorrelationData(uuid);
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, product.getName(),correlationId);
        }
    }

    执行结果

    可以清晰的看到RabbitMQ发给生产者的信息收到的确认信息,也能看到消息被消费端消费后的信息。

    RabbitMQ的其它方面

    高可用方案

    与常见的数据库类似,都是主从模式来保证高可用,可以利用HAProxy来实现主从备份方案。

    水平扩展方案

    主要是为了解决垂直优化的瓶颈问题,主要有这三种:

    • clustering,这是默认内置的一种集群模式,与下面两种不同的是clustering一般应用于同一局域网。
    • federation,有待后续学习
    • shovel,有待后续学习

    不丢消息特性

    这个不是RabbitMQ的专利,将消息持久化可以确保RabbitMQ重启或者死机过程中不至于丢掉没有消费的消息。

    消息不被重复消费

    这点要靠消费端来完成,尽管消费端可以通过ACK来通知消息队列消息已经被消费,但如果消费端消费了消息,此时ACK过程中的通知出现异常,消息队列会认为消息未被消费会继续发给消费端。

    总结

    初次安装可能会出现一堆问题,特别是需要安装所依赖的众多包。RabbitMQ与Erlang可能存在版本依赖问题待后续确认。spring boot下集成RabbitMQ异常简单,可以根据需求部署集群来实现可扩展高可用的消息系统。

    引用

     
     
  • 相关阅读:
    Centos-706-在Win10中共享目录然后在Linux中访问
    Centos-706-在Linux中共享目录然后在Win10中访问
    Centos-706-配置文件
    Centos-706-daemon.json启用hosts后无法启动
    Centos-706-Docker开启远程访问
    Centos-706-Docker镜像-生成以及推送
    Centos-706-固定IP设置
    jquery.chosen.js实现模糊搜索
    Mysql索引分析:适合建索引?不适合建索引?【转】
    查看 Apache并发请求数及其TCP连接状态【转】
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/6423120.html
Copyright © 2011-2022 走看看