zoukankan      html  css  js  c++  java
  • activeMq与spring整合

    ActiveMQ 使用的是标准生产者和消费者模型
    有两种数据结构 QueueTopic
    1Queue 队列 ,生产者生产了一个消息,只能由一个消费者进行消费
    2Topic 话题,生产者生产了一个消息,可以由多个消费者进行消费

     

    默认 tcp 连接 activeMQ 端口 61616

    导入jar

    2、 编写配置生产者
    配置 activemq 连接工厂

    <!-- ActiveMQ 连接工厂 -->
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
        <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
        <amq:connectionFactory id="amqConnectionFactory"
            brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />
    
        <!-- Spring Caching连接工厂 -->
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="mqConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactor
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
            <!-- 同上,同理 -->
            <!-- <constructor-arg ref="amqConnectionFactory" /> -->
            <!-- Session缓存数量 -->
            <property name="sessionCacheSize" value="100" />
        </bean>
        
         <!-- Spring JmsTemplate 的消息生产者 start-->
    
        <!-- 定义JmsTemplate的Queue类型 -->
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="mqConnectionFactory" />
            <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
            <property name="pubSubDomain" value="false" />
        </bean>
    
        <!-- 定义JmsTemplate的Topic类型 -->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
             <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="mqConnectionFactory" />
            <!-- pub/sub模型(发布/订阅) -->
            <property name="pubSubDomain" value="true" />
        </bean>
    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" 
    connection-factory="connectionFactory" acknowledge="auto">
    <!-- 默认注册bean名称,应该是类名首字母小写 -->
    <jms:listener destination="bos_sms" ref="topicConsumer"/>
    </jms:listener-container>
     

    queue模式生产者模板,topic模式同,只是注入类不同:

    @Service
    public class QueueSender {
        // 注入jmsTemplate
        @Autowired
        @Qualifier("jmsQueueTemplate")
        private JmsTemplate jmsTemplate;
      //创建一个send方法,传入队列名,和一个用于发送的消息
        public void send(String queueName, final String message) {
            jmsTemplate.send(queueName, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(message);
                }
            });
        }
    }

    queue消费者模板

    @Service
    public class QueueConsumer1 implements MessageListener {
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out
                        .println("消费者QueueConsumer获取消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    生产者测试

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext-mq.xml")
    public class ProducerTest {
        @Autowired
        private QueueSender queueSender;
    
        @Autowired
        private TopicSender topicSender;
    
        @Test
        public void testSendMessage() {
            queueSender.send("spring_queue", "我是生产者");
        }
    }

    消费者测试

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml")
    public class ConsumerTest {
    
        @Test
        public void testConsumerMessage() {
            while (true) {
                // junit退出,防止进程死掉
            }
        }
    }

    结果运行生产者测试后对应测试者会输出对应消息

     

  • 相关阅读:
    一张图告诉你为什么是服务网关,文末有现金抽奖。
    Java中的宏变量,宏替换详解。
    Java中创建String的两道面试题及详解
    JSON Web Token (JWT),服务端信息传输安全解决方案。
    jdk紧急漏洞,XMLDecoder反序列化攻击
    Java对象引用四个级别(强、软、弱、虚)
    Java7任务并行执行神器:Fork&Join框架
    (2)Django-pycharm部署
    批处理编写
    (1)Django安装
  • 原文地址:https://www.cnblogs.com/wsx-wb/p/7295214.html
Copyright © 2011-2022 走看看