zoukankan      html  css  js  c++  java
  • ActiveMQ Cluster (ActiveMQ 集群) 配置

    构建高可用的ActiveMQ系统在生产环境中是非常重要的,对于这个apache的消息中间件实现高可用非常简单,只要在Apache ActiveMQ单点基本配置基础上做一次配置变更(如果在一台设备上部署多个AMQ,需要修改对应端口号),即可实现 

    AMQ实现高可用部署有三种方案: 
    1、Master-Slave 
    2、SharedFile System Master Slave 
    3、JDBCMaster Slave 

    第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛; 
    第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库; 
    第二种方案同样支持N个AMQ实例组网,但由于他是基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。 

    shared filesystem Master-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。 

    多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。 


     


    Apache ActiveMQ单点基本配置的原配置内容: 

    <persistenceAdapter> 
                <kahaDB directory="${activemq.data}/kahadb"/> 
    </persistenceAdapter> 

    修改为: 

    <persistenceAdapter> 
                 <kahaDB directory="D:\ActiveMQ Cluster\shareBrokerData" enableIndexWriteAsync="true"  enableJournalDiskSyncs="false"/> 
    </persistenceAdapter> 

    在D:\ActiveMQ Cluster目录先创建shareBrokerData文件夹。 

    注意: 

    1.前面提到如果在一台设备上部署多个AMQ,需要修改对应端口号,如AMQ对外的监听端口61616和jetty的监听端口8161等。 
    2.如果多套AMQ部署在不同的设备上,这里的directory应该指向一个远程的系统目录(分布式文件系统) 
    3.客户端通过failover方式进行连接,多个AMQ实例地址使用英文逗号隔开,当某个实例断开时会自动重连,但如果所有实例都失效,failover默认情况下会无限期的等待下去,不会有任何提示。 

    下面为在一台设备上部署两个AMQ示例: 
    ActiveMQ A 
    1.activemq.xml修改监听端口: 

    <transportConnectors> 
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
    <!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

    </transportConnectors> 

    2.jetty.xml修改监听端口: 

    <property name="connectors"> 
                <list> 
                    <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                        <property name="port" value="8166" /> 
                    </bean> 
                    <!-- 
                        Enable this connector if you wish to use https with web console 
                    --> 
                    <!-- 
                    <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                        <property name="port" value="8162" /> 
                        <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                        <property name="password" value="password" /> 
                    </bean> 
                    --> 
                </list> 
    </property> 

    ActiveMQ B 
    1.activemq.xml修改监听端口: 

    <transportConnectors> 
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
    <!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

    </transportConnectors> 

    2.jetty.xml修改监听端口: 

    <property name="connectors"> 
                <list> 
                    <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                        <property name="port" value="8166" /> 
                    </bean> 
                    <!-- 
                        Enable this connector if you wish to use https with web console 
                    --> 
                    <!-- 
                    <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                        <property name="port" value="8162" /> 
                        <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                        <property name="password" value="password" /> 
                    </bean> 
                    --> 
                </list> 
    </property> 


    Java测试程序代码: 
    1.Producer: 

    import javax.jms.Connection; 
    import javax.jms.DeliveryMode; 
    import javax.jms.Destination; 
    import javax.jms.JMSException; 
    import javax.jms.MessageProducer; 
    import javax.jms.Session; 
    import javax.jms.TextMessage; 

    import org.apache.activemq.ActiveMQConnectionFactory; 
       
    public class ProducerTool { 
       
        private String subject = "TOOL.DEFAULT";    
       
        private Destination destination = null;    
       
        private Connection connection = null;    
       
        private Session session = null;    
       
        private MessageProducer producer = null;    
       
        // 初始化 
        private void initialize() throws JMSException, Exception {    
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0,tcp://172.16.30.11:61617?wireFormat.maxInactivityDuration=0)");    
            connection = connectionFactory.createConnection();    
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
            destination = session.createQueue(subject);    
            producer = session.createProducer(destination);    
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
             
        }    
       
        // 发送消息    
        public void produceMessage(String message) throws JMSException, Exception {    
            initialize();    
            TextMessage msg = session.createTextMessage(message);    
            connection.start();    
            System.out.println("Producer:->Sending message: " + message);    
            producer.send(msg);    
            System.out.println("Producer:->Message sent complete!");    
        }    
       
        // 关闭连接     
        public void close() throws JMSException {    
            System.out.println("Producer:->Closing connection");    
            if (producer != null)    
                producer.close();    
            if (session != null)    
                session.close();    
            if (connection != null)    
                connection.close();    
       }    
    }  


    import javax.jms.Connection; 
    import javax.jms.Destination; 
    import javax.jms.JMSException; 
    import javax.jms.Message; 
    import javax.jms.MessageConsumer; 
    import javax.jms.MessageListener; 
    import javax.jms.Session; 
    import javax.jms.TextMessage; 

    import org.apache.activemq.ActiveMQConnectionFactory; 
       
    public class ConsumerTool implements MessageListener {      
       
        private String subject = "TOOL.DEFAULT";    
       
        private Destination destination = null;    
       
        private Connection connection = null;    
       
        private Session session = null;    
       
        private MessageConsumer consumer = null;    
       
        // 初始化    
        private void initialize() throws JMSException, Exception {    
           ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
            connection = connectionFactory.createConnection();    
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
            destination = session.createQueue(subject);    
            consumer = session.createConsumer(destination);    
                
        }    
       
        // 消费消息       
        public void consumeMessage() throws JMSException, Exception {    
            initialize();    
            connection.start();    
                
            System.out.println("Consumer:->Begin listening...");    
            // 
            consumer.setMessageListener(this);    
            // Message message = consumer.receive();    
        }    
       
        // 关闭连接   
        public void close() throws JMSException {    
            System.out.println("Consumer:->Closing connection");    
            if (consumer != null)    
                consumer.close();    
            if (session != null)    
                session.close();    
            if (connection != null)    
                connection.close();    
        }    
       
        // 消息处理函数  
        public void onMessage(Message message) {    
            try {    
                if (message instanceof TextMessage) {    
                    TextMessage txtMsg = (TextMessage) message;    
                    String msg = txtMsg.getText();    
                    System.out.println("Consumer:->Received: " + msg);    
                } else {    
                    System.out.println("Consumer:->Received: " + message);    
                }    
            } catch (JMSException e) {    
                // TODO Auto-generated catch block    
                e.printStackTrace();    
            }    
        }    
    }  


    2.Consumer: 

    import javax.jms.Connection; 
    import javax.jms.Destination; 
    import javax.jms.JMSException; 
    import javax.jms.Message; 
    import javax.jms.MessageConsumer; 
    import javax.jms.MessageListener; 
    import javax.jms.Session; 
    import javax.jms.TextMessage; 

    import org.apache.activemq.ActiveMQConnectionFactory; 
       
    public class ConsumerTool implements MessageListener {      
       
        private String subject = "TOOL.DEFAULT";    
       
        private Destination destination = null;    
       
        private Connection connection = null;    
       
        private Session session = null;    
       
        private MessageConsumer consumer = null;    
       
        // 初始化    
        private void initialize() throws JMSException, Exception {    
           ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
            connection = connectionFactory.createConnection();    
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
            destination = session.createQueue(subject);    
            consumer = session.createConsumer(destination);    
                
        }    
       
        // 消费消息       
        public void consumeMessage() throws JMSException, Exception {    
            initialize();    
            connection.start();    
                
            System.out.println("Consumer:->Begin listening...");    
            // 
            consumer.setMessageListener(this);    
            // Message message = consumer.receive();    
        }    
       
        // 关闭连接   
        public void close() throws JMSException {    
            System.out.println("Consumer:->Closing connection");    
            if (consumer != null)    
                consumer.close();    
            if (session != null)    
                session.close();    
            if (connection != null)    
                connection.close();    
        }    
       
        // 消息处理函数  
        public void onMessage(Message message) {    
            try {    
                if (message instanceof TextMessage) {    
                    TextMessage txtMsg = (TextMessage) message;    
                    String msg = txtMsg.getText();    
                    System.out.println("Consumer:->Received: " + msg);    
                } else {    
                    System.out.println("Consumer:->Received: " + message);    
                }    
            } catch (JMSException e) {    
                // TODO Auto-generated catch block    
                e.printStackTrace();    
            }    
        }    
    }  

    3.Main 

    import javax.jms.JMSException; 
       
    public class Test {    
       
        /**   
         * @param args   
         */   
        public static void main(String[] args) throws JMSException, Exception { 
        
            
            ConsumerTool consumer = new ConsumerTool();    
            ProducerTool producer = new ProducerTool();    
            // 开始监听    
            consumer.consumeMessage();    
                
            // 延时500毫秒之后发送消息    
            Thread.sleep(500);    
            producer.produceMessage("Hello, world!");    
            producer.close();    
                
            // 延时500毫秒之后停止接受消息    
            Thread.sleep(500);    
            consumer.close();    
        
        }    
    }  


    ActiveMQ A 启动界面: 


     


    ActiveMQ B 启动界面: 


     


    AMQ A先启动,先锁文件,当AMQ B启动是,不能锁文件,但会不断的监听等待。 


    运行Java Test程序日志: 

    10:22:43.745 INFO  [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616 
    Consumer:->Begin listening... 
    10:22:45.623 INFO  [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0 
    Producer:->Sending message: Hello, world! 
    Producer:->Message sent complete! 
    Producer:->Closing connection 
    Consumer:->Received: Hello, world! 
    Consumer:->Closing connection 


    ActiveMQ A 管理界面: 




    异常处理: 

    配置好ActiveMQ后,前几次都启动成功。有一天启动时发现启动不成功,查看报错日志发现出现如下提示: 
    Failed to start Apache ActiveMQ (localhost, ID:*-PC-*-*-0:1). Reason: java.io.IOException: Transport Connector could not be registered in JMX: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind。 

    1.先去查看是不是端口被占用,用netstat -ano命令查看端口使用情况,发现没有端口被占用。 
    2.在控制面板的服务里把正在运行的Internet Connection Sharing (ICS)为家庭和小型办公网络提供网络地址转换、寻址、名称解析和/或入侵保护服务关了,他占用着端口。 
    3.把此服务关了后再启动ActvieMQ成功了。 

  • 相关阅读:
    JavaScript遍历表单元素
    JavaScript实现按钮改变网页背景色
    JavaScript实现指定格式字符串表单校验
    jQuery实现数字时钟
    Python使用递归绘制谢尔宾斯基三角形
    Python使用函数模拟“汉诺塔”过程
    Python使用函数实现杨辉三角
    CSS简单样式练习(七)
    CSS简单样式练习(六)
    cstring to char *例子
  • 原文地址:https://www.cnblogs.com/gisblogs/p/4311230.html
Copyright © 2011-2022 走看看