zoukankan      html  css  js  c++  java
  • SPRING 集成 activemq 的 topic 模式

    概要

    activemq 支持两种模式:

    1.队列模式

    2. 发布订阅者模式,topic有一个主题可以有多个订阅者。这种情况可以将一个消息,分发到多个消费者。

    比如我有这样一个案例,用户需要同步,而且需要同步到多个系统,那么我们只需要队列添加一个主题,其他的子系统订阅该主题。分别处理自己的同步逻辑。

    这样就实现了代码的解耦。

    实现代码

    1.生产者

    import java.util.Map;
    
    import javax.annotation.Resource;
    import javax.jms.Destination;
    
    import org.apache.activemq.command.ActiveMQTopic;
    import org.springframework.jms.core.JmsTemplate;
    
    public class TopicProducer {
        @Resource(name = "topicJmsTemplate")
        private JmsTemplate jmsTemplate;
        private Map<String,Destination> topicMap=new java.util.concurrent.ConcurrentHashMap<>();
        /**
         * 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination
         * @param destination
         * @param message
         */
        public void sendMqMessage(String topicName, Object model){
            Destination dest=null;
            if(!topicMap.containsKey(topicName)){
                dest=new ActiveMQTopic(topicName);
                topicMap.put(topicName, dest);
            }
            else{
                dest=topicMap.get(topicName);
            }
            jmsTemplate.convertAndSend(dest, model);
        }
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
    }

    生产者XML配置

    <!-- topic 连接工厂 -->
        <bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="failover:(tcp://${jms.ip}:${jms.port})" />
            <property name="useAsyncSend" value="true" />
            <property name="clientID" value="providerClienctConnect" />
        </bean>
        
        <bean id="topicDestination"  class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="userOrgTopic"/>
        </bean>
    
        <!-- 消息发送者客户端 -->
        <bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="providerConnectionFactory" />
            <property name="defaultDestination" ref="topicDestination" />
            <!-- 开启订阅模式 -->
            <property name="pubSubDomain" value="true"/>
            <property name="receiveTimeout" value="10000" />
            <!-- deliveryMode, priority, timeToLive 的开关要生效,必须配置为true,默认false-->
            <property name="explicitQosEnabled" value="true"/>
            <!-- 发送模式
                 DeliveryMode.NON_PERSISTENT=1:非持久 ;
                 DeliveryMode.PERSISTENT=2:持久
            -->
            <property name="deliveryMode" value="1"/>
        </bean>
        
        <bean id="topicProducer" class="com.aps.core.jms.TopicProducer"></bean>

    deliveryMode :

    1:非持久 就是如果消息发送后,没有消费者启动,重启服务后,那么消息将会消失。

    2.持久 消息发送后,消息没有被消费,重启服务后,消息依然存在。

    2.配置消费者

    java 代码

    import javax.jms.Message;
    import javax.jms.MessageListener;
    
    public class ConsumerMessageListener implements MessageListener{
    
        @Override
        public void onMessage(Message message) {
    //        ObjectMessage obj=(ObjectMessage)message;
    //        try {
    //            OsUser user=(OsUser) obj.getObject();
    //            System.out.println(user.getFullname());
    //        } catch (JMSException e) {
    //            // TODO Auto-generated catch block
    //            e.printStackTrace();
    //        }
    //        System.out.println("ok");
        }
    
    }

    xml 配置

    <!-- 配置JMS连接工厂 -->
        <bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="failover:(tcp://${jms.ip}:${jms.port})" />
            <property name="useAsyncSend" value="true" />
            <property name="clientID" value="consumerClienctConnect" />
        </bean>
    
        <!-- 定义消息Destination -->
        <bean id="topic1Destination"  class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="mytopic"/>
        </bean>
        
        <bean id="topic2Destination"  class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="mytopic"/>
        </bean>
    
        <!-- 配置消息消费监听者 -->
        <bean id="consumerMessageListener" class="com.aps.jms.ConsumerMessageListener" />
    
        <!-- 消息订阅客户端1 -->
        <bean id="consumerListenerClient1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="consumerConnectionFactory" />
            <!-- 开启订阅模式 -->
            <property name="pubSubDomain" value="true"/>
            <property name="destination" ref="topic1Destination" />
            <property name="subscriptionDurable" value="true"/>
            <!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
            <property name="clientId" value="consumerClient1"/>
            <property name="messageListener" ref="consumerMessageListener" />
            <!-- 消息应答方式
                 Session.AUTO_ACKNOWLEDGE  消息自动签收
                 Session.CLIENT_ACKNOWLEDGE  客户端调用acknowledge方法手动签收
                 Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
            -->
            <property name="sessionAcknowledgeMode" value="1"/>
        </bean>
    
        <!-- 消息订阅客户端2 -->
        <bean id="consumerListenerClient2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="consumerConnectionFactory" />
            <!-- 开启订阅模式 -->
            <property name="pubSubDomain" value="true"/>
            <property name="destination" ref="topic2Destination" />
            <property name="subscriptionDurable" value="true"/>
            <!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
            <property name="clientId" value="consumerClient2"/>
            <property name="messageListener" ref="consumerMessageListener" />
            <!-- 消息应答方式
                 Session.AUTO_ACKNOWLEDGE  消息自动签收
                 Session.CLIENT_ACKNOWLEDGE  客户端调用acknowledge方法手动签收
                 Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
            -->
            <property name="sessionAcknowledgeMode" value="1"/>
        </bean>

    这里可以看到,我们配置了两个消费者。

    我们测试的时候,发一个消息,可以看到两个收到两次消息。

  • 相关阅读:
    show proceslist时发现大量的sleep,有什么风险吗,该如何处理?
    监控MySQL的性能,应该主要观察那几个监控项?
    MySQL所有的压力都在一个CPU核心上,为什么会产生这种现象,改如何解决?
    大表,某列无索引,先需要查询该列,删除符合条件的记录,大约占40%数据量,请问有何更好的方案吗?
    MySQL DBA运维中那些动作属于危险性操作?
    云环境上自建MySQL,有哪些高可用实现方案?
    RDS上,MySQL实例中某张表数据小于tmp_table_size,但有查询时会报错临时空间满 The table '/data/mysql/zst/tmp/#sql_13975_23' is full. 原因可能是什么?
    MySQL误删除frm文件该怎么办?
    生产环境MySQL死锁如何监控及如何减少死锁发生的概率。
    MongoDB有哪些优秀特性及适合的场景是什么?
  • 原文地址:https://www.cnblogs.com/yg_zhang/p/10106069.html
Copyright © 2011-2022 走看看