zoukankan      html  css  js  c++  java
  • Spring 整合 ActiveMQ

    Spring官方提供了一个叫JmsTemplate的类,这个类就专门用来处理JMS的,在该类的Bean配置标签中有两个属性connectionFactory-refdefaultDestination-ref正好对应JMS中的ConnectionFactoryDestination

    依赖

    <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jms</artifactId>
    </dependency>
    
    <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-pool</artifactId>
    </dependency>
    
    <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
    </dependency>
    

    配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
               http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS 服务厂商提供 -->
        <bean id="providerFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"/>
            <property name="useAsyncSend" value="true"/>
        </bean>
    
        <!-- 配置生产者连接池
             可以用来将Connection、Session和MessageProducer池化,
             这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包-->
        <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory" ref="providerFactory"/>
            <!-- 最大连接数 -->
            <property name="maxConnections" value="100"></property>
        </bean>
    
    
        <!-- 队列的目的地,点对点-->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--创建的bean队列的名称-->
            <constructor-arg index="0" value="spring-active-queue"/>
        </bean>
        
        <!-- 主题的目的地:不配置订阅就是同步阻塞模式 -->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <!--创建的bean主题的名称-->
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>
    
    
        <!-- 消费者配置监听器,listener-container可以同时支持多个监听器,
                 如果你设置了多个监听器,那么这些监听器会轮流去队列中获取信息处理。 -->
        <bean id="ListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="jmsFactory"/>
            <!-- 开启订阅模式 -->
            <property name="pubSubDomain" value="true"/>
            <!-- 目的地 -->
            <property name="destination" ref="destinationTopic"/>
            <!-- 持久化订阅 -->
            <property name="subscriptionDurable" value="true"/>
    
            <!---接收客户端ID,在持久化时,客户端不在线时消息就存在数据库里,直到被这个ID的客户端消费掉-->
            <!-- connection.setClientID(clientId) -->
            <property name="clientId" value="ld1"/>    //Connection Id
            <!-- MessageConsumer messageConsumer = session.createDurableSubscriber((Topic)getDestination(), getDurableSubscriptionName());
                 通过 Session 创建 DurableSubscriber 的时候,我们要为其提供一个 Durable Subscriber Name -->
            <property name="durableSubscriptionName" value="ld1"/>     //Connection中不同的Durable Subscription
    
            <!-- 消息应答方式
                Session.AUTO_ACKNOWLEDGE=1  消息自动签收
                Session.CLIENT_ACKNOWLEDGE=2  客户端调用acknowledge方法手动签收
                Session.DUPS_OK_ACKNOWLEDGE=3  不必必须签收,消息可能会重复发送-->
            <property name="sessionAcknowledgeMode" value="1"/>
    
            <!--消息监听器-->
            <property name="messageListener" ref="myMessageListener"/>
        </bean>
    
    
        <!-- spring 使用 jmsTemplate 来实现消息的发送和接收 -->
        <bean id="JmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    
            <property name="connectionFactory" ref="jmsFactory"/>
    
            <!--默认目的地-->
            <property name="defaultDestination" ref="destinationQueue"/>
    
            <!--消息转换-->
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
    
            <!-- 开启订阅模式 -->
            <property name="pubSubDomain" value="true"/>
    
            <!-- 自动签收模式 -->
            <property name="sessionAcknowledgeMode" value="1" />
    
            <!--过期时间(默认为永久有效。)-->
            <property name="receiveTimeout" value="10000"/>
    
            <!-- 发送模式
                 DeliveryMode.NON_PERSISTENT=1 非持久
                 DeliveryMode.PERSISTENT=2 持久 -->
            <property name="deliveryMode" value="2"/>
    
            <!-- 默认false
                 如果是true,deliveryMode(持久化), priority(优先级), timeToLive(消息的存活时间)的值将被使用
                 否则使用默认的值。-->
            <property name="explicitQosEnabled" value="true"/>
        </bean>
    
    
        <!-- 切记每一个消息消费者的clientId都是唯一的,切不可重复,
             它每一个消费者对应一条信道,一旦重复那么会报信道被占用错误 -->
    </beans>
    

    监听器

    @Component(value = "myMessageListener")
    public class SimpleMsgListener implements MessageListener {
    
        //收到信息时的动作
        @Override
        public void onMessage(Message message) {
            if (null != message && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("收到的信息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    生产者(直接配置监听器,就不需要启动消费者)

    public class QueueProducer {
    
        // 负责消息的发送和接收可以理解为MessageProducer和MessageConsummer的组合。
        private static JmsTemplate jt = null;
    
        public static void main(String[] args) {
            ApplicationContext ctx = new ClassPathXmlApplicationContext("application.xml");
            // 获取JmsTemplate对象
            jt = (JmsTemplate) ctx.getBean("jmsTemplate");
            // 调用方法,发送消息
            jt.send(new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage msg = session.createTextMessage("Hello");
                    return msg;
                }
            });
            System.out.println("end");
        }
    }
    
  • 相关阅读:
    文件操作工具类
    批量插入数据到 MySQL的几种方式
    C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率
    C#队列学习笔记:RabbitMQ延迟队列
    C#队列学习笔记:RabbitMQ优先级队列
    C#队列学习笔记:RabbitMQ实现客户端相互通讯
    C#队列学习笔记:RabbitMQ搭建集群
    C#队列学习笔记:RabbitMQ安装及使用
    C#队列学习笔记:RabbitMQ基础知识
    C#队列学习笔记:MSMQ入门二
  • 原文地址:https://www.cnblogs.com/loveer/p/11405884.html
Copyright © 2011-2022 走看看