  • Configuring ActiveMQ master/slave with JDBC

    This post configures ActiveMQ master/slave mode over JDBC, with persistent messages stored in a database.

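    At any given moment there is only one master broker. The master accepts client connections; the slaves do not.
    When the master goes offline (for example, because it was shut down), one of the slaves is promoted to master and starts accepting client connections. The old master's non-persistent messages are lost, while persistent messages remain in the database.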

    Broker XML configuration, using a MySQL data source:

    <!--
        Licensed to the Apache Software Foundation (ASF) under one or more
        contributor license agreements.  See the NOTICE file distributed with
        this work for additional information regarding copyright ownership.
        The ASF licenses this file to You under the Apache License, Version 2.0
        (the "License"); you may not use this file except in compliance with
        the License.  You may obtain a copy of the License at
       
        http://www.apache.org/licenses/LICENSE-2.0
       
        Unless required by applicable law or agreed to in writing, software
        distributed under the License is distributed on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        See the License for the specific language governing permissions and
        limitations under the License.
    -->
    <!--  
        Use JDBC for message persistence
        For more information, see:
        
        http://activemq.apache.org/persistence.html
        
        The stock example uses Derby; this configuration uses MySQL instead, so the MySQL JDBC driver JAR
        must be on the classpath (for example, in the ${ACTIVEMQ_HOME}/lib/optional/ folder).
        Other RDBMSs can be configured as shown in the commented-out samples below
        
        To run ActiveMQ with this configuration add xbean:conf/activemq-jdbc.xml to your command
        
        e.g. $ bin/activemq xbean:conf/activemq-jdbc.xml
     -->
    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
      
      <broker useJmx="false" brokerName="jdbcBroker" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
           <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
        </persistenceAdapter>
    
        <transportConnectors>
           <transportConnector name="default" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>
      </broker>
     
      <!-- Embedded Derby DataSource Sample Setup -->
     <!--  <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
        <property name="databaseName" value="derbydb"/>
        <property name="createDatabase" value="create"/>
      </bean> -->
     
      <!-- Postgres DataSource Sample Setup -->
      <!--
      <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
        <property name="serverName" value="localhost"/>
        <property name="databaseName" value="activemq"/>
        <property name="portNumber" value="0"/>
        <property name="user" value="activemq"/>
        <property name="password" value="activemq"/>
        <property name="dataSourceName" value="postgres"/>
        <property name="initialConnections" value="1"/>
        <property name="maxConnections" value="10"/>
      </bean>
      -->
    
      <!-- MySql DataSource Sample Setup -->
    
      <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://192.168.40.8:3306/db_zhang?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="root"/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
      </bean>
    
      <!-- Oracle DataSource Sample Setup -->
      <!--
      <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
        <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
        <property name="username" value="scott"/>
        <property name="password" value="tiger"/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
      </bean>
      -->
    
    </beans>

    Client configuration, which is the same for producers and consumers:

    new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61618)");
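
    As a minimal sketch (not part of the original setup), the failover URI above can be assembled from a list of broker addresses. The names FailoverUris and failoverUri are hypothetical helpers, and the sketch assumes the second broker listens on port 61618, as the URI above implies.

```java
import java.util.List;

/** Hypothetical helper: builds a failover transport URI from broker addresses. */
final class FailoverUris {
    private FailoverUris() {}

    /** Joins the broker URIs into the form failover:(uri1,uri2,...). */
    static String failoverUri(List<String> brokerUris) {
        if (brokerUris.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URI is required");
        }
        return "failover:(" + String.join(",", brokerUris) + ")";
    }

    public static void main(String[] args) {
        String uri = failoverUri(List.of("tcp://localhost:61616", "tcp://localhost:61618"));
        // In a real client this string would be passed to
        // new ActiveMQConnectionFactory(uri), as shown above.
        System.out.println(uri);
    }
}
```

    Since only the master accepts connections, listing every broker lets the client reconnect to whichever one currently holds the lock.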

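    Taking MySQL as an example, once the broker starts it creates 3 tables in the database:
    ACTIVEMQ_MSGS stores persistent messages
    ACTIVEMQ_LOCK contains a single row; each broker runs SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE to acquire the lock, and the broker that obtains it is the master
    ACTIVEMQ_ACKS tracks durable topic subscriptions and their acknowledgements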

    The broker acquires the lock in the following method:

    // org.apache.activemq.store.jdbc.DefaultDatabaseLocker
    public void doStart() throws Exception {
    
        LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
        // SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE
        String sql = getStatements().getLockCreateStatement();
        LOG.debug("Locking Query is "+sql);
        
        while (true) {
            try {
                connection = dataSource.getConnection();
                connection.setAutoCommit(false);
                lockCreateStatement = connection.prepareStatement(sql);
                // Execute SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE.
                // On success, break out of the loop; on timeout, an exception is thrown.
                lockCreateStatement.execute();
                break;
            } catch (Exception e) {
                try {
                    if (isStopping()) {
                        throw new Exception(
                                "Cannot start broker as being asked to shut down. " 
                                        + "Interrupted attempt to acquire lock: "
                                        + e, e);
                    }
                    if (exceptionHandler != null) {
                        try {
                            exceptionHandler.handle(e);
                        } catch (Throwable handlerException) {
                            LOG.error( "The exception handler "
                                    + exceptionHandler.getClass().getCanonicalName()
                                    + " threw this exception: "
                                    + handlerException
                                    + " while trying to handle this exception: "
                                    + e, handlerException);
                        }
    
                    } else {
                        LOG.debug("Lock failure: "+ e, e);
                    }
                } finally {
                    // Let's make sure the database connection is properly
                    // closed when an error occurs so that we're not leaking
                    // connections 
                    if (null != connection) {
                        try {
                            connection.rollback();
                        } catch (SQLException e1) {
                            LOG.debug("Caught exception during rollback on connection: " + e1, e1);
                        }
                        try {
                            connection.close();
                        } catch (SQLException e1) {
                            LOG.debug("Caught exception while closing connection: " + e1, e1);
                        }
                        
                        connection = null;
                    }
                }
            } finally {
                if (null != lockCreateStatement) {
                    try {
                        lockCreateStatement.close();
                    } catch (SQLException e1) {
                        LOG.debug("Caught while closing statement: " + e1, e1);
                    }
                    lockCreateStatement = null;
                }
            }
    
            LOG.info("Failed to acquire lock.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
            try {
                Thread.sleep(lockAcquireSleepInterval);
            } catch (InterruptedException ie) {
                LOG.warn("Master lock retry sleep interrupted", ie);
            }
        }
    
        LOG.info("Becoming the master on dataSource: " + dataSource);
    }

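    The broker runs SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE to acquire the lock. If that succeeds, the broker becomes the master; if it fails, the broker sleeps for lockAcquireSleepInterval milliseconds and then tries again.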
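
    The retry loop can be illustrated without a database. The sketch below is an in-memory simulation, not ActiveMQ code: a Semaphore with one permit stands in for the row lock taken by SELECT ... FOR UPDATE, and the class name MasterElectionSketch, its methods, and the broker names are all hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for ACTIVEMQ_LOCK: whoever holds the permit is master. */
final class MasterElectionSketch {
    // One permit plays the role of the single locked row.
    private final Semaphore rowLock = new Semaphore(1);
    private volatile String master;

    /** Retries until the lock is obtained, like doStart(); returns the attempt count. */
    int acquire(String brokerName, long lockWaitMillis, long sleepMillis)
            throws InterruptedException {
        int attempts = 0;
        while (true) {
            attempts++;
            // A timed tryAcquire mimics the lock query timing out.
            if (rowLock.tryAcquire(lockWaitMillis, TimeUnit.MILLISECONDS)) {
                master = brokerName;
                return attempts;
            }
            // Corresponds to sleeping for lockAcquireSleepInterval.
            Thread.sleep(sleepMillis);
        }
    }

    /** Simulates the master going offline and its lock being released. */
    void release() {
        master = null;
        rowLock.release();
    }

    String master() {
        return master;
    }

    public static void main(String[] args) throws Exception {
        MasterElectionSketch db = new MasterElectionSketch();
        db.acquire("broker-A", 10, 10); // broker-A starts first and becomes master

        CountDownLatch promoted = new CountDownLatch(1);
        Thread slave = new Thread(() -> {
            try {
                db.acquire("broker-B", 10, 10); // keeps retrying while A holds the lock
                promoted.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slave.start();

        Thread.sleep(100);
        System.out.println("master while A is up: " + db.master());
        db.release(); // broker-A shuts down
        promoted.await();
        System.out.println("master after A stops: " + db.master());
    }
}
```

    In the real broker the lock is held by an open database transaction, so it is released when the master's connection closes rather than by an explicit call such as release().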

    The exception thrown when the lock query times out:

  • Original post: https://www.cnblogs.com/allenwas3/p/8955458.html