zoukankan      html  css  js  c++  java
  • spring-消息

    1、异步消息

      当一个消息发送时候,消息会被交给消息代理,消息代理可以确保消息被发送到指定的目的地,同时解放发送者,使其能够继续进行其它业务。消息代理通常有ActiveMQ、RabbitMQ...,目的地通常有队列和主题,队列采用点对点的模型,主题采用发布订阅模型

    • 点对点模型:消息队列可以有多个接受者,但每条消息只能被一个接收者取走

    • 发布订阅模型:消息队列可以有多个订阅者,每条消息可以发送给多个主题订阅者

     2、JMS发送/接收消息

    1)activemq配置,使用ActiveMQ,并使用了JMSTemplate。

      JMS模板为开发者提供了与消息代理进行交互发送和接收消息的标准API,几乎每个消息代理都支持JMS。Jms模板和spring Date提供的jdbc模板一样可以消除样板代码,让开发更集中在业务处理上。

       <!-- ActiveMQ连接工厂 -->
        <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616"></property>
        </bean>
        <!-- 消息队列 -->
        <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="userqueue"></constructor-arg>
        </bean>
        <!-- 消息主题 -->
        <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="usertopic"></constructor-arg>
        </bean>
        <!-- 定义模板 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
      </bean>

    2)发送消息

    package com.cn.activemq;
    
    import com.cn.pojo.User;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    @Component
    public class SendMessageMQUtil {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @Autowired
        @Qualifier("queue")
        private Destination queue;
    
        @Autowired
        @Qualifier("topic")
        private Destination topic;
    
        /**
         * 队列--发送消息
         * @param user
         */
        public void sendUserQueue(final User user){
    
            jmsTemplate.send(queue, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createObjectMessage(user);
                }
            });
        }
    
        /**
         * 主题--发送消息
         * @param user
         */
        public void sendUserTopic(final User user){
    
            jmsTemplate.send(topic, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createObjectMessage(user);
                }
            });
        }
    }

    3)接收消息

    package com.cn.activemq;
    
    import com.cn.pojo.User;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.ObjectMessage;
    
    @Component
    public class ReceiveMessageMQUtil {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @Autowired
        @Qualifier("queue")
        private Destination queue;
    
        @Autowired
        @Qualifier("topic")
        private Destination topic;
    
        /**
         * 队列--接受消息
         * @return
         */
        public User receiveUserQueue(){
            try {
                ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(queue);
                return (User)objectMessage.getObject();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            return null;
        }
        /**
         * 主题--接受消息
         * @return
         */
        public User receiveUserTopic(){
            try {
                ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(topic);
                return (User)objectMessage.getObject();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            return null;
        }
    
    }

    4)测试

    分别新建测试类

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springMvc.xml"})//不可使用classpath:spring-*.xml,否则配置文件不起作用
    public class SendMessageMQUtilTest {
    
        @Autowired
        private SendMessageMQUtil sendMessageMQUtil;
    
        @Test
        public void sendUserQueue() throws Exception {
            User user=new User("computer1", "111111");
            sendMessageMQUtil.sendUserQueue(user);
        }
    
        @Test
        public void sendUserTopic() throws Exception {
            User user=new User("computer2", "222222");
            sendMessageMQUtil.sendUserTopic(user);
        }
    }
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springMvc.xml"})
    public class ReceiveMessageMQUtilTest {
    
        @Autowired
        private ReceiveMessageMQUtil receiveMessageMQUtil;
    
        @Test
        public void receiveUserQueue() throws Exception {
    
            User user=receiveMessageMQUtil.receiveUserQueue();
            System.out.println("ActiveMQ接收到的数据:"+user);
        }
    
        @Test
        public void receiveUserTopic() throws Exception {
    
            User user=receiveMessageMQUtil.receiveUserTopic();
            System.out.println("ActiveMQ接收到的数据:"+user);
        }
    
        @Test
        public void receiveUserTopic2() throws Exception {
    
            User user=receiveMessageMQUtil.receiveUserTopic();
            System.out.println("ActiveMQ接收到的数据:"+user);
        }
    
    }

    分别运行测试方法,结合activeMQ的控制台可以看出队列、主题以及各自的消费者等情况

    显示队列:

    显示主题:

         

    3、其它

    1)设置默认的目的地

      上述在发送消息和接收消息时,每次调用发送/接收消息的方法都传入了一个目的地参数。然而,可以在JmsTemplate实例化的时候,指定默认的目的地,如下:

        <!-- 定义队列 -->
        <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="userqueue"></constructor-arg>
        </bean>
        <!-- 定义模板 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
            <property name="defaultDestination" ref="queue"></property>//此处注入队列,也可以注入主题
        </bean>

    采用指定默认目的地的方式,则发送/接收消息调用的方法不用传递目的地了

     /**
         * 队列--发送消息
         * @param user
         */
        public void sendUserQueue(final User user){
    
            jmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createObjectMessage(user);
                }
            });
    
     /**
         * 队列--接受消息
         * @return
         */
        public User receiveUserQueue(){
            try {
                ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive();
                return (User)objectMessage.getObject();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            return null;
        }
       }

    2)发送消息对消息进行转换

      除了send(..)方法,JmsTemplate还提供了convertAndSend()方法,该方法不需要MessageCreator参数,而使用内置的消息转换器创建消息并发送。在JmsTemplate实例化时未指定消息转换器,在调用convertAndSend()方法则使用默认的SimpleMessageConverter消息转换器;receiveAndConvert()方法则在接收时候使用消息转换器

    /**
         * 队列--发送消息
         * @param user
         */
        public void sendUserQueue(final User user){
    
            jmsTemplate.convertAndSend(user);
        }
        /**
         * 队列--接受消息
         * @return
         */
        public User receiveUserQueue(){
            return (User)jmsTemplate.receiveAndConvert();
        }

    在JmsTemplate实例化指定消息转换器,则会在使用convertAndSend()/receiveAndConvert()方法使用指定的消息转换器

        <!-- 定义队列 -->
        <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="userqueue"></constructor-arg>
        </bean>
        <!-- 消息转换器  -->
        <bean id="mappingJackson2MessageConverter" class="org.springframework.jms.support.converter.MappingJackson2MessageConverter"></bean>
        <!-- 定义模板 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
            <property name="defaultDestination" ref="queue"></property>
            <property name="messageConverter" ref="mappingJackson2MessageConverter"></property>
        </bean>

    综合1)2),使用convertAndSend()/receiveAndConvert()发送和接收消息更加简单;在某些情况下,统一配置目的地也简化的使用

     3)使用消息监听器实现异步接收消息

      receive()相关方法是阻塞等到队列和主题是否有消息,或者超时才会返回。Spring提供了以POJO的方式处理消息的能力-创建消息监听器,可以在队列和主题有消息时候出发监听器的方法处理消息,而不需要等待

    创建POJO

    package com.cn.pojo;public class MyMessageHandler2 {
    
        public void handlerMessage(User user) {
            System.out.println("接收到的消息是:"+user);
        }
    }

    配置消息监听器,如果在队列userqueue中有消息,则会调用handlerMessage的方法。

        <!-- 消息处理POJO -->
        <bean id="myMessageHandler2" class="com.cn.pojo.MyMessageHandler2"></bean>
        <!-- 配置消息监听器 -->
        <jms:listener-container connection-factory="activeMQConnectionFactory">
            <jms:listener destination="userqueue" ref="myMessageHandler2" method="handlerMessage"></jms:listener>
        </jms:listener-container>

    如果消息处理的POJO实现了MessageListener 接口

    package com.cn.pojo;
    
    import javax.jms.Message;
    import javax.jms.MessageListener;
    
    public class MyMessageHandler2 implements MessageListener {
    
        public void onMessage(Message message) {
            System.out.println("接收到的消息是:"+message);
        }
    }

    则配置消息监听器时候,不需在<jms:listener>中配置 method元素。如果在队列userqueue中有消息,则会调用onMessage的方法。

     ###ActiveMQ的应用场景https://blog.csdn.net/he90227/article/details/50800646

  • 相关阅读:
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    Scapy 工具介绍
    ubuntu虚拟机使用open-vm-tools代替vmware-tools
    docker、vmware和PD的区别
    ubuntu查看OpenGL版本
    SQL Server 常用近百条SQL语句(收藏版)
    Intellij IDEA 如何去掉 @Autowired 注入警告
    awtk-linux-fb 使用 double framebuffer 闪烁的问题
  • 原文地址:https://www.cnblogs.com/shixiemayi/p/9574191.html
Copyright © 2011-2022 走看看