zoukankan      html  css  js  c++  java
  • ActiveMQ消息队列技术融合Spring

    异步调用:消息队列不用去理会接收方,发送方发送完消息后不用去理会

    削峰填谷:当一瞬间大量请求进入,消息队列可对数据进行积压,慢慢去消费数据。

    解耦:通过消息队列技术实现,我一个应用程序通过消息队列发一个数据给另外一个,让他帮我完成另外一个功能

    一、业务逻辑

      我想在修改一个物品的状态时,同时发送广播,给对应的监听器去实现,此商品存储到solr中,同时通过网页静态模板生成一个当前物品的详情页面,此时用到了广播机制

      当我删除一个商品时,发送一个广播,给对应的监听器,同时删除solr中对应的物品。

    广播机制:必须要同时在线,才能接收我的消息

       使用消息中间件需要导入配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context.xsd">
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.200.128:61616"/>  
        </bean>
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <property name="connectionFactory" ref="connectionFactory"/>  
        </bean>   
        
        <!-- 发布订阅模式, 商品导入索引库和生成静态页面 -->
        <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
              <!--将商品上架所有的商品的id发送到这个队列中-->
             <constructor-arg value="youlexuan_topic_page_solr"/>
        </bean>
        <!-- 点对点模式-->
        <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <!--将商品上架所有的商品的id发送到这个队列中-->
            <constructor-arg value="youlexuan_queue_solr_delete"/>
        </bean>  
        
    </beans>

    发布广播:

    if ("1".equals(status)){
    jmsTemplate.send(topicPageAndSolrDestination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
    TextMessage textMessage = session.createTextMessage(String.valueOf(id));
    return textMessage;
    }
    });
    }

    监听器1,将当前商品存入solr中:操作solr的服务器配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context.xsd">
          <!--产生Connection-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.200.128:61616"/>  
        </bean>
        <!--spring 管理connectionFactory-->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">    
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>
        <!--发布订阅模式   将数据导入solr 索引库-->
        <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">  
            <constructor-arg value="youlexuan_topic_page_solr"/>
        </bean>
        <!--发布订阅模式   消息监听容器 将数据导入solr 索引库-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="topicPageAndSolrDestination" />
            <property name="messageListener" ref="pageAndSolrListener" />
        </bean>
    #对应的用来监听执行往solr中保存库存的消息 <bean id="pageAndSolrListener" class="com.ghh.sellergoods.service.listener.ItemSearchListener"></bean> <!--点对点的模式 删除索引库--> <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!--指定从这个队列中 接收下架商品的--> <constructor-arg value="youlexuan_queue_solr_delete"/> </bean> <!--点对点的模式 消息监听器 删除索引库--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueSolrDeleteDestination" /> <property name="messageListener" ref="itemDeleteListener" /> </bean> <bean id="itemDeleteListener" class="com.ghh.sellergoods.service.listener.ItemDeleteListener"></bean> </beans>

    监听器类

    public class ItemSearchListener implements MessageListener {
        @Autowired
        private SearchService searchService;
        @Autowired
        private ItemDao itemDao;
        @Override
        public void onMessage(Message message) {
            //获取生产者发布的广播,往solr中添加库存列表
            ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
            try {
                //获取广播中的数据。
                Long goodsId = Long.valueOf(atm.getText());
                //通过传过来的商品id去查询库存表
                ItemQuery query = new ItemQuery();
                ItemQuery.Criteria criteria = query.createCriteria();
                criteria.andGoodsIdEqualTo(goodsId);
                //查询对应商品id的库存表
                List<Item> items = itemDao.selectByExample(query);
            //调用对应的方法,往solr中添加当前商品对应库存信息 searchService.importList(items); }
    catch (JMSException e) { e.printStackTrace(); } } }

    监听器类2:配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context.xsd"> 
        <!--产生Connection工厂类-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.200.128:61616"/>  
        </bean>
        <!--spring管理工厂类-->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">    
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>
        <!--发布订阅模式  生成页面-->
        <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">  
              <!--指定从这个队列上获取上架的商品id-->
              <constructor-arg value="youlexuan_topic_page_solr"/>
        </bean>
        <!--发布订阅模式 消息监听器 生成页面-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="topicPageAndSolrDestination" />
            <property name="messageListener" ref="pageListener" />
        </bean>
        <bean id="pageListener" class="com.ghh.core.service.listener.PageListener"></bean>
        
    </beans>

    监听器类2:生成静态网页模板

    public class PageListener implements MessageListener {
        @Autowired
        private CmsService cmsService;
        @Override
        public void onMessage(Message message) {
            ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
            try {
                Long goodsId = Long.valueOf(atm.getText());
                Map<String, Object> goodsData = cmsService.findGoodsData(goodsId);
                cmsService.createStaticPage(goodsId,goodsData);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    点对点

      当我删除商品时,我需要对应的服务进行删除solr中库存信息,添加和删除使用的是同一个服务中,使用的是上面的配置文件

        //发布广播,
        @Autowired
        private ActiveMQTopic topicPageAndSolrDestination;
    //在修改的代码方法中来广播发布当前商品的id
    
    if (ids.length>0) {
                jmsTemplate.send(queueSolrDeleteDestination, new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        TextMessage textMessage = session.createTextMessage(String.valueOf(ids));
                        return textMessage;
                    }
                });
            }
    #执行删除solr中库存信息

    public
    class ItemDeleteListener implements MessageListener { @Autowired private SearchService searchService; @Override public void onMessage(Message message) { ActiveMQTextMessage atm = (ActiveMQTextMessage) message; try { Long goodsId = Long.valueOf(atm.getText()); searchService.deleteById(goodsId); } catch (JMSException e) { e.printStackTrace(); } } }
  • 相关阅读:
    【云速建站】购买前的指导
    【云速建站】域名配置指导
    Python爬虫批量下载糗事百科段子,怀念的天王盖地虎,小鸡炖蘑菇...
    舌尖上的安全
    【云速建站】视频播放专题
    Python装饰器总结,带你几步跨越此坑!
    让你提前认识软件开发(15):程序调试的利器—日志
    Win8下IIS的安装和站点的公布
    [2011山东ACM省赛] Mathman Bank(模拟题)
    Android UI开发神兵利器之Android Action Bar Style Generator
  • 原文地址:https://www.cnblogs.com/guanyuehao0107/p/11946581.html
Copyright © 2011-2022 走看看