zoukankan      html  css  js  c++  java
  • activeMq 消费者整合spring

    package com.mq.consumer;

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;

    public class ConsumerMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
    TextMessage tm = (TextMessage) message;
    try {
    System.out.println("---------消息消费---------");
    System.out.println("订阅者: consumerClient1");
    System.out.println("消息内容: " + tm.getText());
    System.out.println("消息ID: " + tm.getJMSMessageID());
    System.out.println("消息Destination: " + tm.getJMSDestination());
    /* System.out.println("---------更多信息---------");
    System.out.println(ToStringBuilder.reflectionToString(tm));*/
    System.out.println("-------------------------");
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }

    对应的spring的配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">


    <!-- 配置JMS连接工厂 -->
    <bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="failover:(tcp://10.31.88.166:61616)" />
    <property name="useAsyncSend" value="true" />
    <property name="clientID" value="consumerClienctConnect" />
    </bean>

    <!-- 定义消息Destination -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="testSpringTopic"/>
    </bean>

    <!-- 配置消息消费监听者 -->
    <bean id="consumerMessageListener" class="com.mq.consumer.ConsumerMessageListener" />
    <bean id="consumerMessageListener2" class="com.mq.consumer.ConsumerMessageListener2" />

    <!-- 消息订阅客户端1 -->
    <bean id="consumerListenerClient1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="consumerConnectionFactory" />
    <!-- 开启订阅模式 -->
    <property name="pubSubDomain" value="true"/>
    <property name="destination" ref="topicDestination" />
    <property name="subscriptionDurable" value="true"/>
    <!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
    <property name="clientId" value="consumerClient1"/>
    <property name="messageListener" ref="consumerMessageListener" />
    <!-- 消息应答方式
    Session.AUTO_ACKNOWLEDGE 消息自动签收
    Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
    Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
    -->
    <property name="sessionAcknowledgeMode" value="1"/>
    </bean>

    </beans>

  • 相关阅读:
    url传递参数带 + ,解决办法
    操作系统——内存地址重定位
    算法——二分查找变形题
    Java——代码性能优化
    maven——添加插件和添加依赖有什么区别?
    JavaWeb——Servlet如何调用线程池中的线程?
    「ZJOI2016」小星星
    [十二省联考2019]字符串问题
    [十二省联考2019]春节十二响
    [十二省联考2019]异或粽子
  • 原文地址:https://www.cnblogs.com/lizihao/p/5550441.html
Copyright © 2011-2022 走看看