zoukankan      html  css  js  c++  java
  • JMS学习三(ActiveMQ消息的可靠性)

    下面我们来学习一下消息接受确认和发送持久化消息、消息的过期、消息的选择器和消息的优先级。

    一、消息接收确认

    1、jms消息只有在被确认之后才认为成功消费了这条消息。消息的成功消费通常包括三个步骤:(1)、client接收消息 (2)、client处理消息 (3)、消息被确认(也就是client给一个确认消息)

    不管是事务性会话还是非事务性会话,第一步和第二步都一样但第三步有所不同

    2、在事务性会话中当一个事务被提交的时候,确认自动发生,和应答模式没关系,这个值可以随便写。(这里多提一句异步消息接收中不能使用事务性会话)

    3、在非事务性会话中消息何时被确认取决于创建的session中设置的消息应答模式(acknowledge model)该参数有三个值:

    (1)、Session.AUTO_ACKNOWLEDGE:当client端成功的从receive方法或从onMessage(Message message) 方法返回的时候,会话自动确认client收到消息。

    (2)、Session.CLIENT_ACKNOWLEDGE: 客户单通过调用acknowledge方法来确认客户端收到消息。但需要注意在这种应答模式下,确认是在会话层上进行的,确认一个被消费的消息将自动确认所有已消费的其他消息。比如一个消费者已经消费了10条消息,然后确认了第5条消息被消费,则这10条都被确认消费了。

    acknowledge()通知方法是在Message对象上

    同步接收,调用acknowledge()方法进行确认:

    1、生产者,设置签收模式为Session.CLIENT_ACKNOWLEDGE  
    // 通过Connection对象创建Session会话(上下文环境对象),  
    // 参数一,表示是否开启事务  
    // 参数二,表示的是签收模式,一般使用的有自动签收和客户端自己确认签收  
    Session session = connection.createSession(Boolean.FALSE,  
    Session.CLIENT_ACKNOWLEDGE);  
      
    2、消费者,设置签收模式为Session.CLIENT_ACKNOWLEDGE  
    // 通过Connection对象创建Session会话(上下文环境对象),  
            // 参数一,表示是否开启事务  
            // 参数二,表示的是签收模式,一般使用的有自动签收和客户端自己确认签收  
            Session session = connection.createSession(Boolean.FALSE,  
                    Session.CLIENT_ACKNOWLEDGE);  
      
    3、消费者,在消费消息的时候,返回一个确认  
    // 使用Session来创建消息对象的生产者或者消费者  
    MessageConsumer createConsumer = session.createConsumer(destination);  
    while (true) {  
      TextMessage textMessage = (TextMessage) createConsumer.receive();//同步方式接收  
      if (textMessage == null) break; // 客户端的签收模式,  
      textMessage.acknowledge();
      System.out.println(
    "收到的内容为" + textMessage.getText());

    异步接受,调用acknowledge()方法进行确认:

    consumer.setMessageListener(new MessageListener() {  
                    @Override  
                    public void onMessage(Message message) {  
                        TextMessage textMessage = (TextMessage) message;  
                        try {  
                            String value = textMessage.getText();  
                            System.out.println("value: " + value);  
                            message.acknowledge(); //消息消费确认通知  
                        } catch (JMSException e) {  
                            e.printStackTrace();  
                        }  
                    }  
                });  

    (3)、Session.DUPS_ACKNOWLEDGE:不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

    小结:

    1.transacted事务,事务成功commit,才会将消息发送到mom中 
    2.acknowledgeMode消息确认机制 
         1)、带事务的session 
                   如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。 
         2)、不带事务的session 
                   不带事务的session的签收方式,取决于session的配置。 
                    Activemq支持一下三種模式: 
                    Session.AUTO_ACKNOWLEDGE  消息自动签收 
                    Session.CLIENT_ACKNOWLEDGE  客戶端调用acknowledge方法手动签收 
                    Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息 
                    头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

    二、发送持久化消息

    这里发送持久化消息和消息的持久化是有点联系,但不是一回事,发送持久化消息的意思是要不要将发送的消息持久化到磁盘或数据库中而消息的持久化是指将消息持久化到磁盘还是数据库,一个是要不要将消息持久化一个是将消息怎么持久化。

    这里要写的当然是要不要将消息持久化,ActiveMQ给我们提供了两种两个选择即持久化或不持久化。

       ActiveMQ支持两种消息的传送模式,PERSISTENT和NON_PERSISTENT两种。如果不指定传输模式,默认的就是持久化消息。如果容忍消息丢失,那么可以使用非持久化的消息机制以改善性能和减少开销。

    1、PERSISTENT:指示JMS Provider持久保存消息,会把消息持久化到磁盘以保证消息不会因为JMS provider、JMS Server 的失败而丢失。

    2、NON_PERSISTENT:不要求JMS Provider持久保存消息。如果JMS Server 重启则消息就丢死了,消费端也就会失去这条消息。

    3、消息的持久化设置:

    MessageProducer producer = session.createProducer(queue);  
                // 消息是否为持久性的,如果不设置默认是持久化的。  
                // producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
                //非持久化消息,消息是不会持久化到磁盘的,发送后如果服务关闭再次开启则消息会丢失。  
                 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  

    消息是否持久化对应用的性能影响还是很大的,而且在消息量很大的时候则更要慎重是否要对消息进行持久化。

    三、消息过期

          

    允许消息过期 。默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间。 
    有两种方法设置消息的过期时间,时间单位为毫秒: 
    1.使用消息生产者的setTimeToLive 方法为所有的消息设置过期时间。
    2.使用消息生产者的send 方法为每一条消息设置过期时间。
     消息过期时间,send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值。如果 timeToLive 值等于零,则 JMSExpiration 被设为零, 表示该消息永不过期。

    3、消息服务器接收到消息后,在指定的时间后,会从队列中移除指定的消息,超时被移除的消息不会发送给消费者。

    4、使用消息生产者的setTimeToLive(long time ) 方法来给所有的消息设置过期时间:

    // 消息生产者  
    MessageProducer producer = null;  
    producer = session.createProducer(queue);  
    // 消息是否为持久性的,如果不设置默认是持久化的。  
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
     //消息过期设置  
    producer.setTimeToLive(1000); 

    5、使用消息生产者的send()方法来设置消息的过期时间

    #在生产者发送消息的过程中,可以指定deliveryMode传输模式,priority消息优先级,timeToLive消息过期时间  
      
    void send(Message message, int deliveryMode, int priority, long timeToLive );  

    上面设置消息过期的都是消息生产者这方的来设置的,也就是如果不满足条件则消息服务器会把消息从消息队列中删除,但是我们也可以在消息消费端来设置接受时间(仅限于同步接受)

    Message message = consumer.receive(2);  

    就是在接受的时候添加等待时间(单位是毫秒)如果在指定的时间内获取不到消息则不会再等了。如果不设置等待时间则一直等待直到接收到消息或超时为止。

    四、优先级

     消息优先级从0-9十个级别,0-4是普通消息,5-9是加急消息。如果不指定优先级,默认为4.JMS不要求严格按照这十个优先级发送消息,但必须保证加急消息要优先于普通消息到达。

    五、临时目的地

             可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。

    六、持久订阅

            首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是一个topic,第二个参数是订阅的名称。 JMS Provider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、相同的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。JMS Provider会象客户发送客户处于非激活状态时所发布的消息。 持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。

    七、本地事务

             在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。 事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。 需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。 需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。

    八、

  • 相关阅读:
    hive 数据hadoop数据etl交换
    团队冲刺(三)
    团队冲刺(二)
    CVPR2019论文热词云的实现
    团队冲刺(一)
    团队开发之电梯演讲----团队项目介绍--“益青春APP”
    android的finish()方法
    java web项目通过外网ip访问
    MySQL出现错误1205-Lock wait timeout exceeded; try restarting transaction
    团队开发(自己的理解)
  • 原文地址:https://www.cnblogs.com/alter888/p/8975113.html
Copyright © 2011-2022 走看看