zoukankan      html  css  js  c++  java
  • ActiveMQ Cluster (ActiveMQ 集群) 配置

    构建高可用的ActiveMQ系统在生产环境中是非常重要的,对于这个apache的消息中间件实现高可用非常简单,只要在Apache ActiveMQ单点基本配置基础上做一次配置变更(如果在一台设备上部署多个AMQ,需要修改对应端口号),即可实现 

    AMQ实现高可用部署有三种方案: 
    1、Master-Slave 
    2、SharedFile System Master Slave 
    3、JDBCMaster Slave 

    第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛; 
    第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库; 
    第二种方案同样支持N个AMQ实例组网,但由于他是基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。 

    shared filesystem Master-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。 

    多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。 


     


    Apache ActiveMQ单点基本配置的原配置内容: 

    <persistenceAdapter> 
                <kahaDB directory="${activemq.data}/kahadb"/> 
    </persistenceAdapter> 

    修改为: 

    <persistenceAdapter> 
                 <kahaDB directory="D:\ActiveMQ Cluster\shareBrokerData" enableIndexWriteAsync="true"  enableJournalDiskSyncs="false"/> 
    </persistenceAdapter> 

    在D:\ActiveMQ Cluster目录先创建shareBrokerData文件夹。 

    注意: 

    1.前面提到如果在一台设备上部署多个AMQ,需要修改对应端口号,如AMQ对外的监听端口61616和jetty的监听端口8161等。 
    2.如果多套AMQ部署在不同的设备上,这里的directory应该指向一个远程的系统目录(分布式文件系统) 
    3.客户端通过failover方式进行连接,多个AMQ实例地址使用英文逗号隔开,当某个实例断开时会自动重连,但如果所有实例都失效,failover默认情况下会无限期的等待下去,不会有任何提示。 

    下面为在一台设备上部署两个AMQ示例: 
    ActiveMQ A 
    1.activemq.xml修改监听端口: 

    <transportConnectors> 
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
    <!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

    </transportConnectors> 

    2.jetty.xml修改监听端口: 

    <property name="connectors"> 
                <list> 
                    <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                        <property name="port" value="8166" /> 
                    </bean> 
                    <!-- 
                        Enable this connector if you wish to use https with web console 
                    --> 
                    <!-- 
                    <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                        <property name="port" value="8162" /> 
                        <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                        <property name="password" value="password" /> 
                    </bean> 
                    --> 
                </list> 
    </property> 

    ActiveMQ B 
    1.activemq.xml修改监听端口: 

    <transportConnectors> 
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
    <!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

    </transportConnectors> 

    2.jetty.xml修改监听端口: 

    <property name="connectors"> 
                <list> 
                    <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                        <property name="port" value="8166" /> 
                    </bean> 
                    <!-- 
                        Enable this connector if you wish to use https with web console 
                    --> 
                    <!-- 
                    <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                        <property name="port" value="8162" /> 
                        <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                        <property name="password" value="password" /> 
                    </bean> 
                    --> 
                </list> 
    </property> 


    Java测试程序代码: 
    1.Producer: 

    import javax.jms.Connection; 
    import javax.jms.DeliveryMode; 
    import javax.jms.Destination; 
    import javax.jms.JMSException; 
    import javax.jms.MessageProducer; 
    import javax.jms.Session; 
    import javax.jms.TextMessage; 

    import org.apache.activemq.ActiveMQConnectionFactory; 
       
    public class ProducerTool { 
       
        private String subject = "TOOL.DEFAULT";    
       
        private Destination destination = null;    
       
        private Connection connection = null;    
       
        private Session session = null;    
       
        private MessageProducer producer = null;    
       
        // 初始化 
        private void initialize() throws JMSException, Exception {    
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0,tcp://172.16.30.11:61617?wireFormat.maxInactivityDuration=0)");    
            connection = connectionFactory.createConnection();    
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
            destination = session.createQueue(subject);    
            producer = session.createProducer(destination);    
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
             
        }    
       
        // 发送消息    
        public void produceMessage(String message) throws JMSException, Exception {    
            initialize();    
            TextMessage msg = session.createTextMessage(message);    
            connection.start();    
            System.out.println("Producer:->Sending message: " + message);    
            producer.send(msg);    
            System.out.println("Producer:->Message sent complete!");    
        }    
       
        // 关闭连接     
        public void close() throws JMSException {    
            System.out.println("Producer:->Closing connection");    
            if (producer != null)    
                producer.close();    
            if (session != null)    
                session.close();    
            if (connection != null)    
                connection.close();    
       }    
    }  


    import javax.jms.Connection; 
    import javax.jms.Destination; 
    import javax.jms.JMSException; 
    import javax.jms.Message; 
    import javax.jms.MessageConsumer; 
    import javax.jms.MessageListener; 
    import javax.jms.Session; 
    import javax.jms.TextMessage; 

    import org.apache.activemq.ActiveMQConnectionFactory; 
       
    public class ConsumerTool implements MessageListener {      
       
        private String subject = "TOOL.DEFAULT";    
       
        private Destination destination = null;    
       
        private Connection connection = null;    
       
        private Session session = null;    
       
        private MessageConsumer consumer = null;    
       
        // 初始化    
        private void initialize() throws JMSException, Exception {    
           ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
            connection = connectionFactory.createConnection();    
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
            destination = session.createQueue(subject);    
            consumer = session.createConsumer(destination);    
                
        }    
       
        // 消费消息       
        public void consumeMessage() throws JMSException, Exception {    
            initialize();    
            connection.start();    
                
            System.out.println("Consumer:->Begin listening...");    
            // 
            consumer.setMessageListener(this);    
            // Message message = consumer.receive();    
        }    
       
        // 关闭连接   
        public void close() throws JMSException {    
            System.out.println("Consumer:->Closing connection");    
            if (consumer != null)    
                consumer.close();    
            if (session != null)    
                session.close();    
            if (connection != null)    
                connection.close();    
        }    
       
        // 消息处理函数  
        public void onMessage(Message message) {    
            try {    
                if (message instanceof TextMessage) {    
                    TextMessage txtMsg = (TextMessage) message;    
                    String msg = txtMsg.getText();    
                    System.out.println("Consumer:->Received: " + msg);    
                } else {    
                    System.out.println("Consumer:->Received: " + message);    
                }    
            } catch (JMSException e) {    
                // TODO Auto-generated catch block    
                e.printStackTrace();    
            }    
        }    
    }  


    2.Consumer: 

    import javax.jms.Connection; 
    import javax.jms.Destination; 
    import javax.jms.JMSException; 
    import javax.jms.Message; 
    import javax.jms.MessageConsumer; 
    import javax.jms.MessageListener; 
    import javax.jms.Session; 
    import javax.jms.TextMessage; 

    import org.apache.activemq.ActiveMQConnectionFactory; 
       
    public class ConsumerTool implements MessageListener {      
       
        private String subject = "TOOL.DEFAULT";    
       
        private Destination destination = null;    
       
        private Connection connection = null;    
       
        private Session session = null;    
       
        private MessageConsumer consumer = null;    
       
        // 初始化    
        private void initialize() throws JMSException, Exception {    
           ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
            connection = connectionFactory.createConnection();    
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
            destination = session.createQueue(subject);    
            consumer = session.createConsumer(destination);    
                
        }    
       
        // 消费消息       
        public void consumeMessage() throws JMSException, Exception {    
            initialize();    
            connection.start();    
                
            System.out.println("Consumer:->Begin listening...");    
            // 
            consumer.setMessageListener(this);    
            // Message message = consumer.receive();    
        }    
       
        // 关闭连接   
        public void close() throws JMSException {    
            System.out.println("Consumer:->Closing connection");    
            if (consumer != null)    
                consumer.close();    
            if (session != null)    
                session.close();    
            if (connection != null)    
                connection.close();    
        }    
       
        // 消息处理函数  
        public void onMessage(Message message) {    
            try {    
                if (message instanceof TextMessage) {    
                    TextMessage txtMsg = (TextMessage) message;    
                    String msg = txtMsg.getText();    
                    System.out.println("Consumer:->Received: " + msg);    
                } else {    
                    System.out.println("Consumer:->Received: " + message);    
                }    
            } catch (JMSException e) {    
                // TODO Auto-generated catch block    
                e.printStackTrace();    
            }    
        }    
    }  

    3.Main 

    import javax.jms.JMSException; 
       
    public class Test {    
       
        /**   
         * @param args   
         */   
        public static void main(String[] args) throws JMSException, Exception { 
        
            
            ConsumerTool consumer = new ConsumerTool();    
            ProducerTool producer = new ProducerTool();    
            // 开始监听    
            consumer.consumeMessage();    
                
            // 延时500毫秒之后发送消息    
            Thread.sleep(500);    
            producer.produceMessage("Hello, world!");    
            producer.close();    
                
            // 延时500毫秒之后停止接受消息    
            Thread.sleep(500);    
            consumer.close();    
        
        }    
    }  


    ActiveMQ A 启动界面: 


     


    ActiveMQ B 启动界面: 


     


    AMQ A先启动,先锁文件,当AMQ B启动是,不能锁文件,但会不断的监听等待。 


    运行Java Test程序日志: 

    10:22:43.745 INFO  [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616 
    Consumer:->Begin listening... 
    10:22:45.623 INFO  [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0 
    Producer:->Sending message: Hello, world! 
    Producer:->Message sent complete! 
    Producer:->Closing connection 
    Consumer:->Received: Hello, world! 
    Consumer:->Closing connection 


    ActiveMQ A 管理界面: 




    异常处理: 

    配置好ActiveMQ后,前几次都启动成功。有一天启动时发现启动不成功,查看报错日志发现出现如下提示: 
    Failed to start Apache ActiveMQ (localhost, ID:*-PC-*-*-0:1). Reason: java.io.IOException: Transport Connector could not be registered in JMX: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind。 

    1.先去查看是不是端口被占用,用netstat -ano命令查看端口使用情况,发现没有端口被占用。 
    2.在控制面板的服务里把正在运行的Internet Connection Sharing (ICS)为家庭和小型办公网络提供网络地址转换、寻址、名称解析和/或入侵保护服务关了,他占用着端口。 
    3.把此服务关了后再启动ActvieMQ成功了。 

  • 相关阅读:
    Leetcode 1489找到最小生成树李关键边和伪关键边
    Leetcode 113 路径总和 II
    hdu 1223 还是畅通工程
    hdu 1087 Super Jumping! Jumping! Jumping!
    hdu 1008 Elevator
    hdu 1037 Keep on Truckin'
    湖工oj 1241 畅通工程
    湖工oj 1162 大武汉局域网
    hdu 2057 A + B Again
    poj 2236 Wireless Network
  • 原文地址:https://www.cnblogs.com/gisblogs/p/4311230.html
Copyright © 2011-2022 走看看