zoukankan      html  css  js  c++  java
  • ActiveMQ的静态网络配置

    static networkConnector是用于创建一个静态的配置对于网络中的多个Broker做集群,这种协议用于复合url,一个复合url包括多个url地址。

    <networkConnectors>
                 <networkConnector name="local network"  duplex="true"
                  uri="static://(tcp://192.168.174.104:61616,tcp://192.168.174.104:61676)"/>
    </networkConnectors>

    常用networkConnector配置的可用属性:

      conduitSubscriptions :默认true,是否把同一个broker的多个consumer当做一个来处理

      duplex :默认false,设置是否能双向通信


    消息发送代码
    public class JmsSend {
        
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
            Connection connection = connectionFactory.createConnection();
            
            connection.start();
            
            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination queue=session.createQueue("my-queue4");
            
        
            MessageProducer producer=session.createProducer(queue);
            
            
            for(int i=0 ; i<20 ; i++){
                 TextMessage message=session.createTextMessage("message"+i);
                 //message.setStringProperty("queue", "queue"+i);
                 //message.setJMSType("1");
                 producer.send(message);
            }
            session.commit();
            session.close();
            
            connection.close();
            
        }
    
    }
    View Code

    192.168.174.104:61616 broker1 接收测试代码
    public class JmsReceiver1 {
        public static void main(String[] args) throws Exception {
    
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://192.168.174.104:61616");
            
    
            for (int i=0; i<10 ;i++){
                new Myhread1(connectionFactory).start();
                
                Thread.sleep(1000);
                
            }
            
            
    
        }
    
    }
    
    class Myhread1 extends Thread {
        
        private ConnectionFactory connectionFactory ;
        
        public Myhread1(ConnectionFactory connectionFactory) {
            super();
            this.connectionFactory = connectionFactory;
        }
    
    
        public void run() {
             
             
            try {
                
                final Connection connection = connectionFactory.createConnection();
                connection.start();
                
                
                final Session session = connection.createSession(Boolean.TRUE,
                        Session.AUTO_ACKNOWLEDGE);
                
                Destination queue = session.createQueue("my-queue4");
    
                MessageConsumer consumer = session.createConsumer(queue);
    
                consumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        TextMessage msg = (TextMessage) message;
                        try {
                            System.out.println("1======"+msg.getText());
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        
                        try {
                            session.commit();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        
                        try {
                            session.close();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        
                        try {
                            connection.close();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        
                        
                    }
                });
                
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
            
        }
    
    }
    View Code

    192.168.174.104:61676 broker2 接收测试代码
    public class JmsReceiver2 {
        public static void main(String[] args) throws Exception {
    
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://192.168.174.104:61676");
            
    
            for (int i=0; i<10 ;i++){
                new Myhread2(connectionFactory).start();
                
                Thread.sleep(1000);
                
            }
            
            
    
        }
    
    }
    
    class Myhread2 extends Thread {
        
        private ConnectionFactory connectionFactory ;
        
        public Myhread2(ConnectionFactory connectionFactory) {
            super();
            this.connectionFactory = connectionFactory;
        }
    
    
        public void run() {
             
            
            try {
                
                final Connection connection  = connectionFactory.createConnection();
                connection.start();
                
                
                final Session session = connection.createSession(Boolean.TRUE,
                        Session.AUTO_ACKNOWLEDGE);
                
                Destination queue = session.createQueue("my-queue4");
    
                MessageConsumer consumer = session.createConsumer(queue);
    
                consumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        TextMessage msg = (TextMessage) message;
                        try {
                            System.out.println("2======"+msg.getText());
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        
                        try {
                            session.commit();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        
                        try {
                            session.close();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        
                        
                        try {
                            connection.close();
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        
                    }
                });
                
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
            
        }
    
    }
    View Code

    “丢失”的消息

    broker1和broker2通过networkConnector连接,一些consumers连接到broker2,消费broker1上的消息。消息先被broker2从broker1上消费掉,然后转发给这些consumers。不幸的是转发部分消息的时候broker2重启了,这些consumers发现broker2连接失败,通过failover连接到broker1上去了,但是有一部分他们还没有消费的消息被broker1已经分发到了broker2上去了。这些消息,就好像是消失了。

    broker1 中my-queue4 接收到20条消息。

    broker1通过静态网络与broker2连接,与broker2相连的消费者消费后,broker1中Number of Pending Messages为0,即消息先被broker2从broker1上消费掉。

    一些consumers连接到broker1,没法从broker1获取消息消费。

     

    针对“丢失”的消息,配置replayWhenNoConsumers选项

    这个选项使得broker1上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发。

    <policyEntries>
            <policyEntry queue=">" enableAudit="false">
                    <networkBridgeFilterFactory>
                            <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                    </networkBridgeFilterFactory>
            </policyEntry>
    </policyEntries>

    容错的链接--Failover

    Failover协议实现了自动重新链接的逻辑。默认情况下,这种协议用于随机的去选择一个链接去链接,如果链接失败了,那么会链接到其他
    的Broker上。默认的配置定义了延迟重新链接,意味着传输将会在10秒后自动的去重新链接可用的broker。当然所有的重新链接参数都可以根据应用的需要而配置。

    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("failover:(tcp://192.168.174.104:61616,tcp://192.168.174.104:61676)?randomize=false");
    randomize:使用随机链接,以达到负载均衡的目的,默认true。
  • 相关阅读:
    C++学习笔记-C++对C语言的扩充和增强
    C++学习笔记-C++与C语言的一些区别
    C++学习笔记-C++与C语言的一些区别
    C学习笔记-字符串处理函数
    C学习笔记-字符串处理函数
    C学习笔记-gdb
    深入理解C语言-函数指针
    深入理解C语言-函数指针
    深入理解C语言-结构体做函数参数
    async 珠峰培训node正式课笔记 【async】任务流程控制,异步流程控制
  • 原文地址:https://www.cnblogs.com/xiaoliangup/p/9351461.html
Copyright © 2011-2022 走看看