zoukankan      html  css  js  c++  java
  • 叙述 activemq 与spring 主题实现 小功能实现

    在上一篇文章里 我说到了 maven的配置  我现在直接说 xml配置   

    首先我先描述 生产者的信息

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:mvc="http://www.springframework.org/schema/mvc"
        xsi:schemaLocation="
            http://www.springframework.org/schema/beans     
            http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-4.1.xsd
            http://www.springframework.org/schema/mvc
            http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
            http://www.springframework.org/schema/jms
            http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
            http://activemq.apache.org/schema/core
            http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
            >
        
        <!-- <context:component-scan base-package="com.gzframe.demo.activemq" />
        <mvc:annotation-driven /> -->
            
        <amq:connectionFactory id="amqConnectionFactory" 
            brokerURL="tcp://60.205.212.39:61616" 
            userName="admin" 
            password="admin" />
        
        <!-- 配置JMS连接工长 -->
        <bean id="connectionFactoryMq"
            class="org.springframework.jms.connection.CachingConnectionFactory">
            <constructor-arg ref="amqConnectionFactory" />
            <property name="sessionCacheSize" value="100" />
        </bean>
        
        
        <!-- 定义主题 -->
         <!-- 定义消息主题(topic)  在这里要注意一下   -->
        <bean id="demoTopicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <!-- 设置消息队列的名字 -->
            <constructor-arg>
                <value>topic.demo2</value>
            </constructor-arg>
        </bean>
        <!-- 配置JMS模板(topic),Spring提供的JMS工具类,它发送、接收消息。org.apache.activemq.command.ActiveMQQueue 如果你用的是这个  系统不会报错  但是 你生产的 会在队列里生产出来  也会在 主题里  -->
        <bean id="jmsTemplateTopic" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactoryMq" />
            <property name="defaultDestination" ref="demoTopicDestination" />
            <property name="receiveTimeout" value="10000" />
            <!-- true是topic,false是queue,默认是false,此处显示写出false -->
            <property name="pubSubDomain" value="true" />
        </bean>
        
        
        
     </beans>

    配置好了以后  下面是我的controller层   这个我直接就写方法了  因为在上一篇文章里  我写的controller 都在一起

    @Autowired
        private ActiveMqTopicsService activeMqTopicsService;
    /**
         * 
         * @Title: TestTopic @Description: 一个标题 发布 生产者 @param 设定文件 @return void
         * 返回类型 @throws
         */
        @RequestMapping("TestTestTopic")
        public void TestTopic() {
            activeMqTopicsService.TestTopic();
        }
    @Service
    public class ActiveMqTopicsServiceImpl implements ActiveMqTopicsService{
        
        @Resource(name="jmsTemplateTopic")
        private JmsTemplate jmsTemplate;
        
        //队列名gzframe.demo
        @Resource(name="demoTopicDestination")
        private Destination destination;
        /**
         * 
        * @Title: TestTopic 
        * @Description: 一个标题 发布 生产者
        * @param     设定文件 
        * @return void    返回类型 
        * @throws
         */
        public void TestTopic() {
            jmsTemplate.send(new MessageCreator() {  
                public Message createMessage(Session session) throws JMSException {  
                    TextMessage msg = session.createTextMessage();  
                    // 设置消息属性  
                    msg.setStringProperty("phrCode", "C001");  
                    // 设置消息内容  
                    msg.setText("Hello World! bug1");  
                    return msg;  
                }  
            });  
        }
    }

    运行结果

    在这里  我先说一下 我的个人习惯  我想把这个东西都已经放进去了 我就想用请求  取出来  可是 找了 很久发现 取不出来 应该是订阅的关系  所以显示订阅   你订阅了以后  你放进去东西 才会给你返回  自己单独取应该是取不出来的   主题就是订阅模式吧  

    我先配置一下我的xml  首先你要订阅它  然后配置监听  

    在这里 要注意下  需要重新配置一下工厂   这个订阅与生产者是同一个xml  在订阅里已经设置了 订阅主题的名字

     <!-- 配置JMS连接工厂 -->
        <bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="failover:(tcp://**.**.***.***:61616)" />
            <property name="useAsyncSend" value="true" />
            <property name="clientID" value="consumerClienctConnect" />
        </bean>

    然后消费者订阅

    <!-- 消息订阅客户端1 -->
        <bean id="consumerListenerClient1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="consumerConnectionFactory" />
            <!-- 开启订阅模式 -->
            <property name="pubSubDomain" value="true"/>
            <property name="destination" ref="demoTopicDestination" />
            <property name="subscriptionDurable" value="true"/>
            <!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
            <property name="clientId" value="consumerClient1"/>
            <property name="messageListener" ref="activeMqTopicsServiceImpl" />
            <!-- 消息应答方式
                 Session.AUTO_ACKNOWLEDGE  消息自动签收
                 Session.CLIENT_ACKNOWLEDGE  客户端调用acknowledge方法手动签收
                 Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
            -->
            <property name="sessionAcknowledgeMode" value="1"/>
        </bean>
        <!-- 消息订阅客户端2 -->
        <bean id="consumerListenerClient2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="consumerConnectionFactory" />
            <!-- 开启订阅模式 -->
            <property name="pubSubDomain" value="true"/>
            <property name="destination" ref="demoTopicDestination" />
            <property name="subscriptionDurable" value="true"/>
            <!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
            <property name="clientId" value="consumerClient2"/>
            <property name="messageListener" ref="activeMqTopicsServiceImpl" />
            <!-- 消息应答方式
                 Session.AUTO_ACKNOWLEDGE  消息自动签收
                 Session.CLIENT_ACKNOWLEDGE  客户端调用acknowledge方法手动签收
                 Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
            -->
            <property name="sessionAcknowledgeMode" value="1"/>
        </bean>

    在配置文件完成了以后  在java里要实现这个接口MessageListener

    @Service
    public class ActiveMqTopicsServiceImpl implements ActiveMqTopicsService,MessageListener {
        
        @Resource(name="jmsTemplateTopic")
        private JmsTemplate jmsTemplate;
        
        //队列名gzframe.demo
        @Resource(name="demoTopicDestination")
        private Destination destination;
        
        
        @Override
        public void onMessage(Message arg0) {
            TextMessage tm = (TextMessage)arg0;
            try {
                System.out.println("从订阅主题--" + destination.toString() + "收到了消息:	"
                        + tm.getText());
                tm.acknowledge();
                String str = tm.getText();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            
        }
    }

    这个实现与接口我都放在了一起  看见的要注意下呀

    实现结果

     

     因为我我配置了 两个 消费者  所以会监听两次 出现两个输出

    到这里就结束了  就是很简单的例子 这篇文章与上一篇文章  我也是 在网上找的例子 自己整合了一下 做的实现  我也是个初学者  希望给大家一点点小帮助  哈哈哈    

  • 相关阅读:
    Socket_leaks open socket #5024 left in connection
    阿里云 如何减少备份使用量? mysql数据库的完整备份、差异备份、增量备份
    一个正则式引发的血案 贪婪、懒惰与独占
    linux下tmp目录里很多php开头的文件
    后端线上服务监控与报警方案
    架构先行
    数据盘缩容
    文件过滤 批量删除
    mock数据(模拟后台数据)
    如何避免升级 Linux 实例内核后无法启动
  • 原文地址:https://www.cnblogs.com/pqy521/p/8145271.html
Copyright © 2011-2022 走看看