zoukankan      html  css  js  c++  java
  • RABBITMQ(Maven)

    简单模式(一个生产者,一个队列,一个消费者)

    1.导入jar包

                  <dependency>

                         <groupId>com.rabbitmq</groupId>

                         <artifactId>amqp-client</artifactId>

                         <version>3.4.1</version>

                  </dependency>

    2.创建rabbitmq连接的工具类

    package com.rabbitmq.util;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.ConnectionFactory;

    /**

     *   创建rabbitmq连接的工具类

     * @author KFS

     *

     */

    public class ConnectionUtil {

          

           public static Connection getConnection() throws Exception{

                  //创建连接工厂

                  ConnectionFactory connectionFactory=new ConnectionFactory();

                  //设置参数

                  connectionFactory.setHost("127.0.0.1");//主机ip

                  connectionFactory.setVirtualHost("/taotao");//虚拟主机名

                  connectionFactory.setUsername("admin");//账号

                  connectionFactory.setPassword("admin");//密码

                  //创建连接

                  Connection newConnection = connectionFactory.newConnection();

                  return newConnection;

           }

    }

    3.simple模式发送消息

    package com.simple.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   simple模式发送消息

     * @author KFS

     *

     */

    public class Send {

           public static void main(String[] args) throws Exception{

                  //通过rabbitmq工具类得到连接

                  Connection connection=ConnectionUtil.getConnection();

                  //创建通道

                  Channel channel = connection.createChannel();

                  /*

                   *   创建消息队列(如果有可以不用创建,但创建会覆盖之前的)

                   *   第一参数:队列名称

                   *   第二参数:队列是否持久化(存储到磁盘)

                   *   第三参数:队列是否被独占

                   *   第四参数:队列是否自动删除

                   *   第五参数:

                   */

                  channel.queueDeclare("test_simple_queue", false, false, false, null);

                  //创建消息

                  String message="simple_queue";

                  /*

                   *   发送消息

                   *   第一参数:交换机名(简单模式不用交换机,但不能用null)

                   *   第二参数:队列名称

                   *   第三参数:

                   *   第四参数:消息(字节流)

                   *

                   */

                  channel.basicPublish("", "test_simple_queue", null, message.getBytes());

                  System.out.println("发送的消息:"+message);

                  //关闭资源

                  channel.close();

                  connection.close();

           }

    }

    4.simple模式接受消息

    package com.simple.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.QueueingConsumer;

    import com.rabbitmq.client.QueueingConsumer.Delivery;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   simple模式接受消息

     * @author KFS

     *

     */

    public class Receive {

           public static void main(String[] args) throws Exception{

                  //通过rabbitmq工具类得到连接

                  Connection connection=ConnectionUtil.getConnection();

                  //创建通道

                  Channel channel = connection.createChannel();

                  /*

                   *   创建消息队列(如果有可以不用创建,但创建会覆盖之前的)

                   *   第一参数:队列名称

                   *   第二参数:队列是否持久化(存储到磁盘)

                   *   第三参数:队列是否被独占

                   *   第四参数:队列是否自动删除

                   *   第五参数:

                   */

                  channel.queueDeclare("test_simple_queue", false, false, false, null);

                  //定义消费者

                  QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

                  /*

                   *   监听队列

                   *   第一参数:队列名称

                   *   第二参数:是否自动回复完成接受

                   *   第三参数:消费者名称

                   */

                  channel.basicConsume("test_simple_queue",true, queueingConsumer);

                 

                  while(true) {

                         //获取消息

                         Delivery nextDelivery = queueingConsumer.nextDelivery();

                         //打印消息

                         String message=new String(nextDelivery.getBody());

                         System.out.println(message);

                  }

                 

           }

    }

    work模式(一个生产者,一个队列,多个消费者)

    1.work模式发送消息

    package com.work.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   work模式发送消息

     * @author KFS

     *

     */

    public class Send {

           public static void main(String[] args) throws Exception{

                  //通过rabbitmq连接工具类得到连接

                  Connection connection=ConnectionUtil.getConnection();

                  //创建通道

                  Channel channel = connection.createChannel();

                  /*

                   * 创建消息队列(如果有就不用创建,但创建不会错)

                   * 第一参数:队列名称

                   * 第二参数:该队列是否持久化

                   * 第三参数:该队列是否被独占

                   * 第四参数:该队列是否自动删除

                   * 第五参数:

                   *

                   */

                  channel.queueDeclare("test_work_queue", false, false, false, null);

                  for(int i=1;i<=100;i++) {

                         //创建消息

                         String message="work_queue"+i;

                         /*

                          * 发送消息

                          * 第一参数:交换机名

                          * 第二参数:队列名称

                          * 第三参数:

                          * 第四参数:消息(字节流)

                          *

                          */

                         channel.basicPublish("", "test_work_queue", null, message.getBytes());

                         System.out.println("work_发送的消息:"+message);

                  }

                  //关闭资源

                  channel.close();

                  connection.close();

           }

    }

    a)      work模式平均接受消息(一个就是一个消费者,可以创建多个消费者,不过是平均分配消息。)

    package com.work.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.QueueingConsumer;

    import com.rabbitmq.client.QueueingConsumer.Delivery;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   work模式接受消息1

     * @author KFS

     *

     */

    public class Receive1 {

           public static void main(String[] args) throws Exception{

                  //得到连接

                  Connection connection=ConnectionUtil.getConnection();

                  //创建通道

                  Channel channel = connection.createChannel();

                  /*

                   * 创建消息队列(如果有就不用创建,但创建不会错)

                   * 第一参数:队列名称

                   * 第二参数:该队列是否持久化

                   * 第三参数:该队列是否被独占

                   * 第四参数:该队列是否自动删除

                   * 第五参数:

                   *

                   */

                  channel.queueDeclare("test_work_queue", false, false, false, null);

                  //创建队列消费者

                  QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

                  /*

                   * 监听队列中的内容

                   * 第一参数:队列名称

                   * 第二参数:是否自动回复

                   *

                   */

                  channel.basicConsume("test_work_queue",true, queueingConsumer);

                 

                  //获取消息

                  while(true) {

                         Delivery nextDelivery = queueingConsumer.nextDelivery();

                         String message=new String(nextDelivery.getBody());

                         Thread.sleep(1000);

                         System.out.println("接受消息:"+message);

                  }

           }

    }

    b)      work模式能者多劳接受消息(一个生产者,一个队列,可以创建多个消费者,不过这是能者多劳的)

    package com.work.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.QueueingConsumer;

    import com.rabbitmq.client.QueueingConsumer.Delivery;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   work模式接受消息1

     * @author KFS

     *

     */

    public class Receive1 {

           public static void main(String[] args) throws Exception{

                  //得到连接

                  Connection connection=ConnectionUtil.getConnection();

                  //创建通道

                  Channel channel = connection.createChannel();

                  /*

                   * 创建消息队列(如果有就不用创建,但创建不会错)

                   * 第一参数:队列名称

                   * 第二参数:该队列是否持久化

                   * 第三参数:该队列是否被独占

                   * 第四参数:该队列是否自动删除

                   * 第五参数:

                   *

                   */

                  channel.queueDeclare("test_work_queue", false, false, false, null);

                  //创建队列消费者

                  QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

                  //设置同一时刻只会发送一条消息给消费者(要放在监听队列内容之上)

                  channel.basicQos(1);

                  /*

                   * 监听队列中的内容

                   * 第一参数:队列名称

                   * 第二参数:是否自动回复(能者多劳需要手动回复)

                   *

                   */

                  channel.basicConsume("test_work_queue",false, queueingConsumer);

                 

                 

                  //获取消息

                  while(true) {

                         Delivery nextDelivery = queueingConsumer.nextDelivery();

                         String message=new String(nextDelivery.getBody());

                         Thread.sleep(10);

                         System.out.println("接受消息:"+message);

                         //手动回复

                         channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);

                  }

           }

    }

    订阅模式,交换机,exchange:fanout(一个生产者,一个交换机,多个队列,多个消费者)

    1.订阅模式发送消息

    package com.fanout.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   订阅模式(exchange,交换机)发送消息

     * @author KFS

     *

     */

    public class Send {

           public static void main(String[] args) throws Exception{

                  //通过rabbitmq连接工具类得到连接

                  Connection connection=ConnectionUtil.getConnection();

                  //创建通道

                  Channel channel = connection.createChannel();

                  /*

                   * 创建交换机exchange

                   * 第一参数:交换机名称

                   * 第二参数:交换机类型:

                   *

                   */

                  channel.exchangeDeclare("test_fanout", "fanout");

                 

                  //消息内容

                  String message="testFanout";

                  /*

                   * 发送消息

                   * 第一参数:交换机名称

                   * 第二参数:

                   * 第三参数:

                   * 第四参数:消息(字节流)

                   *

                   */

                  channel.basicPublish("test_fanout", "", null, message.getBytes());

                  System.out.println("发送消息:"+message);

                 

                  //关闭资源

                  channel.close();

                  connection.close();

           }

    }

    2.订阅模式接收消息(一个生产者,一个交换机,多个队列,队列名称不同,多个消费者,消费者都能收到消息)

    package com.fanout.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.QueueingConsumer;

    import com.rabbitmq.client.QueueingConsumer.Delivery;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   订阅模式接受消息

     * @author KFS

     *

     */

    public class Receive1 {

           public static void main(String[] args) throws Exception{

                  //通过rabbitmq连接工具类得到连接

                  Connection connection=ConnectionUtil.getConnection();

                  //创建通道

                  Channel channel = connection.createChannel();

                  /*

                   * 创建交换机exchange

                   * 第一参数:交换机名称

                   * 第二参数:交换机类型:

                   *

                   */

                  channel.exchangeDeclare("test_fanout", "fanout");

                  //创建队列

                  channel.queueDeclare("test_fanout_queue1", false, false, false, null);

                  /*

                   * 绑定队列到交换机

                   * 第一参数:队列名称

                   * 第二参数:交换机名称

                   * 第三参数:

                   *

                   */

                  channel.queueBind("test_fanout_queue1", "test_fanout", "");

            // 同一时刻服务器只会发一条消息给消费者

            channel.basicQos(1);

           

            //定义消费者

            QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

            //监听队列

            channel.basicConsume("test_fanout_queue1", false,queueingConsumer);

           

            while(true) {

                   //获取消息

                   Delivery nextDelivery = queueingConsumer.nextDelivery();

                   String message=new String(nextDelivery.getBody());

                   System.out.println(message);

                   Thread.sleep(10);

                  

                   //手动回复完成

                   channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);

            }

           

           

           }

    }

    路由模式,exchange:direct(和订阅模式有相同点,一个生产者,一个交换机,多个队列,不同的队列名,多个消费者)

    1.路由模式发送消息

    package com.direct.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   路由模式redirect发送消息

     * @author KFS

     *

     */

    public class Send {

           public static void main(String[] args) throws Exception{

                  //1.通过rabbitmq连接工具类创建连接

                  Connection connection = ConnectionUtil.getConnection();

                  //2通过连接.创建通道

                  Channel channel = connection.createChannel();

                  /*

                   * 3.创建交换机

                   * 第一参数:交换机名称

                   * 第二参数:交换机类型

                   *

                   */

                  channel.exchangeDeclare("test_direct_exchange", "direct");

                 

                  String message="test_direct";

                  /*

                   * 4.发送消息

                   * 第一参数:交换机名称

                   * 第二参数:钥匙(接受方的是这个的才能接受)

                   * 第三参数:

                   * 第四参数:消息(字节流)

                   *

                   */

                  channel.basicPublish("test_direct_exchange", "key1", null, message.getBytes());

                  System.out.println("发送消息:"+message);

                 

                  //关闭资源

                  channel.close();

                  connection.close();

           }

    }

    2.路由模式接收消息(一个生产者,一个交换机,多个队列,队列名称不同,多个消费者,消费者都能收到消息)

    package com.direct.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.QueueingConsumer;

    import com.rabbitmq.client.QueueingConsumer.Delivery;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   路由模式接收消息1

     * @author KFS

     *

     */

    public class Receive1 {

           public static void main(String[] args) throws Exception{

                  //1.通过rabbitmq连接工具类创建连接

                  Connection connection = ConnectionUtil.getConnection();

                  //2.通过连接创建通道

                  Channel channel = connection.createChannel();

                  /*

                   * 3.通过通道创建队列

                   *

                   */

                  channel.queueDeclare("test_direct_queue1", false, false, false, null);

                  /*

                   * 4.绑定队列和交换机

                   * 第一参数:队列名称

                   * 第二参数:交换机名称

                   * 第三参数:钥匙

                   *

                   */

                  channel.queueBind("test_direct_queue1", "test_direct_exchange", "key1");

                  //同一时刻服务器只发一条消息

                  channel.basicQos(1);

                 

                  //定义队列消费者

                  QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

                  //监听队列,设置手动回复

                  channel.basicConsume("test_direct_queue1",false, queueingConsumer);

                 

                  //获取消息

                  Delivery nextDelivery = queueingConsumer.nextDelivery();

                  String message=new String(nextDelivery.getBody());

                  System.out.println(message);

                 

                  //手动回复

                  channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);

           }

    }

    通配符模式(一个发送者,一个交换机,多个队列,多个消费者)

    1.通配符发送消息

    package com.topic.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   通配符模式topic发送消息

     * @author KFS

     *

     */

    public class Send {

           public static void main(String[] args) throws Exception{

                  //1.通过rabbitmq连接工具类创建连接

                  Connection connection = ConnectionUtil.getConnection();

                  //2.通过连接创建通道

                  Channel channel = connection.createChannel();

                  //3.通过通道创建交换机

                  channel.exchangeDeclare("test_topic_exchange", "topic");

                  String message="test_topic";

                  //4.发送消息

                  channel.basicPublish("test_topic_exchange", "key.1", null, message.getBytes());

                  System.out.println("发送消息:"+message);

                 

                  //关闭资源

                  channel.close();

                  connection.close();

           }

    }

    2.通配符接收消息

    package com.topic.rabbitmq;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.QueueingConsumer;

    import com.rabbitmq.client.QueueingConsumer.Delivery;

    import com.rabbitmq.util.ConnectionUtil;

    /**

     *   通配符模式topic接收消息1

     * @author KFS

     *

     */

    public class Receive1 {

           public static void main(String[] args) throws Exception{

                  //1.通过rabbitmq连接工具类创建连接

                  Connection connection = ConnectionUtil.getConnection();

                  //2.通过连接创建通道

                  Channel channel = connection.createChannel();

                  //3.通过通道创建队列

                  channel.queueDeclare("test_topic_queue1", false, false, false, null);

                  //4.通过通道绑定队列

                  channel.queueBind("test_topic_queue1", "test_topic_exchange", "key.*");

                  //同一时刻服务器只发送一条消息

                  channel.basicQos(1);

                  //5.通过通道创建消费者

                  QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

                  //6.监听队列

                  channel.basicConsume("test_topic_queue1", queueingConsumer);

                 

                  //7.接收消息

                  Delivery nextDelivery = queueingConsumer.nextDelivery();

                  String message=new String(nextDelivery.getBody());

                  System.out.println(message);

                 

           }

    }

    RabbitMQ和Spring整合

    1.监听者类方法

    package com.spring.rabbitmq;

    /**

     *   监听类方法

     * @author KFS

     *

     */

    public class SpringReceive1 {

           public void receive1(String msg) {

                  System.out.println("接收消息:"+msg);

           }

    }

    2.spring和rabbitmq整合xml:applicationContent-rabbitmq.xml

    <?xml version="1.0" encoding="UTF-8"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

           xmlns:rabbit="http://www.springframework.org/schema/rabbit"

           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

                  http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

          

           <!-- 发送者 -->

           <!-- 定义rabbitmq连接工厂 -->

           <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>

          

           <!-- 创建rabbitmq管理 -->

           <rabbit:admin connection-factory="factory"/>

           <!-- 创建交换机: -->

           <rabbit:fanout-exchange name="springExchange" auto-declare="true">

           </rabbit:fanout-exchange>

           <!-- 创建rabbitmq模板 -->

           <rabbit:template id="template" exchange="springExchange" connection-factory="factory"/>

          

          

           <!-- 接收者 -->

           <!-- 定义队列:队列和交换机绑定去网页上绑定 -->

           <rabbit:queue name="fanoutQueue" auto-declare="true"/>

          

           <!-- 队列监听 -->

           <rabbit:listener-container connection-factory="factory">

                  <rabbit:listener ref="springReceive1" method="receive1" queue-names="fanoutQueue"/>

           </rabbit:listener-container>

           <!-- 定义监听类 -->

           <bean id="springReceive1" class="com.spring.rabbitmq.SpringReceive1"/>

          

    </beans>

    3.发送和接收代码

    package com.spring.rabbitmq;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    import org.springframework.context.support.AbstractApplicationContext;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

    /**

     *   rabbitmq和spring整合发送消息和监听接收消息

     * @author KFS

     *

     */

    public class SpringSend {

           public static void main(String[] args) throws Exception{

                  //读取xml文件

                  AbstractApplicationContext applicationContext=new ClassPathXmlApplicationContext("classpath:applicationContent-rabbitmq.xml");

                  //获取rabbitmq模板对象

                  RabbitTemplate rabbitTemplate=(RabbitTemplate)applicationContext.getBean("template");

                  //发送消息

                  rabbitTemplate.convertAndSend("spring_send");

                  //监听者会一直监听

                 

                  //休眠

                  Thread.sleep(1000);

                  //摧毁容器

                  applicationContext.destroy();

           }

    }

    RabbitMQ的使用(后台系统修改,删除,添加商品会触发消息队列,在搜索系统接收消息在进行相应操作)

    1.后台系统

    a)      需要的rabbitmq与spring的整合文件:applicationContext-rabbitmq.xml

    <?xml version="1.0" encoding="UTF-8"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

           xmlns:rabbit="http://www.springframework.org/schema/rabbit"

           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

                  http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

           <!-- rabbitmq生产者 -->

           <!-- 配置连接工厂 -->

           <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>

          

           <!-- MQ的管理器:管理队列和交换机 -->

           <rabbit:admin connection-factory="factory"/>

           <!-- 定义交换机 -->

           <rabbit:topic-exchange name="taotao_item_topic" auto-declare="true">

           </rabbit:topic-exchange>

           <!-- rabbitmq模板 -->

           <rabbit:template id="rabbitTemplate" connection-factory="factory" exchange="taotao_item_topic"></rabbit:template>

          

    </beans>

    b)      商品业务层代码

    package com.taotao.manage.service;

    import java.util.Date;

    import java.util.HashMap;

    import java.util.Map;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Service;

    import com.fasterxml.jackson.core.JsonProcessingException;

    import com.fasterxml.jackson.databind.ObjectMapper;

    import com.taotao.manage.bean.Item;

    import com.taotao.manage.bean.ItemDesc;

    import com.taotao.manage.bean.ItemParamItem;

    @Service

    public class ItemServiceImp extends BaseServiceImp<Item> implements ItemService{

           @Autowired

           private ItemDescService itemDescService;

           @Autowired

           private ItemParamItemService itemParamItemService;

           @Autowired

           private RabbitTemplate rabbitTemplate;

           private ObjectMapper objectMapper=new ObjectMapper();

          

           /**

            *   添加商品和商品描述

            * @param item

            * @param desc

            */

           @Override

           public void insertItem(Item item, String desc,String itemParams) {

                  item.setStatus(1);

                  this.insert(item);

                 

                  ItemDesc record=new ItemDesc();

                  record.setItemDesc(desc);

                  record.setItemId(item.getId());

                  itemDescService.insert(record);

                 

                  ItemParamItem itemParamItem=new ItemParamItem();

                  itemParamItem.setItemId(item.getId());

                  itemParamItem.setParamData(itemParams);

                  itemParamItemService.insert(itemParamItem);

                 

                  //以下是消息队列

                  Map<String, Object> message=new HashMap<>();

                  message.put("type", "item.insert");

                  message.put("date", new Date());

                  try {

                         message.put("item", objectMapper.writeValueAsString(item));

                         message.put("itemDesc", objectMapper.writeValueAsString(record));

                         message.put("itemParamItem", objectMapper.writeValueAsString(itemParamItem));

                         rabbitTemplate.convertAndSend("item.insert", message);

                  } catch (JsonProcessingException e) {

                         e.printStackTrace();

                  }

                 

           }

           /**

            *   修改商品和商品描述

            */

           @Override

           public void updateItem(Item item, String desc,String itemParams,Long itemParamId) {

                  item.setStatus(1);

                  this.update(item);

                 

                  ItemDesc itemDesc=new ItemDesc();

                  itemDesc.setItemId(item.getId());

                  itemDesc.setItemDesc(desc);

                  itemDescService.update(itemDesc);

                 

                  ItemParamItem itemParamItem=new ItemParamItem();

                  itemParamItem.setId(itemParamId);

                  itemParamItem.setParamData(itemParams);

                  itemParamItemService.update(itemParamItem);

                 

                  //以下是消息队列

                  Map<String, Object> message=new HashMap<>();

                  message.put("type", "item.update");

                  message.put("date", new Date());

                  try {

                         message.put("item", objectMapper.writeValueAsString(item));

                         message.put("itemDesc", objectMapper.writeValueAsString(itemDesc));

                         message.put("itemParamItem", objectMapper.writeValueAsString(itemParamItem));

                         rabbitTemplate.convertAndSend("item.update", message);

                  } catch (JsonProcessingException e) {

                         e.printStackTrace();

                  }

           }

           /**

            *   批量修改商品状态status:1-正常,2-下架,3-删除

            */

           @Override

           public void updateStatus(Long[] ids,Integer status) {

                  Item item=new Item();

                  item.setStatus(status);

                  for(Long id:ids) {

                         item.setId(id);

                         this.update(item);

                        

                         //以下是消息队列

                         Map<String, Object> message=new HashMap<>();

                         message.put("type", "item.delete");

                         message.put("date", new Date());

                         try {

                                message.put("item", objectMapper.writeValueAsString(item));

                                rabbitTemplate.convertAndSend("item.delete", message);

                         } catch (JsonProcessingException e) {

                                e.printStackTrace();

                         }

                  }

           }

          

    }

    2.搜索系统

    a)      rabbitmq与spring的整合文件:applicationContext-rabbitmq.xml

    <?xml version="1.0" encoding="UTF-8"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

           xmlns:rabbit="http://www.springframework.org/schema/rabbit"

           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

                  http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

           <!-- rabbitmq消费者 -->

           <!-- 配置连接工厂 -->

           <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>

          

           <!-- MQ的管理器:管理队列和交换机 -->

           <rabbit:admin connection-factory="factory"/>

           <!-- 配置队列 -->

           <rabbit:queue name="taotao_item_search" auto-declare="true"></rabbit:queue>

           <bean id="itemsListener" class="com.taotao.search.listener.ItemsListener">

           </bean>

           <!-- 队列监听 -->

           <rabbit:listener-container connection-factory="factory">

                  <rabbit:listener ref="itemsListener" method="execut" queue-names="taotao_item_search"/>

           </rabbit:listener-container>

          

    </beans>

    b)      监听代码:rabbitmq传消息可以传Object类,但监听只能监听到String,其他的会无限循环。

    package com.taotao.search.listener;

    import java.util.Map;

    import org.apache.commons.lang3.StringUtils;

    import org.springframework.beans.factory.annotation.Autowired;

    import com.fasterxml.jackson.databind.ObjectMapper;

    import com.taotao.manage.bean.Item;

    import com.taotao.search.service.SearchService;

    /**

     *   rabbitmq监听类

     * @author KFS

     *

     */

    public class ItemsListener {

           @Autowired

           private SearchService searchService;

           private ObjectMapper objectMapper=new ObjectMapper();

           public void execut(Map<String, Object> map) throws Exception{

                  /*

                   * rabbitmq传消息可以传Object类,但监听只能监听到String,其他的会无限循环。

                   */

                  String type=map.get("type").toString();

                  if(StringUtils.equals(type, "item.insert") || StringUtils.equals(type, "item.update")) {

                         Item item = objectMapper.readValue(map.get("item").toString(), Item.class);

                         com.taotao.search.bean.Item record=new com.taotao.search.bean.Item();

                         record.setId(item.getId());

                         record.setTitle(item.getTitle());

                         record.setSellPoint(item.getSellPoint());

                         record.setPrice(item.getPrice());

                         record.setNum(item.getNum());

                         record.setBarcode(item.getBarcode());

                         record.setImage(item.getImage());

                         record.setCid(item.getCid());

                         record.setStatus(item.getStatus());

                         searchService.insert(record);

                  }else if(StringUtils.equals(type, "item.delete")) {

                         Item item = objectMapper.readValue(map.get("item").toString(), Item.class);

                         searchService.delete(item.getId());

                  }

                 

           }

    }

  • 相关阅读:
    MySQL令人咋舌的隐式转换
    MySQL 数据库基础(二)(MySQL 服务基础与使用 MySQL 数据库)
    以友盟+U-Push为例,深度解读消息推送的筛选架构解决方案应用与实践
    逆向工程,调试Hello World !程序(更新中)
    520了,用32做个简单的小程序
    postgresql 数据库 update更新慢的原因(已解决)
    面试题单例模式的五种写法(枚举妙用)
    人工智能能力提升指导总结
    数据结构-队列(2)-循环队列
    数据结构-队列(1)
  • 原文地址:https://www.cnblogs.com/kfsrex/p/11853645.html
Copyright © 2011-2022 走看看