zoukankan      html  css  js  c++  java
  • spring整合JMS

    JMS(java消息服务)

    JMS是java的一个标准,定义了使用消息代理的通用API。

    Spring基于模板类JmsTemplate为JMS提供了支持

    Spring还提供了消息驱动POJO的理念:这是一个简单的Java对象,它能够以异步的方式响应队列或主题上到达的消息

    消息代理(message broker)和目的地(desttination)

    日常收寄快递,会涉及三个对象:

    • 寄件人
    • 目的地
    • 收件人

    假设没有快递企业,寄件人必须亲自跋涉千里把物品送到目的地,然后由收件人去领取。

    在异步消息中,消息代理充当快递企业的角色。为消息代理指定目的地后,消息代理确保消息送达,同时解放了发送者,使其可以进行其他业务

    目的地只关注消息从哪里获取,不关心由谁取走消息。它有两种同用的目的地类型:

    • 队列
    • 主题

    搭建消息代理——ActiveMQ

    1、配置依赖

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-spring</artifactId>
        <version>5.15.11</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
    </dependency>
    

    2、配置连接工厂

    首先配置JMS连接工厂,让应用知道如何连接到ActiveMQ。

    ActiveMQConnectionFactory是ActiveMQ自带的连接工厂,在Spring中配置如下:

    <bean id="connectionFactory" 
    	class="org.apache.activemq.ActiveMQConnectionFactory"
    	p:brokerURL="tcp://localhost:61616" />
    

    ActiveMQ代理监听默认监听localhost的61616端口,可以使用brokerURL属性来指定端口

    3、声明ActiveMQ消息的目的地

    如果目的地是队列类型:

    <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"
        c:_0="queue" />
    

    ActiveMQTopic(String name) 构造器会创建一个队列类型的目的地,并用String对象指明队列的名字

    如果目的地是队列类型:

    <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"
        c:_0="topic" />
    

    4、使用amq命名空间简化配置

    <amq:queue id="spittleQueue" physicalName="spittle.alert.queue" />
    
    <amq:topic id="spittleTopic" physicalName="spittle.alert.topic" />
    
    <amq:connectionFactory id="connectionFactory" 
          brokerURL="tcp://localhost:61616" />
    

    发送和接受消息

    如果不适用JmsTemplate模板,发送消息需要如下步骤:

    1. 建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址(一般使用默认,如果没有修改的话)
    2. 通过ConnectionFactory对象创建一个Connection连接,并且调用Connection的start方法开启连接,Connection方法默认是关闭的
    3. 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数1是是否启用事物,参数2是签收模式,一般设置为自动签收
    4. 通过Session对象创建Destination对象(目的地),指的是一个客户端用来制定生产消息目标和消费消息来源的对象。在PTP的模式中,Destination被称作队列,在Pub/Sub模式中,Destination被称作主题(Topic)
    5. 通过Session对象创建消息的发送和接收对象(生产者和消费者)
    6. 通过MessageProducer的setDeliverMode方法为其设置持久化或者非持久化特性
    7. 使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。客户端同理。记得关闭
    public static void main(String[] args) throws JMSException {
    
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
                
        Connection connection = connectionFactory.createConnection();
        connection.start();
        
        Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
        
        Destination destination = session.createQueue("queue");
        
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for (int i=0;i<=5;i++) {
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("我是第"+i+"消息");
            producer.send(textMessage);
        }
        if(connection!=null){
            connection.close();
        }
    }
    

    通过JmsTemplate模板可以简化上述代码

    1、配置JmsTemplate

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
        c:_0-ref="connectionFactory"
        
        <!--可选属性, 指定默认目的地 -->
        p:defaultDestination-ref="queue" />
    

    如果系统存在目的地,可以通过p:defaultDestination-ref将目的地bean装配进来;

    此外,也可以用defaultDestinationName属性指明消息通道名称,如果命名目的地存在就使用已有的,否则创建该命名目的地.

    它的messageConverter属性可以指定消息转换器,JmsTemplate在convertAndSend()方法中默认使用SimpleMessage Converter

    2、发送和接受消息

    import org.springframework.jms.core.JmsOperations;
    
    public class AlertServiceImpl implements AlertService {
    
      private JmsOperations jmsOperations;
    
      public AlertServiceImpl(JmsOperations jmsOperations) {
        this.jmsOperations = jmsOperations;
      }
    
    //  使用send方法发送消息
    //  public void sendSpittleAlert(final Spittle spittle) {
    //    jmsOperations.send(
    //      "spittle.alert.queue", 
    //      new MessageCreator() {
    //        public Message createMessage(Session session) 
    //                       throws JMSException {
    //          return session.createObjectMessage(spittle);
    //        }
    //      }
    //    );
    //  }
    
    //  使用convertAndSend方法发送消息
      public void sendSpittleAlert(Spittle spittle) {
        jmsOperations.convertAndSend(spittle);
      }
    
    //  使用receive方法接收消息 
    //  public Spittle getSpittleAlert() {
    //    try {
    //    ObjectMessage message = (ObjectMessage) jmsOperations.receive();
    //    return (Spittle) message.getObject();
    //    } catch (JMSException e) {
    //      throw JmsUtils.convertJmsAccessException(e);
    //    }
    //  }
    
    //  使用receiveAndConvert方法接收消息  
      public Spittle retrieveSpittleAlert() {
        return (Spittle) jmsOperations.receiveAndConvert();
      }
    }
    

    convertAndSend/receiveAndConvert会使用内置的消息转换器(message converter)创建消息。这两个方法在接收Object对象时,Object对象必须实现Serializable接口而且有默认构造器

    使用JmsTemplate接收消息的最大缺点在于receive()和receiveAndConvert()方法都是同步的

    创建消息驱动的POJO

    消息驱动(MDB)来源于EJB2规范。MDB可以异步处理消息,MDB将JMS目的地中的消息作为事件,并对这些事件进行响应

    MDB需要实现javax.jms.MessageListener接口的onMessage(message message)f的方法,并使用@MessageDriven(mappedName="destination")注解标注MDB

    Spring提供了以POJO的方式处理消息的能力,不需要实现javax.jms.MessageListener接口

    只需要把处理消息的POJO配置为消息监听器,这个POJO便能接受消息

    <jms:listener-container connection-factory="connectionFactory">
        <jms:listener destination="destination"
            <!-- messageHandler是自定义的POJO消息处理器 -->
            ref="messageHandler"
            
            <!-- messageHandler是自定义的POJO消息处理器的处理方法 -->
            method="handler" />
    </jms:listener-container>
    

    消息监听器容器(message listener container)是一个特殊的bean,它可以监控JMS目的地并等待消息到达。一旦有消息到达,它取出消息,然后把消息传给任意一个对此消息感兴趣的消息监听器

    connection-factory属性配置了连接工厂,容器中的每个jms:listener都使用这个连接工厂进行消息监听,connection-factory属性默认值为connectionFactory

    对于jms:listener元素如果ref属性所标示的bean实现了MessageListener,那就没有必要再指定 method 属性了,默认就会调用onMessage()方法

    参考

    消息中间件ActiveMQ使用详解
    浅谈ActiveMQ与使用

  • 相关阅读:
    交叉编译报错 ld: unrecognized option '-Wl,-O1'
    QT WebEngineView显示不了中文
    MySQL连不上,重装时需要输入current password,最后一步失败
    Ubuntu 16.04关闭图形界面
    windows下查看文件的MD5,SHA1,SHA256
    MAC地址到IPV6 Local Link地址的转换
    systemctl stop某个服务时,保留服务开启的子程序
    WEB服务-Nginx之9-四层负载均衡
    WEB服务-Nginx之8-七层负载均衡
    WEB服务-Nginx之7-反向代理
  • 原文地址:https://www.cnblogs.com/weixia-blog/p/12355117.html
Copyright © 2011-2022 走看看