zoukankan      html  css  js  c++  java
  • RabbitMQ之项目中实战

      说了那么多,还不是为了在项目中进行实战吗,在实践中检验真理,不然我学他干嘛,不能解决项目中的实际问题的技术都是耍流氓。。。

    一、后台管理系统发送消息

    瞎咧咧:后台管理系统发送消息到交换机中,然后通知其他系统进行相应的操作,这岂不是美滋滋

    1、导入依赖

    注意:这里使用的是spring封装的rabbitMQ,只需要导入这一个依赖就可以了,这个版本有点老了

    1         <dependency>
    2             <groupId>org.springframework.amqp</groupId>
    3             <artifactId>spring-rabbit</artifactId>
    4             <version>1.4.0.RELEASE</version>
    5         </dependency>

    2、队列和交换机的绑定关系

    实现:

    2.1  在配置文件中将队列和交换机完成绑定

    2.2  可以在管理界面中完成绑定

            2.2.1 绑定关系如果发生变化,需要修改配置文件,并且服务需要重启

            2.2.2 管理更加灵活

            2.2.3 更容易对绑定关系的权限管理,流程管理

    3、配置文件

    注意:这个单独的放在一个配置文件中 applicationContext-rabbitmq.xml

     1 <beans xmlns="http://www.springframework.org/schema/beans"
     2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     3     xsi:schemaLocation="http://www.springframework.org/schema/rabbit
     4     http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
     5     http://www.springframework.org/schema/beans
     6     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
     7 
     8     <!-- 定义RabbitMQ的连接工厂 -->
     9     <rabbit:connection-factory id="connectionFactory"
    10         host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
    11         password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" />
    12 
    13     <!-- 管理 -->
    14     <rabbit:admin connection-factory="connectionFactory" />
    15 
    16     <!-- 定义交换机 -->
    17     <rabbit:topic-exchange name="TAOTAO-ITEM-EXCHANGE"
    18         auto-declare="true" durable="true" />
    19 
    20     <!-- 定义模板 -->
    21     <rabbit:template id="rabbitTemplate"
    22         connection-factory="connectionFactory" exchange="TAOTAO-ITEM-EXCHANGE" />
    23         
    24     
    25 </beans>

    4、具体代码实现

    注意:这个实现是把发送消息的代码抽离出来了,具体发送的内容包括商品的id、操作类型(就是具体的删除、更新还是插入操作)、时间戳,那个ObjectMapper是jackson jar包中转json的类

     1     @Autowired
     2     private RabbitTemplate RabbitTemplate;
     3 
     4     private static final ObjectMapper MAPPER = new ObjectMapper();
     5 
     6     private void sendMsg(Long itemId, String type) {
     7         try {
     8             // 发送消息到MQ队列中,并通知其他系统
     9             Map<String, Object> msg = new HashMap<String, Object>();
    10             msg.put("itemId", itemId);
    11             msg.put("type", type);
    12             msg.put("date", System.currentTimeMillis());
    13             this.RabbitTemplate.convertAndSend("item." + type, MAPPER.writeValueAsString(msg));
    14         } catch (Exception e) {
    15             e.printStackTrace();
    16         }
    17     }
    18 
    19 //例如:更新操作中,在方法的最后就直接调用此方法就可以了
    20 // 发送消息到MQ队列中,并通知其他系统
    21 sendMsg(item.getId(), "update");

     二、 前台系统接收消息

    1、导入依赖

    1         <dependency>
    2             <groupId>org.springframework.amqp</groupId>
    3             <artifactId>spring-rabbit</artifactId>
    4             <version>1.4.0.RELEASE</version>
    5         </dependency>

    2、配置文件

     1 <beans xmlns="http://www.springframework.org/schema/beans"
     2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     3     xsi:schemaLocation="http://www.springframework.org/schema/rabbit
     4     http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
     5     http://www.springframework.org/schema/beans
     6     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
     7 
     8     <!-- 定义RabbitMQ的连接工厂 -->
     9     <rabbit:connection-factory id="connectionFactory"
    10         host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
    11         password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" />
    12 
    13     <!-- 管理 -->
    14     <rabbit:admin connection-factory="connectionFactory" />
    15 
    16     <!-- 定义队列 -->
    17     <rabbit:queue name="TAOTAO-WEB-ITEM-QUEUE" auto-declare="true"
    18         durable="true" />
    19 
    20     <!-- 消费者对象 -->
    21     <bean id="itemMQHandler" class="com.taotao.web.mq.handler.ItemMQHandler" />
    22 
    23     <!-- 监听队列 -->
    24     <rabbit:listener-container
    25         connection-factory="connectionFactory">
    26         <rabbit:listener ref="itemMQHandler" method="execute"
    27             queue-names="TAOTAO-WEB-ITEM-QUEUE" />
    28     </rabbit:listener-container>
    29 
    30 </beans>

    3、具体处理逻辑

     1     @Autowired
     2     private RedisService redisService;
     3     
     4     private static final ObjectMapper MAPPER = new ObjectMapper();
     5 
     6     /**
     7      * 删除缓存中的数据,完成数据同步
     8      * 
     9      * @param msg
    10      */
    11     public void execute(String msg) {
    12         try {
    13             JsonNode jsonNode = MAPPER.readTree(msg);
    14             Long itemId = jsonNode.get("itemId").asLong();
    15             String key = ItemService.REDIS_KEY + itemId;
    16             this.redisService.del(key);
    17         } catch (Exception e) {
    18             e.printStackTrace();
    19         }
    20 
    21     }

    4、在界面管理工具中完成绑定关系

    注意:这个操作是在 Exchanges菜单下面完成的

     三、搜索系统中接收消息

    1、导入依赖

    1         <dependency>
    2             <groupId>org.springframework.amqp</groupId>
    3             <artifactId>spring-rabbit</artifactId>
    4             <version>1.4.0.RELEASE</version>
    5         </dependency>

    2、配置文件

    注意:基本上是复制粘贴那个前台系统的

     1 <beans xmlns="http://www.springframework.org/schema/beans"
     2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     3     xsi:schemaLocation="http://www.springframework.org/schema/rabbit
     4     http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
     5     http://www.springframework.org/schema/beans
     6     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
     7 
     8     <!-- 定义RabbitMQ的连接工厂 -->
     9     <rabbit:connection-factory id="connectionFactory"
    10         host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
    11         password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" />
    12 
    13     <!-- 管理 -->
    14     <rabbit:admin connection-factory="connectionFactory" />
    15 
    16     <!-- 定义队列 -->
    17     <rabbit:queue name="TAOTAO-SEARCH-ITEM-QUEUE" auto-declare="true"
    18         durable="true" />
    19 
    20     <!-- 消费者对象 -->
    21     <bean id="itemMQHandler" class="com.taotao.search.mq.handler.ItemMQHandler" />
    22 
    23     <!-- 监听队列 -->
    24     <rabbit:listener-container
    25         connection-factory="connectionFactory">
    26         <rabbit:listener ref="itemMQHandler" method="execute"
    27             queue-names="TAOTAO-SEARCH-ITEM-QUEUE" />
    28     </rabbit:listener-container>
    29 
    30 </beans>

    3、业务逻辑处理

     1 @Service
     2 public class ItemService {
     3 
     4     @Autowired
     5     private ApiService apiService;
     6     
     7     @Value("${TAOTAO_MANAGE_URL}")
     8     private String TAOTAO_MANAGE_URL;
     9     
    10     private static final ObjectMapper MAPPER = new ObjectMapper();
    11     
    12     public Item queryById(Long itemId){
    13         try {
    14             String url = TAOTAO_MANAGE_URL + "/rest/api/item/"+itemId;
    15             String jsonData = this.apiService.doGet(url);
    16             if(StringUtils.isNotEmpty(jsonData)){
    17                 return MAPPER.readValue(jsonData, Item.class);
    18             }
    19         } catch (Exception e) {
    20             e.printStackTrace();
    21         }
    22         return null;
    23     }
    24 }
     1 public class ItemMQHandler {
     2 
     3     private static final ObjectMapper MAPPER = new ObjectMapper();
     4     
     5     @Autowired
     6     private HttpSolrServer httpSolrServer;
     7     
     8     @Autowired
     9     private ItemService itemService;
    10     
    11     /**
    12      * 处理消息,新增、修改、删除的消息,将商品数据同步到solr中
    13      * 消息中并没有上商品的信息,需要通过商品id通过后台接口进行查询
    14      * 
    15      * @param msg
    16      */
    17     public void execute(String msg){
    18         try {
    19             JsonNode jsonNode = MAPPER.readTree(msg);
    20             Long itemId = jsonNode.get("itemId").asLong();
    21             String type = jsonNode.get("type").asText();
    22             if(StringUtils.equals(type, "insert") || StringUtils.equals(type, "update")){
    23                 Item item  = this.itemService.queryById(itemId);
    24                 this.httpSolrServer.addBean(item);
    25                 this.httpSolrServer.commit();
    26             }else if(StringUtils.equals(type, "delete")){
    27                 this.httpSolrServer.deleteById(String.valueOf(itemId));
    28                 this.httpSolrServer.commit();
    29             }
    30         } catch (Exception e) {
    31             e.printStackTrace();
    32         }
    33     }
    34 }

    4、在界面管理工具中完成绑定关系

    四、总结

    使用MQ实现商品数据的同步优势:

    1、  降低系统间耦合度

    2、  便于管理数据的同步

  • 相关阅读:
    第九章:switch语句
    第八章:if-else语句
    第七章:运算符及运用
    第六章:名命规范
    事务
    jdbc
    Object
    容器
    Java exception
    Java OO1
  • 原文地址:https://www.cnblogs.com/ssh-html/p/10549087.html
Copyright © 2011-2022 走看看