zoukankan      html  css  js  c++  java
  • ActiveMQ集群下的消息回流功能

    ------------------------------------------------------------------

    "丢失"的消息

      如果有broker1和broker2通过networkConnector连接,有一个consumer1连接到broker1,一个consumer2连接到broker2,程序往broker1上面发送30条消息,这时consumer2连接到broker2消费消息,当consumer2消费了15条消息时,broker2挂掉了。 但是还剩下15条消息在broker2上面,这些消息就好像消息了,除非broker2重启了,然后有消费者连接到broker2上来消费消息,遇到这样的情况该怎么办呢?

      从5.6版本起,在destinationPolicy上新增的选择replayWhenNoConsumers,这个选项使得broker2上有需要转发的消息但是没有消费者时,把消息回流到它原来的broker1上,同时需要把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发,activemq.xml配置如下:

    配置允许双向连接

    先运行发送消息程序,然后运行61716端口的程序接收消息,再接收了5条后将程序断掉,再去运行61616端口的程序,也是可以接收消息的,这样就实现了消息的回流。

    消息发送程序:

     1 import javax.jms.Connection;
     2 import javax.jms.ConnectionFactory;
     3 import javax.jms.Destination;
     4 import javax.jms.MessageProducer;
     5 import javax.jms.Session;
     6 import javax.jms.TextMessage;
     7 
     8 import org.apache.activemq.ActiveMQConnectionFactory;
     9 
    10 public class JmsSend {
    11     public static void main(String[] args) throws Exception {
    12         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
    13         Connection connection = connectionFactory.createConnection();
    14         connection.start();
    15         
    16         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    17         Destination destination = session.createQueue("my-queue");
    18         
    19         MessageProducer producer = session.createProducer(destination);
    20         for(int i = 0;i < 10;i++){
    21             TextMessage message = session.createTextMessage("message,1212 --->" + i);
    22             Thread.sleep(1000);
    23             //通过生产者发出消息
    24             producer.send(message);
    25         }
    26         session.commit();
    27         session.close();
    28         connection.close();
    29     }
    30 }

     61616端口接收消息程序:

     1 import javax.jms.Connection;
     2 import javax.jms.ConnectionFactory;
     3 import javax.jms.Destination;
     4 import javax.jms.MessageConsumer;
     5 import javax.jms.Session;
     6 import javax.jms.TextMessage;
     7 
     8 import org.apache.activemq.ActiveMQConnectionFactory;
     9 
    10 public class JmsReceiver {
    11     public static void main(String[] args) throws Exception {
    12         ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
    13         Connection connection =  cf.createConnection();
    14         connection.start();
    15         
    16         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    17         Destination destination = session.createQueue("my-queue");
    18         MessageConsumer consumer = session.createConsumer(destination);
    19         int i = 0;
    20         while(i < 10){
    21             Thread.sleep(3000);
    22             i++;
    23             TextMessage message = (TextMessage)consumer.receive();
    24             session.commit();
    25             System.out.println("111接收到的消息是:"+message.getText());
    26         }
    27         session.close();
    28         connection.close();
    29     }
    30 }

     61716端口接收消息程序:

     1 import javax.jms.Connection;
     2 import javax.jms.ConnectionFactory;
     3 import javax.jms.Destination;
     4 import javax.jms.MessageConsumer;
     5 import javax.jms.Session;
     6 import javax.jms.TextMessage;
     7 
     8 import org.apache.activemq.ActiveMQConnectionFactory;
     9 
    10 public class JmsReceiver2 {
    11     public static void main(String[] args) throws Exception {
    12         ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.81:61716");
    13         Connection connection =  cf.createConnection();
    14         connection.start();
    15         
    16         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    17         Destination destination = session.createQueue("my-queue");
    18         MessageConsumer consumer = session.createConsumer(destination);
    19         int i = 0;
    20         while(i < 10){
    21             Thread.sleep(3000);
    22             i++;
    23             TextMessage message = (TextMessage)consumer.receive();
    24             session.commit();
    25             System.out.println("222接收到的消息是:"+message.getText());
    26         }
    27         session.close();
    28         connection.close();
    29     }
    30 }
  • 相关阅读:
    UDP协议测试
    openstack ussusi ubuntu 20 centos8 dracut initqueue timeout
    wol linux远程通过数据帧自动开机
    openStack proformancee bottlenecks options optimized
    find 搜索排除搜索目录
    Ipv6
    golang学习笔记 ---日志库 logrus
    golang学习笔记---- 格式化IO
    golang学习笔记 --- struct 嵌套
    golang学习笔记---HTTPS
  • 原文地址:https://www.cnblogs.com/xinhuaxuan/p/6139561.html
Copyright © 2011-2022 走看看