zoukankan      html  css  js  c++  java
  • 关于RabbitMQ以及RabbitMQ和Spring的整合

    基本概念

    RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。
    RabbitMQ的结构图如下:
    几个概念说明:
    Broker:简单来说就是消息队列服务器实体。
      Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
      Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
      Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
      Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
      vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
      producer:消息生产者,就是投递消息的程序。
      consumer:消息消费者,就是接受消息的程序。
      channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
    消息队列的使用过程大概如下:
      (1)客户端连接到消息队列服务器,打开一个channel。
      (2)客户端声明一个exchange,并设置相关属性。
      (3)客户端声明一个queue,并设置相关属性。
      (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
      (5)客户端投递消息到exchange。
    exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
    exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
    RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
      (1)exchange持久化,在声明时指定durable => 1
      (2)queue持久化,在声明时指定durable => 1
      (3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
    如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定
     
    rabbitmq整合spring
     1.需要下载rabbitmq, 而在rabbitmq安装之前需要安装erlang, 因为rabbitmq是用erlang写的.(下载好之后配置环境变量)

      1.1 启动rabbitmq服务器

        

      1.2 在rabbitmq服务器上创建虚拟主机
        

      1.3 创建用户

        

      1.4 给用户添加管理员角色

        

      1.5 设置用户在虚拟主机上的权限

        

        set_permissions -p vhostSJJ shijunjie ".*" ".*" ".*"

      1.6 安装基于web的管理插件

      

      按照上面配置好后,可以在管理界面上看到入下信息

      

      附:rabbitmq的一些常用命令  

      rabbitmq的安装、启动和停止

      rabbitmq-service.bat install 
      rabbitmq-service.bat start 
      rabbitmq-service.bat stop

      列出所有queue

      rabbitmqctl list_queues

      列出指定queue的信息

      rabbitmqctl list_queues [the queue name] messages_ready messages_unacknowledged

      列出所有exchange

      rabbitmqctl list_exchanges

      列出所有binding

      rabbitmqctl list_bindings

      安装基于web的管理插件

      rabbitmq-plugins.bat enable rabbitmq_management 

      关闭节点
      # rabbitmqctl stop
      2.停止RabbitMQ应用
      # rabbitmqctl stop_app
      3.启动RabbitMQ应用
      # rabbitmqctl start_app
      4.显示RabbitMQ中间件各种信息
      # rabbitmqctl status
      5.重置RabbitMQ节点
      # rabbitmqctl reset
      # rabbitmqctl force_reset
      从它属于的任何集群中移除,从管理数据库中移除所有数据,例如配置过的用户和虚拟宿主, 删除所有持久化的消息。
      force_reset命令和reset的区别是无条件重置节点,不管当前管理数据库状态以及集群的配置。如果数据库或者集群配置发生错误才使用这个最后 的手段。
      注意:只有在停止RabbitMQ应用后,reset和force_reset才能成功。
      6.循环日志文件
      # rabbitmqctl rotate_logs[suffix]
      7.集群管理
      # rabbitmqctl cluster clusternode

      用户管理
      1.添加用户
      # rabbitmqctl add_user username password
      2.删除用户
      # rabbitmqctl delete_user username
      3.修改密码
      # rabbitmqctl change_password username newpassword
      4.列出所有用户
      # rabbitmqctl list_users

      权限控制1.创建虚拟主机
      # rabbitmqctl add_vhost vhostpath
      2.删除虚拟主机
      # rabbitmqctl delete_vhost vhostpath
      3.列出所有虚拟主机
      # rabbitmqctl list_vhosts
      4.设置用户权限
      # rabbitmqctl set_permissions [-p vhostpath] username regexp regexp regexp
      5.清除用户权限
      # rabbitmqctl clear_permissions [-p vhostpath] username
      6.列出虚拟主机上的所有权限
      # rabbitmqctl list_permissions [-p vhostpath]
      7.列出用户权限

      # rabbitmqctl list_user_permissions username

     2.建立一个maven项目.然后我们开始配置项目.

        2.1 由于是spring整合,我们需要加入spring的依赖.

      

        <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aop</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aspects</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-tx</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jdbc</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-web</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>

    2.2加入rabbitmq和spring的整合依赖

                  <!--rabbitmq依赖 -->  
            <dependency>  
                <groupId>org.springframework.amqp</groupId>  
                <artifactId>spring-rabbit</artifactId>  
                <version>1.3.5.RELEASE</version>  
            </dependency>  

    2.3定义消息生产者和消息发送者

    exchange有几种,这里我只测试了两种, 通过分别定义两个exchange去绑定directtopic..

    首先, 定义消息生产者, 通过配置将template链接connect-factory并注入到代码中使用.

    package me.shijunjie.producer;
    
    import java.io.IOException;
    
    import javax.annotation.Resource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MessageProducer {
        private Logger logger = LoggerFactory.getLogger(MessageProducer.class);  
    
        @Resource(name="amqpTemplate")  
        private AmqpTemplate amqpTemplate;  
    
        @Resource(name="amqpTemplate2")  
        private AmqpTemplate amqpTemplate2;  
    
        public void sendMessage(Object message) throws IOException {  
            logger.info("to send message:{}", message);  
            amqpTemplate.convertAndSend("queueTestKey", message);  
            amqpTemplate.convertAndSend("queueTestChris", message);  
            amqpTemplate2.convertAndSend("shijj.xxxx.wsdwd", message);  
        }  
    }

    然后我们定义消息消费者, 这里,我定义了三个消费者, 通过监听消息队列, 分别接受各自所匹配的消息.

    第一个消费者, 接受direct的消息, 他的exchange为exchangeTest,  rout-key为queueTestKey

    package me.shijunjie.consumer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class MessageConsumer implements MessageListener {
         private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);  
        @Override
        public void onMessage(Message message) {
             logger.info("consumer receive message------->:{}", message);  
            
        }
    
    }

    第二个消费者, 接受direct的消息(为了测试一个exchange可以发送多个消息), 他的exchange为exchangeTest,  rout-key为queueTestChris.

    package me.shijunjie.consumer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class ChrisConsumer implements MessageListener {
        private Logger logger = LoggerFactory.getLogger(ChrisConsumer.class);  
        @Override  
        public void onMessage(Message message) {  
            logger.info("chris receive message------->:{}", message);  
        }  
    
    }

    第三个消费者, 接受topic的消息他的exchange为exchangeTest2,  pattern为shijj.*..          .*可以匹配一个, .#可以匹配一个或多个..

    package me.shijunjie.consumer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class ShijjConsumer implements MessageListener {
        private Logger logger = LoggerFactory.getLogger(ShijjConsumer.class);  
          
        @Override  
        public void onMessage(Message message) {  
            logger.info("shijj receive message------->:{}", message);  
        }  
    }

    然后就是关键rabbit整合spring的配置文件.

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans  
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
         http://www.springframework.org/schema/rabbit  
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
        <!--配置connection-factory,指定连接rabbit server参数 -->
        <rabbit:connection-factory id="connectionFactory" virtual-host="vhostSJJ" 
            username="shijunjie" password="wssjj123" host="123.206.228.200" port="5672" 
            />
        <!-- <rabbit:connection-factory id="connectionFactory"
             username="test2" password="test2"
            host="123.206.228.200" port="5672" /> -->
    
        <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
        <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
    
        <!--定义queue -->
        <rabbit:queue name="queueTest" durable="true" auto-delete="false"
            exclusive="false" declared-by="connectAdmin" />
    
        <!-- 定义direct exchange,绑定queueTest -->
        <rabbit:direct-exchange name="exchangeTest"
            durable="true" auto-delete="false" declared-by="connectAdmin">
            <rabbit:bindings>
                <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <!--定义rabbit template用于数据的接收和发送 -->
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
            exchange="exchangeTest" />
    
        <!-- 消息接收者 -->
        <bean id="messageReceiver" class="me.shijunjie.consumer.MessageConsumer"></bean>
    
        <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
        <rabbit:listener-container
            connection-factory="connectionFactory">
            <rabbit:listener queues="queueTest" ref="messageReceiver" />
        </rabbit:listener-container>
    
        <!--定义queue -->
        <rabbit:queue name="queueChris" durable="true"
            auto-delete="false" exclusive="false" declared-by="connectAdmin" />
    
        <!-- 定义direct exchange,绑定queueTest -->
        <rabbit:direct-exchange name="exchangeTest"
            durable="true" auto-delete="false" declared-by="connectAdmin">
            <rabbit:bindings>
                <rabbit:binding queue="queueChris" key="queueTestChris"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <!-- 消息接收者 -->
        <bean id="receiverChris" class="me.shijunjie.consumer.ChrisConsumer"></bean>
    
        <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
        <rabbit:listener-container
            connection-factory="connectionFactory">
            <rabbit:listener queues="queueChris" ref="receiverChris" />
        </rabbit:listener-container>
    
        <!-- 分隔线 -->
        <!--配置connection-factory,指定连接rabbit server参数 -->
        <rabbit:connection-factory id="connectionFactory2" virtual-host="vhostSJJ" 
            username="shijunjie" password="wssjj123" host="123.206.228.200" port="5672" />
    
        <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
        <rabbit:admin id="connectAdmin2" connection-factory="connectionFactory2" />
    
        <!--定义queue -->
        <rabbit:queue name="queueShijj" durable="true"
            auto-delete="false" exclusive="false" declared-by="connectAdmin2" />
    
        <!-- 定义direct exchange,绑定queueTest -->
        <rabbit:topic-exchange name="exchangeTest2"
            durable="true" auto-delete="false" declared-by="connectAdmin2">
            <rabbit:bindings>
                <rabbit:binding queue="queueShijj" pattern="shijj.#"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:topic-exchange>
    
        <!--定义rabbit template用于数据的接收和发送 -->
        <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory2"
            exchange="exchangeTest2" />
    
        <!-- 消息接收者 -->
        <bean id="recieverShijj" class="me.shijunjie.consumer.ShijjConsumer"></bean>
    
        <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
        <rabbit:listener-container
            connection-factory="connectionFactory2">
            <rabbit:listener queues="queueShijj" ref="recieverShijj" />
        </rabbit:listener-container>
    </beans>  

    以及我的application.xml

    <?xml version="1.0" encoding="UTF-8"?>  
    <beans xmlns="http://www.springframework.org/schema/beans"  
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">  
      
        <import resource="classpath*:rabbitMQ.xml" />  
          
        <!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans -->  
        <context:component-scan base-package="me.shijunjie.consumer, me.shijunjie.producer" />  
          
        <!-- 激活annotation功能 -->  
        <context:annotation-config />  
        <!-- 激活annotation功能 -->  
        <context:spring-configured />  
      
    </beans>  

    3.全部配置完成之后进行测试

     1.编写测试代码
     
    package me.shijunjie.test;
    
    import org.junit.Before;
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import me.shijunjie.producer.MessageProducer;
    
    public class TestClass {
    
        private Logger logger = LoggerFactory.getLogger(TestClass.class);  
      
        private ApplicationContext context = null;  
      
        @Before
        public void setUp() throws Exception {  
            context = new ClassPathXmlApplicationContext("application.xml");  
        }  
      
        @Test  
        public void should_send_a_amq_message() throws Exception {  
            MessageProducer messageProducer = (MessageProducer) context.getBean("messageProducer");  
            int a = 100;  
            while (a > 0) {  
                messageProducer.sendMessage("Hello, I am amq sender num :" + a--);  
                try {  
                    //暂停一下,好让消息消费者去取消息打印出来  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
      
            }  
        }  
    }

    运行后

     
     
     
  • 相关阅读:
    Power BI 根据用户权限动态生成导航跳转目标
    Power BI Tooltips 增强功能
    Power BI refresh error “could not load file or assembly…provided impersonation level is invalid”
    SQL 错误代码 18456
    如何使用SQL Server Integration Services从多个Excel文件读取数据
    通过表格编辑器将现有表引入Power BI数据流
    Power BI 中动态增长的柱状图
    ambari2.7.3离线安装hdp3.1.0时,ambari-hdp-1.repo中baseurl无值
    ambari 安装 cannot download file mysql-connector-java from http://8080/resource/mysql-connector-java.jar
    洛谷P4180 [BJWC2010]严格次小生成树
  • 原文地址:https://www.cnblogs.com/s648667069/p/6401463.html
Copyright © 2011-2022 走看看