说了那么多,还不是为了在项目中进行实战吗,在实践中检验真理,不然我学他干嘛,不能解决项目中的实际问题的技术都是耍流氓。。。
一、后台管理系统发送消息
瞎咧咧:后台管理系统发送消息到交换机中,然后通知其他系统进行相应的操作,这岂不是美滋滋
1、导入依赖
注意:这里使用的是spring封装的rabbitMQ,只需要导入这一个依赖就可以了,这个版本有点老了
1 <dependency> 2 <groupId>org.springframework.amqp</groupId> 3 <artifactId>spring-rabbit</artifactId> 4 <version>1.4.0.RELEASE</version> 5 </dependency>
2、队列和交换机的绑定关系
实现:
2.1 在配置文件中将队列和交换机完成绑定
2.2 可以在管理界面中完成绑定
2.2.1 绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
2.2.2 管理更加灵活
2.2.3 更容易对绑定关系的权限管理,流程管理
3、配置文件
注意:这个单独的放在一个配置文件中 applicationContext-rabbitmq.xml
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" 3 xsi:schemaLocation="http://www.springframework.org/schema/rabbit 4 http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd 5 http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> 7 8 <!-- 定义RabbitMQ的连接工厂 --> 9 <rabbit:connection-factory id="connectionFactory" 10 host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" 11 password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" /> 12 13 <!-- 管理 --> 14 <rabbit:admin connection-factory="connectionFactory" /> 15 16 <!-- 定义交换机 --> 17 <rabbit:topic-exchange name="TAOTAO-ITEM-EXCHANGE" 18 auto-declare="true" durable="true" /> 19 20 <!-- 定义模板 --> 21 <rabbit:template id="rabbitTemplate" 22 connection-factory="connectionFactory" exchange="TAOTAO-ITEM-EXCHANGE" /> 23 24 25 </beans>
4、具体代码实现
注意:这个实现是把发送消息的代码抽离出来了,具体发送的内容包括商品的id、操作类型(就是具体的删除、更新还是插入操作)、时间戳,那个ObjectMapper是jackson jar包中转json的类
1 @Autowired 2 private RabbitTemplate RabbitTemplate; 3 4 private static final ObjectMapper MAPPER = new ObjectMapper(); 5 6 private void sendMsg(Long itemId, String type) { 7 try { 8 // 发送消息到MQ队列中,并通知其他系统 9 Map<String, Object> msg = new HashMap<String, Object>(); 10 msg.put("itemId", itemId); 11 msg.put("type", type); 12 msg.put("date", System.currentTimeMillis()); 13 this.RabbitTemplate.convertAndSend("item." + type, MAPPER.writeValueAsString(msg)); 14 } catch (Exception e) { 15 e.printStackTrace(); 16 } 17 } 18 19 //例如:更新操作中,在方法的最后就直接调用此方法就可以了 20 // 发送消息到MQ队列中,并通知其他系统 21 sendMsg(item.getId(), "update");
二、 前台系统接收消息
1、导入依赖
1 <dependency> 2 <groupId>org.springframework.amqp</groupId> 3 <artifactId>spring-rabbit</artifactId> 4 <version>1.4.0.RELEASE</version> 5 </dependency>
2、配置文件
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" 3 xsi:schemaLocation="http://www.springframework.org/schema/rabbit 4 http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd 5 http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> 7 8 <!-- 定义RabbitMQ的连接工厂 --> 9 <rabbit:connection-factory id="connectionFactory" 10 host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" 11 password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" /> 12 13 <!-- 管理 --> 14 <rabbit:admin connection-factory="connectionFactory" /> 15 16 <!-- 定义队列 --> 17 <rabbit:queue name="TAOTAO-WEB-ITEM-QUEUE" auto-declare="true" 18 durable="true" /> 19 20 <!-- 消费者对象 --> 21 <bean id="itemMQHandler" class="com.taotao.web.mq.handler.ItemMQHandler" /> 22 23 <!-- 监听队列 --> 24 <rabbit:listener-container 25 connection-factory="connectionFactory"> 26 <rabbit:listener ref="itemMQHandler" method="execute" 27 queue-names="TAOTAO-WEB-ITEM-QUEUE" /> 28 </rabbit:listener-container> 29 30 </beans>
3、具体处理逻辑
1 @Autowired 2 private RedisService redisService; 3 4 private static final ObjectMapper MAPPER = new ObjectMapper(); 5 6 /** 7 * 删除缓存中的数据,完成数据同步 8 * 9 * @param msg 10 */ 11 public void execute(String msg) { 12 try { 13 JsonNode jsonNode = MAPPER.readTree(msg); 14 Long itemId = jsonNode.get("itemId").asLong(); 15 String key = ItemService.REDIS_KEY + itemId; 16 this.redisService.del(key); 17 } catch (Exception e) { 18 e.printStackTrace(); 19 } 20 21 }
4、在界面管理工具中完成绑定关系
注意:这个操作是在 Exchanges菜单下面完成的
三、搜索系统中接收消息
1、导入依赖
1 <dependency> 2 <groupId>org.springframework.amqp</groupId> 3 <artifactId>spring-rabbit</artifactId> 4 <version>1.4.0.RELEASE</version> 5 </dependency>
2、配置文件
注意:基本上是复制粘贴那个前台系统的
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" 3 xsi:schemaLocation="http://www.springframework.org/schema/rabbit 4 http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd 5 http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> 7 8 <!-- 定义RabbitMQ的连接工厂 --> 9 <rabbit:connection-factory id="connectionFactory" 10 host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" 11 password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}" /> 12 13 <!-- 管理 --> 14 <rabbit:admin connection-factory="connectionFactory" /> 15 16 <!-- 定义队列 --> 17 <rabbit:queue name="TAOTAO-SEARCH-ITEM-QUEUE" auto-declare="true" 18 durable="true" /> 19 20 <!-- 消费者对象 --> 21 <bean id="itemMQHandler" class="com.taotao.search.mq.handler.ItemMQHandler" /> 22 23 <!-- 监听队列 --> 24 <rabbit:listener-container 25 connection-factory="connectionFactory"> 26 <rabbit:listener ref="itemMQHandler" method="execute" 27 queue-names="TAOTAO-SEARCH-ITEM-QUEUE" /> 28 </rabbit:listener-container> 29 30 </beans>
3、业务逻辑处理
1 @Service 2 public class ItemService { 3 4 @Autowired 5 private ApiService apiService; 6 7 @Value("${TAOTAO_MANAGE_URL}") 8 private String TAOTAO_MANAGE_URL; 9 10 private static final ObjectMapper MAPPER = new ObjectMapper(); 11 12 public Item queryById(Long itemId){ 13 try { 14 String url = TAOTAO_MANAGE_URL + "/rest/api/item/"+itemId; 15 String jsonData = this.apiService.doGet(url); 16 if(StringUtils.isNotEmpty(jsonData)){ 17 return MAPPER.readValue(jsonData, Item.class); 18 } 19 } catch (Exception e) { 20 e.printStackTrace(); 21 } 22 return null; 23 } 24 }
1 public class ItemMQHandler { 2 3 private static final ObjectMapper MAPPER = new ObjectMapper(); 4 5 @Autowired 6 private HttpSolrServer httpSolrServer; 7 8 @Autowired 9 private ItemService itemService; 10 11 /** 12 * 处理消息,新增、修改、删除的消息,将商品数据同步到solr中 13 * 消息中并没有上商品的信息,需要通过商品id通过后台接口进行查询 14 * 15 * @param msg 16 */ 17 public void execute(String msg){ 18 try { 19 JsonNode jsonNode = MAPPER.readTree(msg); 20 Long itemId = jsonNode.get("itemId").asLong(); 21 String type = jsonNode.get("type").asText(); 22 if(StringUtils.equals(type, "insert") || StringUtils.equals(type, "update")){ 23 Item item = this.itemService.queryById(itemId); 24 this.httpSolrServer.addBean(item); 25 this.httpSolrServer.commit(); 26 }else if(StringUtils.equals(type, "delete")){ 27 this.httpSolrServer.deleteById(String.valueOf(itemId)); 28 this.httpSolrServer.commit(); 29 } 30 } catch (Exception e) { 31 e.printStackTrace(); 32 } 33 } 34 }
4、在界面管理工具中完成绑定关系
四、总结
使用MQ实现商品数据的同步优势:
1、 降低系统间耦合度
2、 便于管理数据的同步