zoukankan      html  css  js  c++  java
  • ActiveMQ的消息确认问题

    http://riddickbryant.iteye.com/blog/441890

    【发送端】

    session = connection.createSession(Boolean.FALSE,  Session.AUTO_ACKNOWLEDGE);

    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

     1 import javax.jms.*;
     2 import org.apache.activemq.ActiveMQConnection;
     3 import org.apache.activemq.ActiveMQConnectionFactory;
     4 /**
     5  *
     6  * @author LIN NP
     7  */
     8 public class JmsSender{
     9     
    10   private ConnectionFactory connectionFactory = null;
    11   private Connection connection = null;
    12   private Session session = null;
    13   private Destination destination = null;
    14   private MessageProducer producer = null;
    15   
    16   private static final int SEND_NUMBER = 1;
    17  
    18   public void init()
    19   {
    20     // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
    21     connectionFactory = new ActiveMQConnectionFactory(
    22         ActiveMQConnection.DEFAULT_USER,
    23         ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");    // ActiveMQ默认使用的TCP连接端口是61616
    24     
    25     try{
    26       // 构造从工厂得到连接对象
    27       connection = connectionFactory.createConnection();
    28       connection.start();
    29       // 获取操作连接
    30       session = connection.createSession(Boolean.FALSE,  Session.AUTO_ACKNOWLEDGE);
    31       
    32       //第一种方式:Queue
    33       
    34 //            destination = session.createQueue("xkey");        // "xkey"可以取其他的。
    35 //            producer = session.createProducer(destination); // 得到消息生成者【发送者】
    36       
    37       //第二种方式:Topic
    38        Topic topic = session.createTopic("xkey.Topic");
    39        producer = session.createProducer(topic);
    40 
    41       // 持久化
    42       producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    43       
    44       // 构造消息,此处写死,项目就是参数,或者方法获取
    45       sendMessage(session,producer);
    46       
    47       //session.commit();
    48     }
    49     catch(Exception e)
    50     {
    51       e.printStackTrace();
    52     }
    53     finally
    54     {
    55       try 
    56       {
    57         connection.close();
    58       } 
    59       catch (JMSException e) 
    60       {
    61         // TODO Auto-generated catch block
    62         e.printStackTrace();
    63       }
    64     }
    65   }
    66   
    67   
    68   private void sendMessage(Session session,MessageProducer producer) throws JMSException
    69   {
    70     for (int i = 1; i <= SEND_NUMBER; i ++) 
    71     {  
    72       TextMessage message = session.createTextMessage("发送消息" + i);  
    73       
    74       System.out.println("发送消息" + i);  
    75       
    76    // 发送消息
    77       producer.send(message);
    78       
    79     }  
    80   }
    81   /**
    82    * @param args
    83    */
    84   public static void main(String[] args) 
    85   {
    86     // TODO Auto-generated method stub
    87     JmsSender jms = new JmsSender();
    88     jms.init();
    89   }
    90 }

    【接收端】

    connection = connectionFactory.createConnection();
    connection.setClientID("bbb");
    connection.start();

    session = connection.createSession(Boolean.FALSE,  Session.AUTO_ACKNOWLEDGE); 

    consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅

     1 import javax.jms.*;
     2 import org.apache.activemq.ActiveMQConnection;
     3 import org.apache.activemq.ActiveMQConnectionFactory;
     4 /**
     5  *
     6  * @author LIN NP
     7  */
     8 public class JmsReceiver
     9 {
    10   private ConnectionFactory connectionFactory = null;
    11   private Connection connection = null;
    12   private Session session = null;
    13   private MessageConsumer consumer = null;
    14   private Destination destination = null;
    15   public void init()
    16   {
    17     connectionFactory = new ActiveMQConnectionFactory(
    18         ActiveMQConnection.DEFAULT_USER,
    19         ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616"); // ActiveMQ默认使用的TCP连接端口是61616
    20     try
    21     {
    22       // 构造从工厂得到连接对象
    23       connection = connectionFactory.createConnection();
    24       connection.setClientID("bbb"); 
    25       connection.start();
    26       // 获取操作连接
    27       session = connection.createSession(Boolean.FALSE,  Session.AUTO_ACKNOWLEDGE); 
    28       /**
    29       * 第一种方式:Queue
    30       */
    31 //            destination = session.createQueue("xkey");
    32 //            consumer = session.createConsumer(destination);
    33       /**
    34        * 第二种方式:Topic
    35        */
    36       
    37       Topic topic = session.createTopic("xkey.Topic");
    38       //consumer = session.createConsumer(topic);
    39       consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅
    40       
    41       /**
    42         * 
    43         */
    44       while (true) 
    45       {  
    46         //设置接收者接收消息的时间,为了便于测试,这里设定为500s
    47         TextMessage message = (TextMessage) consumer.receive(500);  
    48         if (null != message) 
    49         {  
    50           System.out.println("Receiver: " + message.getText());  
    51         }
    52         else 
    53         {  
    54           break;  
    55         }  
    56       }  
    57     }
    58     catch(Exception e)
    59     {
    60       e.printStackTrace();
    61     }
    62     finally
    63     {
    64       try 
    65       {
    66         connection.close();
    67       } 
    68       catch (JMSException e) 
    69       {
    70         // TODO Auto-generated catch block
    71         e.printStackTrace();
    72       }
    73     }
    74   }
    75   /**
    76    * @param args
    77    */
    78   public static void main(String[] args) 
    79   {
    80     // TODO Auto-generated method stub
    81     JmsReceiver jms = new JmsReceiver();
    82     jms.init();
    83   }
    84 }
     1 //创建JMS连接和会话
     2 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);   
     3 connection = factory.createConnection();   
     4 connection.setClientID(Constant.JMS_CLIENT_ID);   
     5 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
     6 // 创建消息发送主题和发送者
     7 Topic jmsSendTopic = session.createTopic(sendTopic);
     8 sendTopicProducer = session.createProducer(jmsSendTopic);
     9 sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    10 sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
    11 // 创建消息接收主题和接收者
    12 Topic jmsReceiveTopic = session.createTopic(receiveTopic);
    13 receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
    14 receiveTopicConsumer.setMessageListener(this);
    15 connection.start();  

    解答:问题原因在于这段代码在接收到JMS消息时不会向ActiveMQ服务器确认消息的接收,故而ActiveMQ服务器一直认为该消息没有成功发送给接收者,因而每次接收者重启之后就会收到ActiveMQ服务器发送过来的消息。在这里要解释一下session的创建。

    session = connection.createSession(true,Session.Auto_ACKNOWLEDGE);

    当createSession第一个参数为true时,表示创建的session被标记为transactional的,确认消息就通过确认和校正来自动地处理,第二个参数应该是没用的。

    当createSession的第一个参数为false时,表示创建的session没有标记为transactional,此时有三种用于消息确认的选项:
    **AUTO_ACKNOWLEDGE session将自动地确认收到的一则消息;
    **CLIENT_ACKNOWLEDGE 客户端程序将确认收到的一则消息,调用这则消息的确认方法;
    **DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散的”确认消息传递,可以想到,这将导致消息提供者传递的一些复制消息可能出错。

    JMS有两种消息传递方式。标记为NON_PERSISTENT的消息最多传递一次,而标记为PERSISTENT的消息将使用暂存后再转发的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失,但是得等到这个服务恢复联机的时候才会被传递。所以默认的消息传递方式是非持久性的,虽然使用非持久性消息可能降低内存和需要的存储器,但这种传递方式只有当你不需要接收所有消息时才使用。
    因此正确的代码只需改动一处就行了,即将true改为false

     1 //创建JMS连接和会话
     2 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);   
     3 connection = factory.createConnection();   
     4 connection.setClientID(Constant.JMS_CLIENT_ID);   
     5 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     6 // 创建消息发送主题和发送者
     7 Topic jmsSendTopic = session.createTopic(sendTopic);
     8 sendTopicProducer = session.createProducer(jmsSendTopic);
     9 sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    10 sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
    11 // 创建消息接收主题和接收者
    12 Topic jmsReceiveTopic = session.createTopic(receiveTopic);
    13 receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
    14 receiveTopicConsumer.setMessageListener(this);
    15 connection.start();  
  • 相关阅读:
    Web 日志分析过程
    nginx系列之九:lua服务
    Linux网络编程之IO模型
    从URL输入到页面展现到底发生什么
    CentOS 日常运维十大技能
    以MySQL为例,详解数据库索引原理(1)
    Elasticsearch的特点以及应用场景
    Ubuntu1804编译安装LNMP
    golang 高级
    Centos7 安装 Redis
  • 原文地址:https://www.cnblogs.com/lsx1993/p/4626717.html
Copyright © 2011-2022 走看看