zoukankan      html  css  js  c++  java
  • 商城08——activeMQ 使用消息队列同步索引库

    1.  课程计划

    1、什么是MQ

    2、MQ的应用场景

    3、ActiveMQ的使用方法。

    4、使用消息队列实现商品同步。

    2.  同步索引库分析

    方案一:在taotao-manager中,添加商品的业务逻辑中,添加一个同步索引库的业务逻辑。

        

        缺点:业务逻辑耦合度非常高,业务拆分不明确

    方案二:业务逻辑在taotao-search中实现,调用服务在taotao-manager实现。业务逻辑分开。

        

        缺点:

               服务之间的耦合度变高,启动有先后顺序。

               随着调用的服务会越来越多,服务之间的调用越来越复杂,难以管理。

    方案三:使用消息队列

        

        存在的问题:

                    1、如果MQ挂了,所有相关的服务都挂了

                    2、MQ有性能的瓶颈,尽量减少消息的内容的大小

     

    技术的选型和具体的业务有关,只选择合适的技术。

    如果MQ挂了:

      1.通过日志查找原因

      2.通知相关的人员修复

      3.关键的业务必须保证有备用方案

     

    3.  ActiveMQ

      MQ是一个消息中间件,比如:ActiveMQ、RabbitMQ、kafka都属于MQ,是MQ的产品。

      什么是消息中间件?

        消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)

    3.1. 什么是ActiveMQ

      ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

      主要特点:

    1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
    2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。
    3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。
    4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
    5. 支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA。
    6. 支持通过JDBC和journal提供高速的消息持久化。
    7. 从设计上保证了高性能的集群,客户端-服务器,点对点。
    8. 支持Ajax。
    9. 支持与Axis的整合。
    10. 可以很容易得调用内嵌JMS provider,进行测试。

    什么是JMS?

      JMS(JavaMessaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

           JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。

      消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。

      JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性:

    • StreamMessage : Java原始值的数据流
    • MapMessage     :一套名称-值对
    • TextMessage    :一个字符串对象
    • ObjectMessage :一个序列化的 Java对象
    • BytesMessage  :一个字节的数据流

    3.2. ActiveMQ的消息形式

      对于消息的传递有两种类型:

    • 一种是点对点,即一个生产者和一个消费者一一对应;

        

    • 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

           

    3.3.  ActiveMQ的安装

    第一步: 把ActiveMQ 的压缩包上传到Linux系统。

        Alt+p打开sftp窗口:输入put "F:/java/ziyuan/apache-activemq-5.13.0-bin.tar.gz"

    第二步:解压缩: tar  zxvf  apache-activemq-5.13.0-bin.tar.gz

    第三步:启动:使用bin目录下的activemq命令

     启动:[root@localhost bin]# ./activemq start

    关闭:[root@localhost bin]# ./activemq stop

    查看状态:[root@localhost bin]# ./activemq status

    进入管理后台:

    http://192.168.25.130:8161/admin

    用户名:admin

    密码:admin

    4.   ActiveMQ的使用方法

    点对点:(Queue)

      

    发布/订阅:(Topic)

      

    下面使用其他的工程来学习:

      

    工程需要添加jar包:

      

    4.1.点对点(Queue)

    4.1.1.    生产者(Producer):生产消息,发送端。

    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

    第二步:使用ConnectionFactory对象创建一个Connection对象。

    第三步:开启连接,调用Connection对象的start方法。

    第四步:使用Connection对象创建一个Session对象。

    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。

    第六步:使用Session对象创建一个Producer对象。

    第七步:创建一个Message对象,创建一个TextMessage对象。

    第八步:使用Producer对象发送消息。

    第九步:关闭资源。

    public class QueueProducer {
        //生产者发送消息
        @Test
        public void send() throws Exception{
            //1.创建一个连接工厂Connectionfactory,  参数:就是要连接的服务器的地址
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.132:61616");
            //2.通过工厂获取连接对象 创建连接
            Connection connection = factory.createConnection();
            //3.开启连接
            connection.start();
            
            //4.创建一个session对象  提供发送消息等方法
              //第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。
              //第二个参数:就是设置消息的应答模式.  如果第一个参数为false时,第二个参数设置才有意义。用的是自动应答
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            //5.创建目的地 queue, 参数:目的地的名称
            Queue queue = session.createQueue("queue-test");
            //6.创建个生产者
            MessageProducer producer = session.createProducer(queue);    
            //7.构建消息的内容  
            TextMessage textMessage = session.createTextMessage("queue测试发送的消息");
    //        TextMessage message = session.createTextMessage();
    //        message.setText("queue测试发送的消息");
            //8.发送消息
            producer.send(textMessage);
            
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }

    4.1.2.  消费者(Consumer):接收消息。

    第一步:创建一个ConnectionFactory对象。

    第二步:从ConnectionFactory对象中获得一个Connection对象。

    第三步:开启连接。调用Connection对象的start方法。

    第四步:使用Connection对象创建一个Session对象。

    第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。

    第六步:使用Session对象创建一个Consumer对象。

    第七步:接收消息。

    第八步:打印消息。

    第九步:关闭资源

    public class QueueCustomer {
        @Test
        public void recieve() throws Exception {
            //1.创建连接的工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.132:61616");
            //2.创建连接
            Connection connection = factory.createConnection();
            //3.开启连接
            connection.start();
            
            //4.创建session
              //第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。
              //第二个参数:就是设置消息的应答模式   如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建接收消息的一个目的地
            Queue queue = session.createQueue("queue-test");
            //6.创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
            //7.接收消息 打印
                //-------------第一种
            /*while(true){
                Message message = consumer.receive(1000000);//设置接收消息的超时时间
                //没有接收到消息就跳出循环
                if(message==null){
                    break;
                }
                if(message instanceof TextMessage){
                    TextMessage message2 = (TextMessage) message;
                    System.out.println("接收的消息为"+message2.getText());
                }
            }*/
                //-------------第二种
            //设置一个监听器,这里其实开辟了一个新的线程
            consumer.setMessageListener(new MessageListener() {
                //当有消息的时候会执行以下的逻辑
                @Override
                public void onMessage(Message message) {
                    if(message instanceof TextMessage){
                        TextMessage message2 = (TextMessage) message;
                        try {
                            System.out.println("接收的消息为"+message2.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            Thread.sleep(199999);
            
            //8.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

    测试结果:

    Number Of Pending Messages  等待消费的消息 这个是当前未出队列的数量。

    Number Of Consumers  消费者 这个是消费者端的消费者数量

    Messages Enqueued  进入队列的消息  进入队列的总数量,包括出队列的。

    Messages Dequeued  出了队列的消息  可以理解为是消费这消费掉的数量。

    4.2. 发布/订阅(Topic)

    4.2.1.    生产者(Producer):生产消息,发送端

    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

    第二步:使用ConnectionFactory对象创建一个Connection对象。

    第三步:开启连接,调用Connection对象的start方法。

    第四步:使用Connection对象创建一个Session对象。

    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象

    第六步:使用Session对象创建一个Producer对象。

    第七步:创建一个Message对象,创建一个TextMessage对象。

    第八步:使用Producer对象发送消息。

    第九步:关闭资源。

    public class TopicProducer {
    
        // 发送topic
        @Test
        public void send() throws Exception {
            //1.创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.132:61616");
            //2.创建连接
            Connection connection = factory.createConnection();
            //3.开启连接
            connection.start();
            //4.创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建目的地    topic
            Topic createTopic = session.createTopic("topic-test");
            //6.创建生成者
            MessageProducer producer = session.createProducer(createTopic);
            //7.构建消息对象
            TextMessage createTextMessage = session.createTextMessage("topic发送的消息123");
            //8.发送消息
            producer.send(createTextMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }

    4.2.2.   消费者(Consumer):接收消息。

    第一步:创建一个ConnectionFactory对象。

    第二步:从ConnectionFactory对象中获得一个Connection对象。

    第三步:开启连接。调用Connection对象的start方法。

    第四步:使用Connection对象创建一个Session对象。

    第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。

    第六步:使用Session对象创建一个Consumer对象。

    第七步:接收消息。

    第八步:打印消息。

    第九步:关闭资源

     以TopicCustomer1为例:

    public class TopicCustomer1 {
        
        @Test
        public void reieve() throws Exception {
            // 1.创建连接的工厂 指定MQ服务器的地址
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.132:61616");
            // 2.获取连接
            Connection connection = connectionFactory.createConnection();
            // 3.开启连接
            connection.start();
            // 4.根据连接对象创建session (提供了操作activmq的方法)
              //第一个参数:表示是否开启分布式事务(JTA) 一般就是false :表示不开启。 只有设置了false ,第二个参数才有意义。
              //第二个参数:表示设置应答模式 自动应答和手动应答 。使用的是自动应答
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5.根据session创建目的地(destination)
            Topic topic = session.createTopic("topic-test");
            // 6.创建消费者;
            MessageConsumer consumer = session.createConsumer(topic);
            // 7.接收消息
    
            // 第一种接收消息.直接接收 只是测试的使用
            /*
             * while(true){ //设置接收消息的超时时间 单位是毫秒 Message receive =
             * consumer.receive(3000000);
             * 
             * if(receive==null){ break; }
             * 
             * //取消息 if(receive instanceof TextMessage){ TextMessage message =
             * (TextMessage)receive; String text = message.getText();//获取消息的内容
             * System.out.println(text); } }
             */
    
            // 第二种接收消息.设置一个监听器 就是开启了一个新的线程
            System.out.println("start");
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        TextMessage message2 = (TextMessage) message;
                        String text = "";
                        try {
                            text = message2.getText();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        } // 获取消息的内容
                        System.out.println(text);
                    }
                    System.out.println();
                }
            });
            System.out.println("end");
            // 睡眠
            Thread.sleep(10000000);
    
            // 9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

    4.3. 小结

    queue 是点对点模式,只能是一个生产者产生一个消息,被一个消费者消费。

    topic 是发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。

    queue 默认是存在于MQ的服务器中的,发送消息之后,消费者随时取。但是一定是一个消费者取,消费完消息也就没有了。

    topic 默认是不存在于MQ服务器中的,一旦发送之后,如果没有订阅,消息则丢失。

    5.  Activemq整合spring

    5.1. 配置

      第一步:引用相关的jar包。

    <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
            </dependency>
    View Code

    第二步:配置文件applicationContext-activemq.xml

      <bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.132:61616"></property>
        </bean>
        <!-- 通用的connectionfacotry 指定真正使用的连接工厂 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnection"></property>
        </bean>
        <!-- 接收和发送消息时使用的类,JmsTemolate模板对象-->
        <bean class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"></property>
        </bean>
        <!--<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg name="name" value="item-change-queue"></constructor-arg>
        </bean>-->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg name="name" value="item-change-topic"></constructor-arg>
        </bean>
        
        <!-- 监听器 -->
        <bean id="myMessageListener" class="com.itheima.activemq.spring.MyMessageListener"></bean>
        <!-- 监听容器,作用:启动线程做监听 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"></property>
            <property name="destination" ref="topicDestination"></property>
            <property name="messageListener" ref="myMessageListener"></property>
        </bean>
        
        <bean id="myMessageListener2" class="com.itheima.activemq.spring.MyMessageListener"></bean>
        <!-- 监听容器,作用:启动线程做监听 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"></property>
            <property name="destination" ref="queueDestination"></property>
            <property name="messageListener" ref="myMessageListener2"></property>
        </bean>

    5.2. 代码测试

    此处只是为了测试,使用的是同一个配置文件(生产者和消费者都使用同一个配置文件)

    5.2.1.    发送消息

    第一步:初始化一个spring容器

    第二步:从容器中获得JMSTemplate对象。

    第三步:从容器中获得一个Destination对象

    第四步:使用JMSTemplate对象发送消息,需要知道Destination

    public class Producer {
        @Test
        public void send() throws Exception{
            //1.初始化spring容器
            ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
            //2.获取到jmstemplate的对象
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
            //3.获取目的地 destination
            Destination destination = (Destination) context.getBean(Destination.class);
            //4.发送消息
            jmsTemplate.send(destination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("通过spring发送的消息123");
                }
            });
            Thread.sleep(100000);
        }
    }

    5.2.2.   接收消息

    创建一个MessageListener的实现类:

    public class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            //获取消息
            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage)message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    6.  activemq整合到项目中

    需要在商品的添加/修改,删除的时候,同步索引库将数据从数据库中查询出来导入到索引库更新

    消息的发送方为:所在工程taotao-manager-service

    消息的接收方为:所在工程taotao-search-service

    两个工程都需要依赖activmq:

         <!-- activemq -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
            </dependency>

    6.1. 生产者Producer:taotao-manager-service工程中发送消息

    功能分析:

           当商品添加完成后发送一个TextMessage,包含一个商品id即可。

           接收端接收到商品id通过数据库查询到商品的信息(搜索的结果商品的信息)再同步索引库。

    6.1.1.    applicationContext-activemq.xml配置

       <bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.132:61616"></property>
        </bean>
        <!-- 通用的connectionfacotry 指定真正使用的连接工厂 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnection"></property>
        </bean>
        <!-- 接收和发送消息时使用的类,模板对象-->
        <bean class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"></property>
        </bean>
    
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg name="name" value="item-change-topic"></constructor-arg>
        </bean>

    6.1.2 在itemservice中添加发送消息的业务逻辑

    ItemServiceImpl 中的 saveItem()方法(添加商品描述数据方法)中增加一个 发送消息功能:商品添加完成后发送一个TextMessage,包含一个商品id即可。

    @Service
    public class ItemServiceImpl implements ItemService {
        
        //注入mapper
        @Autowired
        private TbItemMapper itemMapper;
        @Autowired
        private TbItemDescMapper itemDescMapper;
        
        @Autowired
        private JmsTemplate jmsTemplate;
        @Resource(name="topicDestination")
        private Destination topicDestination;
    
        @Override
        public TaotaoResult saveItem(TbItem item, String desc) {
            //生成商品的id (利用工具类IDUtils下的genItemId()方法生成)
            final long itemId = IDUtils.genItemId();
            
            //1.补全item的其他属性
            item.setId(itemId);
            item.setStatus((byte) 1);//商品状态:1-正常,2-下架,3-删除
            item.setCreated(new Date());
            item.setUpdated(new Date());
            //2.向商品表(item表)插入数据
            itemMapper.insert(item);
            
            //3.创建一个商品描述表对应的pojo对象
            TbItemDesc itemDesc = new TbItemDesc();
            //4.补全商品描述中的属性
            itemDesc.setItemId(itemId);
            itemDesc.setItemDesc(desc);
            itemDesc.setCreated(new Date());
            itemDesc.setUpdated(new Date());
            //5.插入商品描述数据
            itemDescMapper.insert(itemDesc);
            
            //--------------添加发送消息的业务逻辑----------
            jmsTemplate.send(topicDestination, new MessageCreator() {
                @Override
                public Message createMessage(Session arg0) throws JMSException {
                    TextMessage textMessage = arg0.createTextMessage(itemId + "");
                    return textMessage;
                }
            });
    //6.返回TaotaoResult
            return TaotaoResult.ok();
        }
    }

    6.2. 消费者Consumer:在taotao-search-service中消费消息

    6.2.1.  功能分析

    1、接收消息。需要创建MessageListener接口的实现类。

    2、取消息,取商品id。

    3、根据商品id查询数据库。

    4、创建一SolrInputDocument对象。

    5、使用SolrServer对象写入索引库。

    6、返回成功,返回TaotaoResult。

    6.2.2.  Dao层

      根据商品id查询商品信息。返回一个searchItem

       SearchItemMapper接口中添加如下的方法:

    //根据商品id查询商品信息
    public SearchItem getItemById(Long itemId);

      映射文件:

      <select id="getItemById" parameterType="long" resultType="com.taotao.common.pojo.SearchItem">
            SELECT
                a.id,
                a.title,
                a.sell_point,
                a.price,
                a.image,
                b. NAME category_name,
                c.item_desc
            FROM
                tb_item a
            JOIN tb_item_cat b ON a.cid = b.id
            JOIN tb_item_desc c ON a.id = c.item_id
            WHERE a.status = 1
            AND a.id=#{itemId}
        </select>

      SearchDao类中添加如下方法:

    @Repository
    public class SearchDao {
    
        @Autowired
        private SolrServer solrServer;
        
        @Autowired
        private SearchItemMapper searchItemMapper;
    
        /*根据id更新索引库*/
        public TaotaoResult updateItemById(Long itemId) throws Exception {
            //1.调用mapper中的方法
            SearchItem searchItem = searchItemMapper.getItemById(itemId);
            //2.创建solrinputdocument
            SolrInputDocument document = new SolrInputDocument();
            //3.向文档中添加域
            document.addField("id", searchItem.getId());
            document.addField("item_title", searchItem.getTitle());
            document.addField("item_sell_point", searchItem.getSell_point());
            document.addField("item_price", searchItem.getPrice());
            document.addField("item_image", searchItem.getImage());
            document.addField("item_category_name", searchItem.getCategory_name());
            document.addField("item_desc", searchItem.getItem_desc());
            //4.添加文档到索引库中
            solrServer.add(document);
            //5.提交
            solrServer.commit();
            return TaotaoResult.ok();
        }
    }

    6.2.3.  Service层

    参数:商品ID

    业务逻辑:

    1、根据商品id查询商品信息。

    2、创建一SolrInputDocument对象。

    3、使用SolrServer对象写入索引库。

    4、返回成功,返回TaotaoResult。

    返回值:TaotaoResult

    接口定义:

      /**根据商品的id查询商品的数据,并且更新到索引库中*/
        public TaotaoResult updateItemById(Long itemId) throws Exception;

    实现类实现:

      @Override
        public TaotaoResult updateItemById(Long itemId) throws Exception {
            return searchDao.updateItemById(itemId);
        }

    6.2.4.  Listener

    public class ItemChangeMessageListener implements MessageListener {
    
        @Autowired
        private SearchService searchService;
        
        @Override
        public void onMessage(Message message) {
            try {
                
                TextMessage textMessage = null;
                Long itemId = null; 
                //判断消息类型是否为textMessage,如果是,就获取商品的id
                if (message instanceof TextMessage) {
                    textMessage = (TextMessage) message;
                    itemId = Long.parseLong(textMessage.getText());
                }
                //根据商品的id查询商品的数据,并且更新到索引库中
                searchService.updateItemById(itemId);                        
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
    }

    1.1.1.    Spring配置监听

    在taotao-search-service的配置文件中,配置:

    <bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.132:61616"></property>
        </bean>
        <!-- 通用的connectionfacotry 指定真正使用的连接工厂 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnection"></property>
        </bean>
    
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg name="name" value="item-change-topic"></constructor-arg>
        </bean>
        
        <!-- 监听器 -->
        <bean id="itemChangeMessageListener" class="com.taotao.search.listener.ItemChangeMessageListener"></bean>
        <!-- 监听容器,作用:启动线程做监听 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"></property>
            <property name="destination" ref="topicDestination"></property>
            <property name="messageListener" ref="itemChangeMessageListener"></property>
        </bean>

    这样就实现商品数据的索引库和数据库的同步。

  • 相关阅读:
    nacos安装配置和部署教程
    springcloudstream整合rabbitmq
    Springboot整合swagger2
    git命令详解
    Mybatis 注解开发传入List 两种方式接收方式 在IN场景中
    java 根据时间段查询数据库
    stream分页
    201521123068《Java程序设计》第1周学习总结
    201521123027 《JAVA程序设计》第二周学习总结
    201521123027 《JAVA程序设计》第一周学习总结
  • 原文地址:https://www.cnblogs.com/toria/p/11298427.html
Copyright © 2011-2022 走看看