zoukankan      html  css  js  c++  java
  • SpringJms/ActiveMq 消息中间件配置及使用

    消息生产者

    先在pom.xml加入以下依赖

         <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
         </dependency>

    src/main/resource下配置spring-jms

    spring-jms.xml配置内容

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context.xsd">
            
        
           
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.25.129:61616"/>  
        </bean>
           
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>  
               
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <property name="connectionFactory" ref="connectionFactory"/>  
        </bean>      
        <!--这个是队列目的地,导入索引库, 文本信息-->  
        <bean id="queueSolrDestination" class="org.apache.activemq.command.ActiveMQQueue">  
            <constructor-arg value="pinyougou_queue_solr"/>  
        </bean>  
        
        <!--这个是队列目的地,删除索引库-->  
        <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">  
            <constructor-arg value="pinyougou_queue_solr_delete"/>  
        </bean>  
    
        
        <!--这个是订阅模式 生成商品详细页,文本信息-->
        <bean id="topicPageDestination" class="org.apache.activemq.command.ActiveMQTopic">  
            <constructor-arg value="pinyougou_topic_page"/>  
        </bean>  
        
        <!--这个是订阅模式 删除商品详细页-->  
        <bean id="topicPageDeleteDestination" class="org.apache.activemq.command.ActiveMQTopic">  
            <constructor-arg value="pinyougou_topic_page_delete"/>  
        </bean>  
        
    
    </beans>

    控制层,向activeMq发送消息

    @RequestMapping("/updateStatus")
        public Result updateStatus(Long[] ids,String status){
            try {
                goodsService.updateStatus(ids, status);
                //更新索引库
                if("2".equals(status)){//如果是审核通过 
                    //得到需要导入的SKU列表
                    List<TbItem> itemList = goodsService.findItemListByGoodsIdListAndStatus(ids, status);
                    if(itemList.size()>0) {
                        //导入到solr
    //                    itemSearchService.importList(itemList);        
                        
                        //改成由activeMq实现
                        final String jsonString = JSON.toJSONString(itemList);        
                        jmsTemplate.send(queueSolrDestination, new MessageCreator() {    
                            @Override
                            public Message createMessage(Session session) throws JMSException {                            
                                    return session.createTextMessage(jsonString);
                            }
                        });        
    
                    }else {
                        System.out.println("没有sku明细数据");
                    }
                    
                    //静态页生成
                    for(final Long goodsId:ids){
                        //itemPageService.genItemHtml(goodsId);
                        
                        //改成由activeMq实现
                        jmsTemplate.send(topicPageDestination, new MessageCreator() {                                        
                        @Override
                        public Message createMessage(Session session) throws JMSException {                            
                            return session.createTextMessage(goodsId+"");
                        }
                    });    
                    }    
                }        
                
                return new Result(true, "修改状态成功"); 
            } catch (Exception e) {
                e.printStackTrace();
                return new Result(false, "修改状态失败");
            }
        }

    消息消费者

    pom.xml

        <!-- activeMq消息队列 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>

    src/main/resource下配置消息消费者配置文件

    applicationContext-jms-consumer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context.xsd">
        
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.25.129:61616"/>  
        </bean>
           
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>  
        
        <!--这个是队列目的地,导入索引库-->  
        <bean id="queueSolrDestination" class="org.apache.activemq.command.ActiveMQQueue">  
            <constructor-arg value="pinyougou_queue_solr"/>  
        </bean>    
        
        <!-- 消息监听容器  导入索引库-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queueSolrDestination" />
            <property name="messageListener" ref="itemSearchListener" />    <!--监听类-->
        </bean>
        
        <!--这个是队列目的地,导入索引库-->  
        <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">  
            <constructor-arg value="pinyougou_queue_solr_delete"/>  
        </bean>    
        
        <!-- 消息监听容器  导入索引库-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queueSolrDeleteDestination" />
            <property name="messageListener" ref="itemDeleteListener" />
        </bean>
        
        
    </beans>

    监听类,通过监听类去调用serviceImpl真正业务方法

    package com.pinyougou.search.service.impl;
    
    import java.util.List;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSON;
    import com.pinyougou.pojo.TbItem;
    import com.pinyougou.search.service.ItemSearchService;
    /*监听类
     * activeMq更新索引库-增加
     */
    @Component
    public class ItemSearchListener implements MessageListener {
    
        @Autowired
        private ItemSearchService itemSearchService;
        
        @Override
        public void onMessage(Message message) {
            
            TextMessage textMessage=(TextMessage)message;
            try {
                String text = textMessage.getText();//json字符串
                System.out.println("监听到消息:"+text);
                
                List<TbItem> itemList = JSON.parseArray(text, TbItem.class);
                itemSearchService.importList(itemList);
                System.out.println("导入到solr索引库");
                
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
    }
  • 相关阅读:
    13 原型链_继承_this大总结_函数一定是对象,对象不一定是函数
    12 贪吃蛇游戏
    实现wiki访问
    11 第三个阶段js高级_原型
    JZOJ.5257【NOIP2017模拟8.11】小X的佛光
    模板——权值线段树(逆序对)
    LCA模板
    笛卡尔树——神奇的“二叉搜索堆”
    JZOJ.5246【NOIP2017模拟8.8】Trip
    JZOJ.5236【NOIP2017模拟8.7】利普希茨
  • 原文地址:https://www.cnblogs.com/binghuaZhang/p/14268009.html
Copyright © 2011-2022 走看看