zoukankan      html  css  js  c++  java
  • TDDL生成全局ID原理

    TDDL 在分布式下的SEQUENCE原理

    TDDL大家应该很熟悉了,淘宝分布式数据层。很好的为我们实现了分库分表、Master/Salve、动态数据源配置等功能。

    那么分布式之后,数据库自增序列肯定用不了了,如何方便快捷的解决这个问题呢?TDDL也提供了SEQUENCE的解决方案。

    总述

    在数据库中创建 sequence 表,用于记录,当前已被占用的id最大值。

    每台客户端主机取一个id区间(比如 1000~2000)缓存在本地,并更新 sequence 表中的id最大值记录。

    客户端主机之间取不同的id区间,用完再取,使用乐观锁机制控制并发。

    第一步:创建一张sequence对应的表

    CREATE TABLE `imp_sequence` (
      `BIZ_NAME` varchar(45) NOT NULL COMMENT '业务名称',
      `CURRENT_VALUE` int(11) NOT NULL COMMENT '当前最大值',
      `GMT_CREATE` datetime DEFAULT NULL COMMENT '创建时间',
      `GMT_MODIFIED` datetime DEFAULT NULL COMMENT '修改时间',
      PRIMARY KEY (`BIZ_NAME`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='数据序列表';

    表名和字段可以按各自规则定义,定义之后需要与第二步DAO中的定义相对应!

    几张逻辑表需要声明几个sequence。

    第二步:配置sequenceDao

    <bean id="sequenceDao" class="com.taobao.tddl.client.sequence.impl.DefaultSequenceDao">
            <!-- 数据源 -->
            <property name="dataSource"  ref="dataSource" />
            <!-- 步长-->
            <property name="step" value="1000" />
            <!-- 重试次数-->
            <property name="retryTimes" value="1" />
            <!-- sequence 表名-->
            <property name="tableName" value="gt_sequence" />
            <!-- sequence 名称-->
            <property name="nameColumnName" value="BIZ_NAME" />
            <!-- sequence 当前值-->
            <property name="valueColumnName" value="CURRENT_VALUE" />
            <!-- sequence 更新时间-->
            <property name="gmtModifiedColumnName" value="gmt_modified" />
    </bean>
    View Code

    第三步:配置sequence生成器

    <bean id="businessSequence"  class="com.taobao.tddl.client.sequence.impl.DefaultSequence">
            <property name="sequenceDao" ref="sequenceDao"/>
            <property name="name" value="business_sequence" />
    </bean>
    View Code

    第四步:调用

    public class IbatisSmDAO extends SqlMapClientDaoSupport implements SmDAO {
     
      /**smSequence*/
      private DefaultSequence   businessSequence;
       
        public int insert(SmDO sm) throws DataAccessException {
            if (sm == null) {
                throw new IllegalArgumentException("Can't insert a null data object into db.");
            }
            
            try {
                sm.setId((int)businessSequence.nextValue());
            } catch (SequenceException e) {
                throw new RuntimeException("Can't get primary key.");
            }
            
            getSqlMapClientTemplate().insert("MS-SM-INSERT", sm);
     
            return sm.getId();
        }
    }
    View Code

    从调用配置中,我们可以发现其中涉及到二个重要类DefaultSequenceDao和DefaultSequence,这二个都是TDDL的默认实现。DefaultSequenceDao:序列DAO默认实现,JDBC方式。DefaultSequence:序列默认实现。


    先来看DefaultSequenceDao,TDDL中提供了默认的表名,列名和步长等,第一步的建表可以参照默认方式。

    private static final int MIN_STEP = 1;
        private static final int MAX_STEP = 100000;
        private static final int DEFAULT_STEP = 1000;
        private static final int DEFAULT_RETRY_TIMES = 150;
     
        private static final String DEFAULT_TABLE_NAME = "sequence";
        private static final String DEFAULT_NAME_COLUMN_NAME = "name";
        private static final String DEFAULT_VALUE_COLUMN_NAME = "value";
        private static final String DEFAULT_GMT_MODIFIED_COLUMN_NAME = "gmt_modified";
     
        private static final long DELTA = 100000000L;
     
        private DataSource dataSource;
     
        /**
         * 重试次数
         */
        private int retryTimes = DEFAULT_RETRY_TIMES;
     
        /**
         * 步长
         */
        private int step = DEFAULT_STEP;
     
        /**
         * 序列所在的表名
         */
        private String tableName = DEFAULT_TABLE_NAME;
     
        /**
         * 存储序列名称的列名
         */
        private String nameColumnName = DEFAULT_NAME_COLUMN_NAME;
     
        /**
         * 存储序列值的列名
         */
        private String valueColumnName = DEFAULT_VALUE_COLUMN_NAME;
     
        /**
         * 存储序列最后更新时间的列名
         */
        private String gmtModifiedColumnName = DEFAULT_GMT_MODIFIED_COLUMN_NAME;
    View Code

    接下来看一下nextRange方法:取得下一个可用的序列区间:

    public SequenceRange nextRange(String name) throws SequenceException {
            if (name == null) {
                throw new IllegalArgumentException("序列名称不能为空");
            }
     
            long oldValue;
            long newValue;
     
            Connection conn = null;
            PreparedStatement stmt = null;
            ResultSet rs = null;
     
            for (int i = 0; i < retryTimes + 1; ++i) {
                try {
                    conn = dataSource.getConnection();
                    stmt = conn.prepareStatement(getSelectSql());
                    stmt.setString(1, name);
                    rs = stmt.executeQuery();
                    rs.next();
                    oldValue = rs.getLong(1);
     
                    if (oldValue < 0) {
                        StringBuilder message = new StringBuilder();
                        message.append("Sequence value cannot be less than zero, value = ").append(oldValue);
                        message.append(", please check table ").append(getTableName());
     
                        throw new SequenceException(message.toString());
                    }
     
                    if (oldValue > Long.MAX_VALUE - DELTA) {
                        StringBuilder message = new StringBuilder();
                        message.append("Sequence value overflow, value = ").append(oldValue);
                        message.append(", please check table ").append(getTableName());
     
                        throw new SequenceException(message.toString());
                    }
     
                    newValue = oldValue + getStep();
                } catch (SQLException e) {
                    throw new SequenceException(e);
                } finally {
                    closeResultSet(rs);
                    rs = null;
                    closeStatement(stmt);
                    stmt = null;
                    closeConnection(conn);
                    conn = null;
                }
     
                try {
                    conn = dataSource.getConnection();
                    stmt = conn.prepareStatement(getUpdateSql());
                    stmt.setLong(1, newValue);
                    stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
                    stmt.setString(3, name);
                    stmt.setLong(4, oldValue);
                    int affectedRows = stmt.executeUpdate();
                    if (affectedRows == 0) {
                        // retry
                        continue;
                    }
     
                    return new SequenceRange(oldValue + 1, newValue);
                } catch (SQLException e) {
                    throw new SequenceException(e);
                } finally {
                    closeStatement(stmt);
                    stmt = null;
                    closeConnection(conn);
                    conn = null;
                }
            }
     
            throw new SequenceException("Retried too many times, retryTimes = " + retryTimes);
        }
    View Code

    通过getSelectSql查询最新的value值,然后加上步点,通过getUpdateSql更新到数据库中

    private String getSelectSql() {
            if (selectSql == null) {
                synchronized (this) {
                    if (selectSql == null) {
                        StringBuilder buffer = new StringBuilder();
                        buffer.append("select ").append(getValueColumnName());
                        buffer.append(" from ").append(getTableName());
                        buffer.append(" where ").append(getNameColumnName()).append(" = ?");
     
                        selectSql = buffer.toString();
                    }
                }
            }
     
            return selectSql;
        }
     
        private String getUpdateSql() {
            if (updateSql == null) {
                synchronized (this) {
                    if (updateSql == null) {
                        StringBuilder buffer = new StringBuilder();
                        buffer.append("update ").append(getTableName());
                        buffer.append(" set ").append(getValueColumnName()).append(" = ?, ");
                        buffer.append(getGmtModifiedColumnName()).append(" = ? where ");
                        buffer.append(getNameColumnName()).append(" = ? and ");
                        buffer.append(getValueColumnName()).append(" = ?");
     
                        updateSql = buffer.toString();
                    }
                }
            }
     
            return updateSql;
        }
    View Code

    有一个特殊需要说明的,在update语句中,where需要把之前的value当成条件传入。实现了类型version的乐观锁操作。如果同一个时间AB二台机器同时请求获取到相同的value,进行update操作只有可能一条成功。失败的会按retryTimes进行重试。

    接下来看DefaultSequence,比较简单,就不说明了

    public class DefaultSequence implements Sequence {
        private final Lock lock = new ReentrantLock();
     
        private SequenceDao sequenceDao;
     
        /**
         * 序列名称
         */
        private String name;
     
        private volatile SequenceRange currentRange;
     
        public long nextValue() throws SequenceException {
            if (currentRange == null) {
                lock.lock();
                try {
                    if (currentRange == null) {
                        currentRange = sequenceDao.nextRange(name);
                    }
                } finally {
                    lock.unlock();
                }
            }
     
            long value = currentRange.getAndIncrement();
            if (value == -1) {
                lock.lock();
                try {
                    for (;;) {
                        if (currentRange.isOver()) {
                            currentRange = sequenceDao.nextRange(name);
                        }
     
                        value = currentRange.getAndIncrement();
                        if (value == -1) {
                            continue;
                        }
     
                        break;
                    }
                } finally {
                    lock.unlock();
                }
            }
     
            if (value < 0) {
                throw new SequenceException("Sequence value overflow, value = " + value);
            }
     
            return value;
        }
     
        public SequenceDao getSequenceDao() {
            return sequenceDao;
        }
     
        public void setSequenceDao(SequenceDao sequenceDao) {
            this.sequenceDao = sequenceDao;
        }
     
        public String getName() {
            return name;
        }
     
        public void setName(String name) {
            this.name = name;
        }
    }
    View Code
  • 相关阅读:
    关于c#中的委托和事件
    Unity3d中默认函数调用顺序(MonoBehaviour)
    u3d 摄像机详解
    u3d中的坐标系
    u3d中的向量 vector3 vector2
    u3d中的INput
    C#构造函数
    解析C#中[],List,Array,ArrayList的区别及应用
    Mybatis(七) mybatis的逆向工程的配置详解
    Mybatis(六) Spring整合mybatis
  • 原文地址:https://www.cnblogs.com/fanguangdexiaoyuer/p/10946666.html
Copyright © 2011-2022 走看看