zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列

    目录:

      1、介绍

      2、linux安装

      3、工作模式

      4、整合spring

    介绍:

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序应用程序的通信方法。

    应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

    消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

    排队指的是应用程序通过 队列来通信。

    队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等。

    项目访问数据库的压力问题

      说明 :

      虽然后台的数据库已经通过mycat实现了数据库的高可用,并且可以抗击部分高并发。但是如果并发压力特别大。这时数据库的运行必定满负荷。很容易造成数据库宕机。

      分析问题:

        并发要求数据库第一时间执行更新操作,但是数据库现在满负荷,没有能力处理多余的请求,导致容器产品宕机风险。

      解决办法:

        准备一个消息队列,如果有高并发的请求,数据库处理不完,则将消息存入队列中.这时数据库先处理自己的请求,当请求处理完成后,从消息队列中获取请求.之后处理数.

    这样的做法,实现了用户的请求和数据库执行的异步操作!!

    架构升级:

      说明:引入消息队列后,主要解决一个数据库“更新并发压力”较大的问题,使用消息队列的机制,可以让后台的整个架构的性能提升至少30%,

      现在几乎全部的软件公司都在使用队列。

      服务器的请求多。

      mysql的处理有一定的峰值

      使用队列平衡这样的关系。两座上消除队列的内在是无限的。 

    其他mq:

      rocketMQ

      kafkaMQ

    linux安装rabbitmq

      1、导入 rabbitmq-server-3.6.1-1.noarch.rpm

      2、安装命令:rpm -ivh rabbitmq-server-3.6.1-1.noarch.rpm

      3、安装完成 

      配置文件修改:(rabbitmq.config)

        {loopback_users, []}

      复制配置文件到cd etc/rabbitmq

    启动mq

      rabbitmq-plugins enable rabbitmq_management

       启动:service rabbitmq-server start

      停止:service rabbitmq-server stop

      重启:service rabbitmq-server restart

    rabbitmq的端口号

      1、15672  rabbitMQ控制台端口

      2、5672客户端连接rabbitMQ的端口  

    浏览器远程访问:

      192.168.220.134:15672

      登陆账号:guest  密码:guest

      

    channels:链接rabbitMQ的唯一通道

    Exchanges:交换机,可以让消息发往指定的队列中

    queues:队列:在消息队列中可以有无数个队列

        如果控制台

    用户权限设定

      1、Admin > add user

           

       2、定义虚拟主机:

        自己维护的队列的内容,包含路由/交换机/队列

        命名:/jt

        

      3、为用户分配虚拟主机(双击jtadmin进入)

        

    rabbitMQ的工作模式

    1、简单模式

      

      p:provider:消息的提供者,消息的发出者

      c:Consumer:消费者,将消息 进行处理

      红色区域:rabbitMQ

       jar包导入

      <dependencies>
               <!-- 消息队列 -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.5.1</version>
            </dependency>
            <dependency>
                   <groupId>org.springframework.amqp</groupId>
                   <artifactId>spring-rabbit</artifactId>
                   <version>1.4.0.RELEASE</version>
            </dependency>
      </dependencies>

     测试代码:

    package com.jt.rabbitmq;
    
    import java.io.IOException;
    
    import org.junit.Before;
    import org.junit.Test;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
    
    //测试rabbitMQ中的简单模式
    public class TestRabbitMQ_1_simple {
        private Connection connection = null;
        /**
         * rabbitMQ的连接步骤
         * 1、通过用户jtadmin连接rabbitMQ(ip:端口/用户名/密码/虚拟主机)
         * 2、定义消息的提供者provider
         *         2.1、创建channel对象(控制队列/路由/交换机等)
         *         2.2、定义消息队列
         *         2.3、发送消息到队列中
         * @throws IOException 
         */
        //表示从rabbitMQ工厂中获取链接
        @Before
        public void initConnection() throws IOException{
            //创建工厂对象
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.220.134");
            factory.setPort(5672);
            factory.setUsername("jtadmin");
            factory.setPassword("jtadmin");
            factory.setVirtualHost("/jt");    
            //获取连接
            connection = factory.newConnection();
        }
        /**
         * 创建生产者对象
         * @throws IOException 
         */
        @Test
        public void provider() throws IOException{
            //1、获取channel对象,控制队列和路由和交换机
            Channel channel = connection.createChannel();
            //2、定义队列
            /**
             * queue:队列名称
             * durable:是否持久化,true:当消息队列重新启动后,队列还存在
             *                         false:当队列重启后,队列不存在。
             * exclusive:独有的
             *             当前的消息队列是否由生产者独占,如果配置为true表示消费者不能
             * autoDelete:是否自动删除。如果为true,则消息队列中没有消息时,该队列自动删除。、
             * arguments:提交的参数。一般为null
             */
            channel.queueDeclare("queue_simple", false, false, false, null);
            //3、定义需要发送的消息 
            String msg = "我是简单模式";
            //将消息与队列绑定,并且发送
            /**
             * exchange:交换机名称:如果没有交换机,则为""串
             * routingkey:消息发往队列的ID(参数),如果没有路由key,则写队列名称
             * props:消息发送的额外的参数,如果没有参数为null
             * body:发送的消息 的二进制字节码文件
             */
            channel.basicPublish("", "queue_simple", null, msg.getBytes());
            //关闭流
            channel.close();
            connection.close();
            System.out.println("队列发送成功!");
        }
        
        //定义消费者
        /**
         * 1、先获取channel对象
         * 2、定义消息队列
         * 3、定义消费者对象
         * 4、将消费者与队列信息绑定
         * 5、通过循环的方式获取队列中的内容
         * 6、将获取的数据转化为字符串
         * @throws IOException 
         * @throws InterruptedException 
         * @throws ConsumerCancelledException 
         * @throws ShutdownSignalException 
         */
        @Test
        public void consumer() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
            Channel channel = connection.createChannel();
            //定义队列
            channel.queueDeclare("queue_simple", false, false, false, null);
            //定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //将队列与消费者绑定
            /**
             * queue:队列的名称
             * autoAck:是否自动回复,队列确认后方能执行下次消息 
             * callback:回调参数,写的是消费者对象
             * 
             */
            channel.basicConsume("queue_simple", true,consumer);
            //通过循环方式获取消息 
            while(true){
                //获取消息队列的内容---delivery
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("消息者队列消息 :"+msg);
            }
        }
    }
    View Code

     

    2、工作模式

    工作原理:当生产者生产消息 后,保存到队列中。

    轮询模式

    测试代码:

    package com.jt.rabbitmq;
    
    import java.io.IOException;
    
    import org.junit.Before;
    import org.junit.Test;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
    
    public class TestRabbitMQ_2_work {
        
        //工作模式:多个人一起消费一个队列消息.内部轮询机制
        
        
        /**
         * 1.定义rabbmq地址 ip:端口
         * 2.定义虚拟主机
         * 3.定义用户名和密码
         * @throws IOException 
         */
        private Connection connection = null;
        @Before
        public void init() throws IOException{
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.220.134");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/jt");
            connectionFactory.setUsername("jtadmin");
            connectionFactory.setPassword("jtadmin");
            
            //获取链接
            connection = connectionFactory.newConnection();
        }
        
        @Test
        public void provider() throws IOException{
            //定义通道对象
            Channel channel = connection.createChannel();
            
            //定义队列
            channel.queueDeclare("queue_work", false, false, false, null);
            
            //定义广播的消息
            String msg = "我是工作模式";
            
            //发送消息
            channel.basicPublish("", "queue_work", null, msg.getBytes());
            
            //关闭流文件
            channel.close();
            connection.close();
        }
        
        
        //定义消费者
        @Test
        public void consumer1() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
            //定义通道
            Channel channel = connection.createChannel();
            
            //定义队列
            channel.queueDeclare("queue_work", false, false, false, null);
            
            //定义消费数  每次只能消费一条记录.当消息执行后需要返回ack确认消息 才能执行下一条
            channel.basicQos(1);
            
            //定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            
            //将队列和消费者绑定  false表示手动返回ack
            channel.basicConsume("queue_work", false, consumer);
            
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("队列A获取消息:"+msg);
                //deliveryTag 队列下标位置
                //multiple是否批量返回
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
            }
        }
        
        
        //定义消费者
            @Test
            public void consumer2() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
                //定义通道
                Channel channel = connection.createChannel();
                
                //定义队列
                channel.queueDeclare("queue_work", false, false, false, null);
                
                //定义消费数  每次只能消费一条记录.当消息执行后需要返回ack确认消息 才能执行下一条
                channel.basicQos(1);
                
                //定义消费者
                QueueingConsumer consumer = new QueueingConsumer(channel);
                
                //将队列和消费者绑定  false表示手动返回ack
                channel.basicConsume("queue_work", false, consumer);
                
                while(true){
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String msg = new String(delivery.getBody());
                    System.out.println("队列B获取消息:"+msg);
                    //deliveryTag 队列下标位置
                    //multiple是否批量返回
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                }
            }
        
        
            //定义消费者
                    @Test
                    public void consumer3() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
                        //定义通道
                        Channel channel = connection.createChannel();
                        
                        //定义队列
                        channel.queueDeclare("queue_work", false, false, false, null);
                        
                        //定义消费数  每次只能消费一条记录.当消息执行后需要返回ack确认消息 才能执行下一条
                        channel.basicQos(1);
                        
                        //定义消费者
                        QueueingConsumer consumer = new QueueingConsumer(channel);
                        
                        //将队列和消费者绑定  false表示手动返回ack
                        channel.basicConsume("queue_work", false, consumer);
                        
                        while(true){
                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            String msg = new String(delivery.getBody());
                            System.out.println("队列C获取消息:"+msg);
                            //deliveryTag 队列下标位置
                            //multiple是否批量返回
                            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                        }
                    }
        
        
        
        
    }
    View Code

     

    3、订阅模式

     

    只要队列绑定了交换机,当p生产者生产消息 时,这时连接交换机的全部队列都会收到这个消息 。并且所有的消费者都会执行消息 ,类似于广播-邮箱(群发)

     java测试代码

      说明:发布订阅模式测试时,需要先启动consumer,再启动provider

    package com.jt.rabbitmq;
    
    import java.io.IOException;
    
    import org.junit.Before;
    import org.junit.Test;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
    
    public class TestRabbitMQ_3_publish {
    
    //发布订阅模式
    private Connection connection = null;
        
        //定义rabbit连接池
        @Before
        public void initConnection() throws IOException{
            //定义工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设定参数
            connectionFactory.setHost("192.168.220.134");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/jt");
            connectionFactory.setUsername("jtadmin");
            connectionFactory.setPassword("jtadmin");
            
            //创建连接
            connection = connectionFactory.newConnection();    
        }
        
        
        
        //定义生产者
        @Test
        public void  proverder() throws IOException{
            //定义通道
            Channel channel = connection.createChannel();        
            //定义交换机名称
            String exchange_name = "E1";
            //定义发布订阅模式    fanout    redirect 路由模式    topic 主题模式
            channel.exchangeDeclare(exchange_name, "fanout");        
            for(int i=0;i<10; i++){
                String msg = "发布订阅模式"+i;
                channel.basicPublish(exchange_name, "", null, msg.getBytes());
            }        
            channel.close();
            connection.close();
        }
        
        
        /**
         * 消费者需要定义队列名称  并且与交换机绑定
         * @throws IOException
         * @throws InterruptedException 
         * @throws ConsumerCancelledException 
         * @throws ShutdownSignalException 
         */
        @Test
        public void consumer1() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
            Channel channel = connection.createChannel();
            
            String exchange_name = "E1";
            String queue_name = "c_1";
            
            //定义交换机模式
            channel.exchangeDeclare(exchange_name, "fanout");        
            //定义队列
            channel.queueDeclare(queue_name, false, false, false, null);        
            //将队列和交换机绑定
            channel.queueBind(queue_name, exchange_name, "");        
            //定义消费数量 
            channel.basicQos(1);        
            //定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);        
            //将消费者和队列绑定,并且需要手动返回
            channel.basicConsume(queue_name, false, consumer);
            
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();            
                String msg = "发布订阅模式-消费者1"+new String(delivery.getBody());
                System.out.println(msg);
                
                //false表示一个一个返回
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
        
        
        /**
         * 消费者需要定义队列名称  并且与交换机绑定
         * @throws IOException
         * @throws InterruptedException 
         * @throws ConsumerCancelledException 
         * @throws ShutdownSignalException 
         */
        @Test
        public void consumer2() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
            Channel channel = connection.createChannel();
            
            String exchange_name = "E1";
            String queue_name = "c_2";
            
            //定义交换机模式
            channel.exchangeDeclare(exchange_name, "fanout");
            
            //定义队列
            channel.queueDeclare(queue_name, false, false, false, null);
            
            //将队列和交换机绑定
            channel.queueBind(queue_name, exchange_name, "");
            
            channel.basicQos(1);
            
            //定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            
            //定义回复方式
            channel.basicConsume(queue_name, false, consumer);
            
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                
                String msg = "发布订阅模式-消费者2"+new String(delivery.getBody());
                System.out.println(msg);
                
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
    
        }
        
        
        
    }
    View Code

    4、路由模式

    说明:每一个队列都有自己的路由key.当生产者发送消息时,会携带一个路由key,会将消息发往路由Key一致的队列中.

    路由模式是发布订阅模式的升级版

     java测试代码

    package com.jt.rabbitmq;
    
    
    import java.io.IOException;
    
    import org.junit.Before;
    import org.junit.Test;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
    
    public class TestRabbitMQ_4_redirect {
    
    //发布订阅模式
    private Connection connection = null;
        
        //定义rabbit连接池
        @Before
        public void initConnection() throws IOException{
            //定义工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设定参数
            connectionFactory.setHost("192.168.126.146");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/jt");
            connectionFactory.setUsername("jtadmin");
            connectionFactory.setPassword("jtadmin");
            //创建连接
            connection = connectionFactory.newConnection();    
        }
        
        
        
        //定义生产者
        @Test
        public void  proverder() throws IOException{
            Channel channel = connection.createChannel();
            
            //定义交换机名称
            String exchange_name = "redirect";
            
            
            //定义发布订阅模式    fanout    direct 路由模式    topic 主题模式
            channel.exchangeDeclare(exchange_name, "direct");
            
            for(int i=0;i<10; i++){
                String msg = "生产者发送消息"+i;
                String rontKey = "1707B";
                
                /**
                 * exchange:交换机名称
                 * routingKey:路由key
                 * props:参数
                 * body:发送消息
                 */
                channel.basicPublish(exchange_name, rontKey, null, msg.getBytes());
            }
            
            channel.close();
            connection.close();
        }
        
        /**
         * 消费者需要定义队列名称  并且与交换机绑定
         * @throws IOException
         * @throws InterruptedException 
         * @throws ConsumerCancelledException 
         * @throws ShutdownSignalException 
         */
        @Test
        public void consumer1() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
            
            //定义通道
            Channel channel = connection.createChannel();
            
            //定义交换机名称
            String exchange_name = "redirect";
            
            //定义队列名称
            String queue_name = "c_r_1";
            
            //定义交换机模式
            channel.exchangeDeclare(exchange_name, "direct");
            
            //定义队列
            channel.queueDeclare(queue_name, false, false, false, null);
            
            //将队列和交换机绑定
            /**
             * 参数介绍:
             *     queue:队列名称
             *  exchange:交换机名称
             *  routingKey:路由key
             */
            //channel.queueBind(queue, exchange, routingKey)
            channel.queueBind(queue_name, exchange_name, "1707A");
            
            //定义消费个数
            channel.basicQos(1);
            
            //定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            
            //绑定消息与消费者
            channel.basicConsume(queue_name, false, consumer);
            
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                
                String msg = "路由模式-消费者1"+new String(delivery.getBody());
                System.out.println(msg);
                
                //手动回复 一个一个回复
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
    
        }
        
        
        /**
         * 消费者需要定义队列名称  并且与交换机绑定
         * @throws IOException
         * @throws InterruptedException 
         * @throws ConsumerCancelledException 
         * @throws ShutdownSignalException 
         */
        @Test
        public void consumer2() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
            Channel channel = connection.createChannel();
            
            String exchange_name = "redirect";
            String queue_name = "c_r_2";
            
            //定义交换机模式
            channel.exchangeDeclare(exchange_name, "direct");
            
            //定义队列
            channel.queueDeclare(queue_name, false, false, false, null);
            
            //将队列和交换机绑定
            channel.queueBind(queue_name, exchange_name, "1707B");
            
            channel.basicQos(1);
            
            //定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            
            //定义回复方式
            channel.basicConsume(queue_name, false, consumer);
            
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                
                String msg = "路由模式-消费者2"+new String(delivery.getBody());
                System.out.println(msg);
                
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    View Code

    5、主题模式

      其实就是在路由模式上添加了通配符的概念,表示有一类满足路由key的队列可以接受消息

        #号:可以匹配一个或多个字符

        Key:item.update.abc          消费者路由key item.#

        *:只能匹配单个字符或单词

        Key:item.update              消费者的路由key item.*  可以匹配

        Key:item.update.abc          消费者的路由key item.* 不能匹配

    java测试代码

    package com.jt.rabbitmq;
    
    import java.io.IOException;
    
    import org.junit.Before;
    import org.junit.Test;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    //主题模式
    public class TestRabbitMQ_5_topic {
        
        private Connection connection = null;
    
        @Before
        public void initConnection() throws IOException{
            //1.定义ConnectionFactory对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.220.134");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/jt");
            connectionFactory.setUsername("jtadmin");
            connectionFactory.setPassword("jtadmin");    
            //获取连接
            connection = connectionFactory.newConnection();    
        }
        
        //定义生产者
        @Test
        public void proverder() throws Exception{
            //获取通道
            Channel channel = connection.createChannel();
            
            //定义交换机的名称
            String exchange_name = "TOP";
            
            //创建交换机队列   
            //exchange  交换机名称
            //type 定义类型 fanout 发布订阅模式   direct-路由模式    topic-主题模式
            channel.exchangeDeclare(exchange_name, "topic");  //主题模式
            
            for (int i = 0; i < 100; i++) {
                
                String msg = "主题模式"+i;
                
                /**
                 * 参数说明:
                 *     exchange:交换机名称
                 *  routingKey:路由key
                 *  props:参数
                 *  body:数据字节码
                 */
                //channel.basicPublish(exchange, routingKey, props, body);
                channel.basicPublish(exchange_name,"item.update", null, msg.getBytes());
            }
            channel.close();
            connection.close();
        }
        
        @Test
        public  void consumer1() throws Exception{
            
            //定义通道
            Channel channel = connection.createChannel();
            
            //定义交换机名称
            String exchange_name = "TOP";
            
            //定义队列名称
            String queue_name = "TOP1";
            
            //声明交换机名称以及主题模式
            channel.exchangeDeclare(exchange_name, "topic");
            
            //定义队列
            channel.queueDeclare(queue_name, false, false, false, null);
            
            //将交换机和队列进行绑定   
            //参数1.队列名称     参数2交换机名称   参数3 路由key  #号匹配多个字符
            channel.queueBind(queue_name, exchange_name, "item.#");
            
            channel.basicQos(1);  //定义消费数量  1
            
            //定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            
            //将队列和消费者绑定
            channel.basicConsume(queue_name, false, consumer);  //定义手动回复
            
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                //获取消息队列中的数据
                String msg = new String(delivery.getBody());
                System.out.println("item.#消费者1:"+msg);
                
                //手动回复
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);    
            }
        }
        
        
        @Test
        public  void consumer2() throws Exception{
            Channel channel = connection.createChannel();
            String exchange_name = "TOP";
            String queue_name = "TOP2";
            //生命交换机模式
            channel.exchangeDeclare(exchange_name, "topic");
            //定义队列
            channel.queueDeclare(queue_name, false, false, false, null);
            //将交换机和队列进行绑定   
            //参数1.队列名称     参数2交换机名称     参数3定义路由key
            channel.queueBind(queue_name, exchange_name, "item.*");
            
            channel.basicQos(1);  //定义消费数量  1
            
            //定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queue_name, false, consumer);  //定义手动回复
            
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                //获取消息队列中的数据
                String msg = new String(delivery.getBody());
                System.out.println("item.*消费者2:"+msg);
                //定义回执
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);    
            }
        }
    }
    View Code

    整合rabbitmq

      1、模块划分 

        1、order订单服务器,作为消息队列的生产者

          a:消息队列中存的是order对象(1、order信息, 2、订单物流信息,3、订单商品信息)

          b:消息的生产者需要指定路由key(使用路由模式)

          c:标识交换机名称

        2、rabbitmq服务器,作为消息队列的消费者

          a:从消息队列中获取消息 ,数据可以直接进行网络传输,要求数据必须序列化

          b:定义接收消息的队列

          c:标识交换机

          d:定义路由key

          e:将数据进行入库操作

    文件导入:

      模块位置:order

      文件1:applicationContext-rabbitmq-send.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
        http://www.springframework.org/schema/task  
        http://www.springframework.org/schema/task/spring-task-4.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
        
        <!-- 异步的线程池,线程池的最在数不能设定太小,不然<rabbit:listener/>/@RabbitListener太多的话,会出现发无法正常消费问题 -->  
        <task:executor id="taskExecutor" pool-size="4-256" queue-capacity="128" />  
        
        <!-- 定义RabbitMQ的连接工厂 -->
        <rabbit:connection-factory id="connectionFactory"
            host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
            virtual-host="${rabbit.vhost}" 
            publisher-confirms="true"  
            publisher-returns="true"  
            channel-cache-size="5" 
            executor="taskExecutor"/>
    
        <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
            exchange="orderExchange" />
    
        <!-- MQ的管理,包括队列、交换器等 -->
        <rabbit:admin connection-factory="connectionFactory" />
    
        <!-- 定义交换器,自动声明交换机 ,durable持久化 -->
        <rabbit:direct-exchange name="" auto-declare="true" durable="true">
        </rabbit:direct-exchange>
    
    </beans>
    applicationContext-rabbitmq-send.xml
        <!-- 订阅发布模式 -->
        <rabbit:fanout-exchange name=""></rabbit:fanout-exchange>
        <!-- 主题模式 -->
        <rabbit:topic-exchange name=""></rabbit:topic-exchange>

     文件2:rabbitmq连接配置文件 rabbitmq.properties

    rabbit.ip=192.168.220.134
    rabbit.port=5672
    rabbit.username=jtadmin
    rabbit.password=jtadmin
    rabbit.vhost=/jt

      文件3:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
        http://www.springframework.org/schema/task  
        http://www.springframework.org/schema/task/spring-task-4.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
        
        <!-- 异步的线程池,线程池的最在数不能设定太小,不然<rabbit:listener/>/@RabbitListener太多的话,会出现发无法正常消费问题 -->  
        <task:executor id="taskExecutor" pool-size="4-256" queue-capacity="128" />  
        
        <!-- 定义RabbitMQ的连接工厂 -->
        <rabbit:connection-factory id="connectionFactory"
            host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
            virtual-host="${rabbit.vhost}" 
            publisher-confirms="true"  
            publisher-returns="true"  
            channel-cache-size="5" 
            executor="taskExecutor"/>
        
    
        <!-- MQ的管理,包括队列、交换器等 -->
        <rabbit:admin connection-factory="connectionFactory" />
        
        <!-- 定义消息队列 -->
        <rabbit:queue name="orderQueue" auto-declare="true"/>
        
        <!-- 定义交换机,并且完成队列和交换机的绑定 -->
        <rabbit:direct-exchange name="orderExchange" auto-declare="true">
            <rabbit:bindings>
                <!-- 前台系统只接收商品更新的消息,key路由key -->
                <!-- 将队列和路由key进行绑定 -->
                <rabbit:binding queue="orderQueue" key="order.save"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
        
        <!-- 定义监听 -->
        <rabbit:listener-container connection-factory="connectionFactory">
            <!-- 监听一个队列,当队列中有消息,就会自动触发类.方法,传递消息就作为方法的参数,根据方法声明的参数强转 -->
            <rabbit:listener ref="ordermqService" method="saveOrder" queue-names="orderQueue"/>
        </rabbit:listener-container> 
        
        <bean id="ordermqService" class="com.jt.order.rabbit.service.orderServiceImpl"></bean>
    
    </beans>
    applicationContext-rabbitmq-receive.xml

    java代码:

      通过模板发送消息 :

        说明:通过消息队列,数据处理和发送是异步的,消息已经发现,但是消息还没有处理。这时数据库中还没有数据,所以一般公司要求用户半小时后查询。

      代码 :  

    //引入消息队列的模板工具类
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    public String saveOrder(Order order){
            String orderId = order.getUserId() + System.currentTimeMillis() + "";
            //赋值orderId
            order.setOrderId(orderId);
            //通过消息队列处理消息 
            /**
             * routingKey:生产者的路由key
             * object:需要发送的对象
             */
            rabbitTemplate.convertAndSend("order.save", order);
            System.out.println("消息队列调用完成");
            System.out.println("订单入库成功~~~");
            return orderId;
        }

     接收处理队列消息 ,此类直接交给rabbitmq处理

    package com.jt.order.rabbit.service;
    
    import java.util.Date;
    import java.util.List;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.jt.dubbo.pojo.Order;
    import com.jt.dubbo.pojo.OrderItem;
    import com.jt.dubbo.pojo.OrderShipping;
    import com.jt.order.mapper.OrderItemMapper;
    import com.jt.order.mapper.OrderMapper;
    import com.jt.order.mapper.OrderShippingMapper;
    
    public class orderServiceImpl {
        //将消息队列中的内容写入数据库中
        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private OrderItemMapper orderItemMapper;
        @Autowired
        private OrderShippingMapper orderShippingMapper;
        
        public void saveOrder(Order order){
            Date date = new Date();
            order.setCreated(date);
            order.setStatus(1);
            order.setUpdated(date);
            System.out.println("order=" + order);
            orderMapper.insert(order);
            //完善物流信息及入库
            OrderShipping shipping = order.getOrderShipping();
            shipping.setCreated(date);
            shipping.setUpdated(date);
            shipping.setOrderId(order.getOrderId());
            orderShippingMapper.insert(shipping);
            //完善商品信息及入库
            //1、实现批量入库操作,自己手写sql
            //2、通过循环遍历方式,实现多次入库操作
            List<OrderItem> orderItems = order.getOrderItems();
            for(OrderItem orderItem : orderItems){
                orderItem.setOrderId(order.getOrderId());
                orderItemMapper.insert(orderItem);
            }
            System.out.println("消息队列入库成功~~~");
        }
    }
    orderServiceImpl
  • 相关阅读:
    构建之法阅读笔记04
    团队项目
    构建之法阅读笔记03
    第6周学习进度
    求最大子数组03
    四则运算4(完结)
    第5周学习进度
    敏捷开发概述
    第4周学习进度
    构建之法阅读笔记01
  • 原文地址:https://www.cnblogs.com/xiangyuqi/p/8603993.html
Copyright © 2011-2022 走看看