zoukankan      html  css  js  c++  java
  • 分布式-信息方式-ActiveMQ静态网络连接信息回流功能

     
                                                 “丢失”的消息
    有这样的场景, broker1和 broker2通过 netwoskconnector连接,一些 consumers连接到 broker1,
    消费 broker2上的消息。消息先被 broker1从 broker2上消费掉,然后转发给这些 consumers。不幸的是转
    发部分消息的时候 broker1重启了,这些 consumer发现 broker1连接失败,通过 failover连接到 broker2
    上去了,但是有一部分他们还没有消费的消息被 broker2已经分发到了 broker1上去了。这些消息,就好
    像是消失了,除非有消费者重新连接到 broker1上来消费。怎么办呢?
    从5.6版起,在 destinationPolicy上新增的选项 replayWhenNoConsumers。这个选项使得 broker1
    上有需要转发的消息但是没有消费者时,把消息回流到它原始的 broker,同时把 enableAudit设置为
    false,为了防止消息回流后祓当做重复消息而不被分发,示例如下:

    <destinationPolicy>
    <policyMap>
    <policyEntries>
    <policyEntry queue=">" enableAudit="false">
    <networkBridgeFilterFactory>
    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
    </networkBridgeFilterFactory>
    </policyEntry>
    </policyEntries>
    </policyMap>
    </destinationPolicy>

    代码如下:

    package test.mq.staitsnetwork;
    
     
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Sender {
           public static void main(String[] args) throws JMSException, InterruptedException {
            ConnectionFactory   ConnectionFactory=new ActiveMQConnectionFactory(
                    "tcp://192.168.145.100:61616"
                    );
            Connection connection=ConnectionFactory.createConnection();
            connection.start();
        
            Session session=connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
            Destination destination=session.createQueue("my_queue");
            MessageProducer Producer=session.createProducer(destination);
         
            for(int i=0;i<30;i++){
                 TextMessage message=session.createTextMessage("message----"+i);
                    //Thread.sleep(1000);  
                    Producer.send(message);
            }
             session.commit();
             session.close();
             connection.close();    
        }
    }

    消费者1

    package test.mq.staitsnetwork;
    
    import java.util.Enumeration;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;
    public class Receiver1{
        
        public static void main(String[] args) throws JMSException {
            ConnectionFactory   connectionFactory=new ActiveMQConnectionFactory(
                    "tcp://192.168.145.100:61676"
                    );
            for(int i=0;i<30;i++){
                Thread    t=new MyThread(connectionFactory);
                t.start();
                try {
                    Thread.sleep(1000l);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        
        }
    }
    class MyThread extends Thread{
             private ConnectionFactory   connectionFactory=null;
             public  MyThread(ConnectionFactory   connectionFactory){
             this.connectionFactory = connectionFactory;
             }
           public void run(){
                try {
                    final Connection  connection = connectionFactory.createConnection();
                    connection.start();
                    Enumeration names=connection.getMetaData().getJMSXPropertyNames();
                     
                    final Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 
                    Destination destination=session.createQueue("my_queue");
                    MessageConsumer Consumer=session.createConsumer(destination);
                    Consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message msg) {
                        TextMessage     txtmsg=(TextMessage) msg; 
                        try {
                            System.out.println("接收信息1--->"+txtmsg.getText());
                        } catch (JMSException e1) {
                            e1.printStackTrace();
                        }
                        try {
                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                        try {
                            session.close();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        try {
                            connection.close();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        }
                    });
                     
                    
                    
                    
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                
           }    
        }

    消费者2

    package test.mq.staitsnetwork;
    
    import java.util.Enumeration;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;
    public class Receiver2{
        
        public static void main(String[] args) throws JMSException {
            ConnectionFactory   connectionFactory=new ActiveMQConnectionFactory(
                    "tcp://192.168.145.100:61616"
                    );
            for(int i=0;i<30;i++){
                Thread    t=new MyThread2(connectionFactory);
                t.start();
                try {
                    Thread.sleep(1000l);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        
        }
    }
    class MyThread2 extends Thread{
             private ConnectionFactory   connectionFactory=null;
             public  MyThread2(ConnectionFactory   connectionFactory){
             this.connectionFactory = connectionFactory;
             }
           public void run(){
                try {
                    final Connection  connection = connectionFactory.createConnection();
                    connection.start();
                    Enumeration names=connection.getMetaData().getJMSXPropertyNames();
                     
                    final Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 
                    Destination destination=session.createQueue("my_queue");
                    MessageConsumer Consumer=session.createConsumer(destination);
                    Consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message msg) {
                        TextMessage     txtmsg=(TextMessage) msg; 
                        try {
                            System.out.println("接收信息2--->"+txtmsg.getText());
                        } catch (JMSException e1) {
                            e1.printStackTrace();
                        }
                        try {
                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                        try {
                            session.close();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        try {
                            connection.close();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        }
                    });
                     
                    
                    
                    
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                
           }    
        }

    运行结果:

    消费者1

    消费者2

  • 相关阅读:
    一起谈.NET技术,C#调试心经(续) 狼人:
    基于xmpp openfire smack开发之openfire介绍和部署[1]
    mysql学习笔记二
    使用jQuery实现的网页版的个人简历
    基于xmpp openfire smack开发之smack类库介绍和使用[2]
    基于色彩恒常( color constancy)特性的FrankleMcCann Retinex图像增强。
    The import org.cocos2dx.lib cannot be resolved
    C++编译器的递归深度与程序优化思考
    jquery实战视频教程_选项卡效果一
    编译器是怎样工作的?用lex和yacc 写一个计算器(2)
  • 原文地址:https://www.cnblogs.com/caoyingjielxq/p/9359744.html
Copyright © 2011-2022 走看看