zoukankan      html  css  js  c++  java
  • sharding-jdbc分库分表

      对于分片数据库的主键一般通过自己生产主键避免用数据库自带的自增主键。比如用redis生产自增主键、mysql用一个IdManager生产自增主键(每次从表中取一个主键)。

    1.简介

    1.1分片键

      用于分片的数据库字段,是将数据库(表)水平拆分的关键字段。例:将订单表中的订单主键的尾数取模分片,则订单主键为分片字段。 SQL中如果无分片字段,将执行全路由,性能较差。 除了对单分片字段的支持,ShardingSphere也支持根据多个字段进行分片。

    1.2 分片算法

    通过分片算法将数据分片,支持通过=、>=、<=、>、<、BETWEEN和IN分片。分片算法需要应用方开发者自行实现,可实现的灵活度非常高。

    目前提供4种分片算法

    (1)精确分片算法

      对应PreciseShardingAlgorithm,用于处理使用单一键作为分片键的=与IN进行分片的场景。需要配合StandardShardingStrategy使用。

    (2)范围分片算法

      对应RangeShardingAlgorithm,用于处理使用单一键作为分片键的BETWEEN AND、>、<、>=、<=进行分片的场景。需要配合StandardShardingStrategy使用。

    (3)复合分片算法

      对应ComplexKeysShardingAlgorithm,用于处理使用多键作为分片键进行分片的场景,包含多个分片键的逻辑较复杂,需要应用开发者自行处理其中的复杂度。需要配合ComplexShardingStrategy使用

    (4)Hint分片算法

      对应HintShardingAlgorithm,用于处理使用Hint行分片的场景。需要配合HintShardingStrategy使用。

    1.3 分片策略

    包含分片键和分片算法,由于分片算法的独立性,将其独立抽离。真正可用于分片操作的是分片键 + 分片算法,也就是分片策略。目前提供5种分片策略。

    (1)标准分片策略

      对应StandardShardingStrategy。提供对SQL语句中的=, >, <, >=, <=, IN和BETWEEN AND的分片操作支持。StandardShardingStrategy只支持单分片键,提供PreciseShardingAlgorithm和RangeShardingAlgorithm两个分片算法。PreciseShardingAlgorithm是必选的,用于处理=和IN的分片。RangeShardingAlgorithm是可选的,用于处理BETWEEN AND, >, <, >=, <=分片,如果不配置RangeShardingAlgorithm,SQL中的BETWEEN AND将按照全库路由处理。

    (2)复合分片策略

      对应ComplexShardingStrategy。复合分片策略。提供对SQL语句中的=, >, <, >=, <=, IN和BETWEEN AND的分片操作支持。ComplexShardingStrategy支持多分片键,由于多分片键之间的关系复杂,因此并未进行过多的封装,而是直接将分片键值组合以及分片操作符透传至分片算法,完全由应用开发者实现,提供最大的灵活度。

    (3)行表达式分片策略

      对应InlineShardingStrategy。使用Groovy的表达式,提供对SQL语句中的=和IN的分片操作支持,只支持单分片键。对于简单的分片算法,可以通过简单的配置使用,从而避免繁琐的Java代码开发,如: t_user_$->{u_id % 8} 表示t_user表根据u_id模8,而分成8张表,表名称为t_user_0到t_user_7。

    (4)Hint分片策略

      对应HintShardingStrategy。通过Hint指定分片值而非从SQL中提取分片值的方式进行分片的策略。

    (5)不分片策略

      对应NoneShardingStrategy。不分片的策略。

    2. SQL建库建表语句如下:

    DROP SCHEMA IF EXISTS demo_ds_0;
    DROP SCHEMA IF EXISTS demo_ds_1;
    
    CREATE SCHEMA IF NOT EXISTS demo_ds_0;
    CREATE SCHEMA IF NOT EXISTS demo_ds_1;
    
    
    CREATE TABLE IF NOT EXISTS demo_ds_0.t_order_0 (order_id BIGINT NOT NULL, user_id INT NOT NULL, STATUS VARCHAR(50), PRIMARY KEY (order_id));
    CREATE TABLE IF NOT EXISTS demo_ds_0.t_order_1 (order_id BIGINT NOT NULL, user_id INT NOT NULL, STATUS VARCHAR(50), PRIMARY KEY (order_id));
    CREATE TABLE IF NOT EXISTS demo_ds_1.t_order_0 (order_id BIGINT NOT NULL, user_id INT NOT NULL, STATUS VARCHAR(50), PRIMARY KEY (order_id));
    CREATE TABLE IF NOT EXISTS demo_ds_1.t_order_1 (order_id BIGINT NOT NULL, user_id INT NOT NULL, STATUS VARCHAR(50), PRIMARY KEY (order_id));

    3. 不使用Spring的方式

    1.pom配置

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>cn.qlq</groupId>
        <artifactId>shared</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.shardingsphere/sharding-jdbc-core -->
            <dependency>
                <groupId>org.apache.shardingsphere</groupId>
                <artifactId>sharding-jdbc-core</artifactId>
                <version>4.0.0</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-dbcp2</artifactId>
                <version>2.1.1</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.6</version>
            </dependency>
    
            <!-- commons工具包 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.4</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <!-- 配置了很多插件 -->
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <!-- spring热部署 -->
                    <!-- <dependencies> <dependency> <groupId>org.springframework</groupId> 
                        <artifactId>springloaded</artifactId> <version>1.2.6.RELEASE</version> </dependency> 
                        </dependencies> -->
                </plugin>
            </plugins>
        </build>
    </project>

    2. 基于java的配置(使用行表达式分片策略简单测试)

    package shared;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    import javax.sql.DataSource;
    
    import org.apache.commons.dbcp2.BasicDataSource;
    import org.apache.commons.lang3.RandomUtils;
    import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.strategy.InlineShardingStrategyConfiguration;
    import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;
    
    public class DataSourceUtils {
    public static DataSource getDataSource() throws SQLException {
            // 配置真实数据源
            Map<String, DataSource> dataSourceMap = new HashMap<>();
    
            // 配置第一个数据源
            BasicDataSource dataSource1 = new BasicDataSource();
            dataSource1.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource1.setUrl("jdbc:mysql://localhost:3306/demo_ds_0");
            dataSource1.setUsername("root");
            dataSource1.setPassword("123456");
            dataSourceMap.put("demo_ds_0", dataSource1);
    
            // 配置第二个数据源
            BasicDataSource dataSource2 = new BasicDataSource();
            dataSource2.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource2.setUrl("jdbc:mysql://localhost:3306/demo_ds_1");
            dataSource2.setUsername("root");
            dataSource2.setPassword("123456");
            dataSourceMap.put("demo_ds_1", dataSource2);
    
            // 配置Order表规则
            TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order",
                    "demo_ds_${0..1}.t_order_${0..1}");
    
            // 配置分库 + 分表策略
            orderTableRuleConfig.setDatabaseShardingStrategyConfig(
                    new InlineShardingStrategyConfiguration("user_id", "demo_ds_${user_id % 2}"));
            orderTableRuleConfig.setTableShardingStrategyConfig(
                    new InlineShardingStrategyConfiguration("order_id", "t_order_${order_id % 2}"));
    
            // 配置分片规则
            ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
            shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);
    
            // 省略配置order_item表规则...
            // ...
    
            // 获取数据源对象
            DataSource dataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig,
                    new Properties());
            return dataSource;
        }
    }

    上面规则:

    (1)根据user_id进行分库。偶数在demo_ds_0;奇数在demo_ds_1

    (2)根据order_id进行分表。偶数在t_order_0;奇数在t_order_1

    3.测试

    1.测试增加。(注意:ID要生成,不能用mysql的自增ID。会导致数据重复)。这里可以用基于redis生成唯一ID或者用IdManager从数据库生成ID。

        private static void testAdd() throws SQLException {
            Connection conn = getDataSource().getConnection();
            for (int i = 1; i < 21; i++) {
                int user_id = RandomUtils.nextInt(1, 60);
                String sql = "insert into t_order(order_id,user_id, status) values(" + i + "," + user_id + "," + "'状态" + i
                        + "'" + ") ";
                System.out.println(sql);
                PreparedStatement preparedStatement = conn.prepareStatement(sql);
                boolean execute = preparedStatement.execute();
            }
        }

    结果:

    (1)查询demo_ds_0.t_order_0

    SELECT * FROM demo_ds_0.t_order_0

     

     (2)查询demo_ds_0.t_order_1 

    SELECT * FROM demo_ds_0.t_order_1

     (3)查询demo_ds_1.t_order_0

    SELECT * FROM demo_ds_1.t_order_0

     

     (4)查询demo_ds_1.t_order_1

    SELECT * FROM demo_ds_1.t_order_1

    2.测试查询

        private static void testselect() throws SQLException {
            Connection conn = getDataSource().getConnection();
            String sql = "select * from t_order";
            PreparedStatement preparedStatement = conn.prepareStatement(sql);
            ResultSet executeQuery = preparedStatement.executeQuery();
            while (executeQuery.next()) {
                int order_id = executeQuery.getInt("order_id");
                int user_id = executeQuery.getInt("user_id");
                String status = executeQuery.getString("status");
                System.out.print("order_id: " + order_id);
                System.out.print("	user_id: " + user_id);
                System.out.println("	status: " + status);
            }
        }

    结果:

    order_id: 4user_id: 30status: 状态4

    order_id: 8user_id: 28status: 状态8

    order_id: 10user_id: 14status: 状态10

    order_id: 12user_id: 42status: 状态12

    order_id: 14user_id: 38status: 状态14

    order_id: 16user_id: 36status: 状态16

    order_id: 18user_id: 30status: 状态18

    order_id: 20user_id: 4status: 状态20

    order_id: 7user_id: 34status: 状态7

    order_id: 9user_id: 40status: 状态9

    order_id: 11user_id: 44status: 状态11

    order_id: 13user_id: 22status: 状态13

    order_id: 15user_id: 12status: 状态15

    order_id: 2user_id: 57status: 状态2

    order_id: 6user_id: 11status: 状态6

    order_id: 1user_id: 53status: 状态1

    order_id: 3user_id: 15status: 状态3

    order_id: 5user_id: 13status: 状态5

    order_id: 17user_id: 7status: 状态17

    order_id: 19user_id: 31status: 状态19

    注意:

    (1)上面的分片算法不支持范围查询,比如:

    select * from t_order where order_id < 15

    报错如下:

    Exception in thread "main" java.lang.IllegalStateException: Inline strategy cannot support range sharding.
        at com.google.common.base.Preconditions.checkState(Preconditions.java:173)

    (2)支持精确查询,比如:

    select * from t_order where order_id = 15

    结果:

    order_id: 15user_id: 12status: 状态15

    3. 使自定义分片策略

      自定义分库算法,这里实现一个最简单的分库算法。

    规则同上:

    1. 根据user_id进行分库。偶数在demo_ds_0;奇数在demo_ds_1

    2. 据order_id进行分表。偶数在t_order_0;奇数在t_order_1

    代码如下: 分库算法与分表算法都实现 ComplexKeysShardingAlgorithm 接口。

    分库算法:

    package shared;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    import org.apache.commons.collections4.CollectionUtils;
    import org.apache.commons.collections4.IteratorUtils;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;
    
    public class MyDBComplexShardingStrategy implements ComplexKeysShardingAlgorithm<Comparable<?>> {
    
        @Override
        public Collection<String> doSharding(Collection<String> availableTargetNames,
                ComplexKeysShardingValue<Comparable<?>> shardingValue) {
    
            System.out.println("=====MyDBComplexShardingStrategy=====");
            System.out.println(availableTargetNames);
            System.out.println(shardingValue);
    
            if (CollectionUtils.isEmpty(availableTargetNames)) {
                throw new RuntimeException("可用数据库为空");
            }
    
            List<String> result = new ArrayList<>();
    
            Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = shardingValue
                    .getColumnNameAndShardingValuesMap();
            Set<String> keySet = columnNameAndShardingValuesMap.keySet();
            for (String key : keySet) {
                if (!"user_id".equals(key)) {
                    continue;
                }
    
                Collection<Comparable<?>> collection = columnNameAndShardingValuesMap.get(key);
                Iterator<Comparable<?>> iterator = collection.iterator();
                while (iterator.hasNext()) {
                    Integer next = (Integer) iterator.next();
                    Integer index = next % 2;
                    String availableTargetName = IteratorUtils.get(availableTargetNames.iterator(), index);
                    result.add(availableTargetName);
                }
            }
    
            System.out.println(result);
    
            return result;
        }
    
    }

    doSharding方法解释:

     入参:

    availableTargetNames:  可用的数据库集合。 [demo_ds_0, demo_ds_1]

     shardingValue: 是指分库的字段以及值集合。 ComplexKeysShardingValue(logicTableName=t_order, columnNameAndShardingValuesMap={user_id=[25]}, columnNameAndRangeValuesMap={})

     返回值:

    满足条件的数据库。返回集合表示同时向多个库中插入。

    分表算法:

    package shared;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    import org.apache.commons.collections4.CollectionUtils;
    import org.apache.commons.collections4.IteratorUtils;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;
    
    public class MyTableComplexShardingStrategy implements ComplexKeysShardingAlgorithm<Comparable<?>> {
    
        @Override
        public Collection<String> doSharding(Collection<String> availableTargetNames,
                ComplexKeysShardingValue<Comparable<?>> shardingValue) {
    
            System.out.println("=====MyTableComplexShardingStrategy=====");
            System.out.println(availableTargetNames);
            System.out.println(shardingValue);
    
            if (CollectionUtils.isEmpty(availableTargetNames)) {
                throw new RuntimeException("可用数据表为空");
            }
    
            List<String> result = new ArrayList<>();
    
            Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = shardingValue
                    .getColumnNameAndShardingValuesMap();
            Set<String> keySet = columnNameAndShardingValuesMap.keySet();
            for (String key : keySet) {
                if (!"order_id".equals(key)) {
                    continue;
                }
    
                Collection<Comparable<?>> collection = columnNameAndShardingValuesMap.get(key);
                Iterator<Comparable<?>> iterator = collection.iterator();
                while (iterator.hasNext()) {
                    Integer next = (Integer) iterator.next();
                    Integer index = next % 2;
                    String availableTargetName = IteratorUtils.get(availableTargetNames.iterator(), index);
                    result.add(availableTargetName);
                }
            }
    
            System.out.println(result);
    
            return result;
        }
    
    }

    doSharding方法解释:

     入参:

    availableTargetNames:  可用的数据表集合。 [t_order_0, t_order_1]

     shardingValue: 是指分表的字段以及值集合。 ComplexKeysShardingValue(logicTableName=t_order, columnNameAndShardingValuesMap={order_id=[1]}, columnNameAndRangeValuesMap={})

     返回值:

      满足条件的数据表。返回集合表示同时向多个表中插入。

    获取数据源代码以及测试代码:

    package shared;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    import javax.sql.DataSource;
    
    import org.apache.commons.dbcp2.BasicDataSource;
    import org.apache.commons.lang3.RandomUtils;
    import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.strategy.ComplexShardingStrategyConfiguration;
    import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;
    
    public class DataSourceUtils {
    
        public static void main(String[] args) throws SQLException {
            testAdd();
        }
    
        private static void testAdd() throws SQLException {
            Connection conn = getDataSource().getConnection();
            for (int i = 1; i < 21; i++) {
                int user_id = RandomUtils.nextInt(1, 60);
                String sql = "insert into t_order(order_id,user_id, status) values(" + i + "," + user_id + "," + "'状态" + i
                        + "'" + ") ";
                System.out.println(sql);
                PreparedStatement preparedStatement = conn.prepareStatement(sql);
                boolean execute = preparedStatement.execute();
            }
        }
    
        public static DataSource getDataSource() throws SQLException {
            // 配置真实数据源
            Map<String, DataSource> dataSourceMap = new HashMap<>();
    
            // 配置第一个数据源
            BasicDataSource dataSource1 = new BasicDataSource();
            dataSource1.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource1.setUrl("jdbc:mysql://localhost:3306/demo_ds_0");
            dataSource1.setUsername("root");
            dataSource1.setPassword("123456");
            dataSourceMap.put("demo_ds_0", dataSource1);
    
            // 配置第二个数据源
            BasicDataSource dataSource2 = new BasicDataSource();
            dataSource2.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource2.setUrl("jdbc:mysql://localhost:3306/demo_ds_1");
            dataSource2.setUsername("root");
            dataSource2.setPassword("123456");
            dataSourceMap.put("demo_ds_1", dataSource2);
    
            // 配置Order表规则(t_order 是逻辑表名,
            // demo_ds_${0..1}.t_order_${0..1}是实际的节点数(groovy表达式))
            TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order",
                    "demo_ds_${0..1}.t_order_${0..1}");
    
            // 配置分库 + 分表策略
            orderTableRuleConfig.setDatabaseShardingStrategyConfig(
                    new ComplexShardingStrategyConfiguration("user_id", new MyDBComplexShardingStrategy()));
            orderTableRuleConfig.setTableShardingStrategyConfig(
                    new ComplexShardingStrategyConfiguration("order_id", new MyTableComplexShardingStrategy()));
    
            // 配置分片规则
            ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
            shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);
    
            // 获取数据源对象
            DataSource dataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig,
                    new Properties());
            return dataSource;
        }
    }

    解释:

    new TableRuleConfiguration("t_order",  ${0..1}.t_order_${0..1}")

       t_order 是逻辑表名、demo_ds_${0..1}.t_order_${0..1} 是实际的节点数(一个groovy范围表达式)。而且对应的数据库以及数据表必须存在。会进行验证。

    orderTableRuleConfig.setDatabaseShardingStrategyConfig(new ComplexShardingStrategyConfiguration("user_id, order_id", new MyDBComplexShardingStrategy()));

      如上面代码可以设置多个分库或者分表的字段。多个用逗号分隔即可。后面doSharding 获取到的就是map中的两对值,如下:

    ComplexKeysShardingValue(logicTableName=t_order, columnNameAndShardingValuesMap={user_id=[45], order_id=[1]}, columnNameAndRangeValuesMap={})

    4. 结合SpringBoot在spring中使用

    1.pom中引入

            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-dbcp2</artifactId>
            </dependency>
            <!-- sharding.jdbc for spring boot -->
            <dependency>
                <groupId>org.apache.shardingsphere</groupId>
                <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
                <version>4.0.0</version>
            </dependency>

    2. application.properties中配置

    ###############S shardingsphere setting####
    spring.shardingsphere.datasource.names=demo_ds_0,demo_ds_1
    
    spring.shardingsphere.datasource.demo_ds_0.type=org.apache.commons.dbcp2.BasicDataSource
    spring.shardingsphere.datasource.demo_ds_0.driver-class-name=com.mysql.jdbc.Driver
    spring.shardingsphere.datasource.demo_ds_0.url=jdbc:mysql://localhost:3306/demo_ds_0
    spring.shardingsphere.datasource.demo_ds_0.username=root
    spring.shardingsphere.datasource.demo_ds_0.password=123456
    
    spring.shardingsphere.datasource.demo_ds_1.type=org.apache.commons.dbcp2.BasicDataSource
    spring.shardingsphere.datasource.demo_ds_1.driver-class-name=com.mysql.jdbc.Driver
    spring.shardingsphere.datasource.demo_ds_1.url=jdbc:mysql://localhost:3306/demo_ds_1
    spring.shardingsphere.datasource.demo_ds_1.username=root
    spring.shardingsphere.datasource.demo_ds_1.password=123456
    
    spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id
    spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=demo_ds_$->{user_id % 2}
    
    spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=demo_ds_$->{0..1}.t_order_$->{0..1}
    spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_id
    spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expression=t_order_$->{order_id % 2}
    ###############E shardingsphere setting####

    3.Junit 测试

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    
    import javax.annotation.Resource;
    import javax.sql.DataSource;
    
    import org.apache.commons.lang3.RandomUtils;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import cn.qlq.MySpringBootApplication;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = MySpringBootApplication.class)
    public class PlainTest {
    
        @Resource
        private DataSource dataSource;
    
        @Test
        public void testAdd() throws SQLException {
            Connection conn = dataSource.getConnection();
            for (int i = 1; i < 21; i++) {
                int user_id = RandomUtils.nextInt(1, 60);
                String sql = "insert into t_order(order_id,user_id, status) values(" + i + "," + user_id + "," + "'状态" + i
                        + "'" + ") ";
                System.out.println(sql);
                PreparedStatement preparedStatement = conn.prepareStatement(sql);
                boolean execute = preparedStatement.execute();
            }
        }
    }

    结果同上面不使用springboot结果一样。

    补充:shardingjdbc可以设置默认的数据源,也就是不想分库分表的使用默认的数据源。

    例如:

      shardingRuleConfig.setDefaultDataSourceName("defaultDataSource") 是设置默认的数据源,不分片的表都采用默认的数据源。

    package cn.qlq.sharedjdbc.config;
    
    import java.sql.SQLException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    import javax.sql.DataSource;
    
    import org.apache.commons.dbcp2.BasicDataSource;
    import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.strategy.ComplexShardingStrategyConfiguration;
    import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DataSourceConfig {
    
        @Bean
        public static DataSource dataSource() throws SQLException {
            // 配置真实数据源
            Map<String, DataSource> dataSourceMap = new HashMap<>();
    
            // 配置第一个数据源
            BasicDataSource dataSource0 = createDataSource("jdbc:mysql://localhost:3306/demo_ds_0", "root", "123456");
            dataSourceMap.put("demo_ds_0", dataSource0);
    
            // 配置第二个数据源
            BasicDataSource dataSource1 = createDataSource("jdbc:mysql://localhost:3306/demo_ds_1", "root", "123456");
            dataSourceMap.put("demo_ds_1", dataSource1);
    
            // 配置默认数据源
            BasicDataSource defaultDataSource = createDataSource("jdbc:mysql://localhost:3306/test1", "root", "123456");
            dataSourceMap.put("defaultDataSource", defaultDataSource);
    
            // 配置Order表规则(t_order 是逻辑表名,
            // demo_ds_${0..1}.t_order_${0..1}是实际的节点数(groovy表达式))
            TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order",
                    "demo_ds_${0..1}.t_order_${0..1}");
            // 配置分库 + 分表策略
            orderTableRuleConfig.setDatabaseShardingStrategyConfig(
                    new ComplexShardingStrategyConfiguration("user_id", new MyDBComplexShardingStrategy()));
            orderTableRuleConfig.setTableShardingStrategyConfig(
                    new ComplexShardingStrategyConfiguration("order_id", new MyTableComplexShardingStrategy()));
    
            // 配置分片规则
            ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
            // shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);
            // 设置默认数据源
            shardingRuleConfig.setDefaultDataSourceName("defaultDataSource");
            // 设置不分片的表
            // shardingRuleConfig.setBindingTableGroups(Arrays.asList("country"));
    
            // 获取数据源对象
            DataSource createDataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig,
                    new Properties());
            return createDataSource;
        }
    
        private static BasicDataSource createDataSource(String url, String username, String password) {
            BasicDataSource defaultDataSource = new BasicDataSource();
            defaultDataSource.setDriverClassName("com.mysql.jdbc.Driver");
            defaultDataSource.setUrl(url);
            defaultDataSource.setUsername(username);
            defaultDataSource.setPassword(password);
            return defaultDataSource;
        }
    }

    5. mybatis使用

    1.到demo_ds_0和demo_ds_1库中创建表

    CREATE TABLE t_order_0 (id BIGINT(20) NOT NULL, description VARCHAR(255), order_id BIGINT(20), user_id BIGINT(20), PRIMARY KEY (id)); 
    CREATE TABLE t_order_1 (id BIGINT(20) NOT NULL, description VARCHAR(255), order_id BIGINT(20), user_id BIGINT(20), PRIMARY KEY (id));  

    对应的bean如下:

    package cn.qlq.sharedjdbc.client;
    
    import javax.persistence.Id;
    
    import com.baomidou.mybatisplus.annotation.TableName;
    
    import lombok.Data;
    
    @Data
    // 对应逻辑表名称
    @TableName("t_order")
    public class TOrder {
    
        @Id
        private Long id;
    
        private Long order_id;
    
        private Long user_id;
    
        private String description;
    
    }

    2. pom引入配置

            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-dbcp2</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.shardingsphere</groupId>
                <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
                <version>4.0.0</version>
            </dependency>

    3.java配置设置分片规则以及默认数据源

    package cn.qlq.sharedjdbc.config;
    
    import java.sql.SQLException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    import javax.sql.DataSource;
    
    import org.apache.commons.dbcp2.BasicDataSource;
    import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
    import org.apache.shardingsphere.api.config.sharding.strategy.ComplexShardingStrategyConfiguration;
    import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DataSourceConfig {
    
        @Bean
        public static DataSource dataSource() throws SQLException {
            // 配置真实数据源
            Map<String, DataSource> dataSourceMap = new HashMap<>();
    
            // 配置第一个数据源
            BasicDataSource dataSource0 = createDataSource("jdbc:mysql://localhost:3306/demo_ds_0", "root", "123456");
            dataSourceMap.put("demo_ds_0", dataSource0);
    
            // 配置第二个数据源
            BasicDataSource dataSource1 = createDataSource("jdbc:mysql://localhost:3306/demo_ds_1", "root", "123456");
            dataSourceMap.put("demo_ds_1", dataSource1);
    
            // 配置默认数据源
            BasicDataSource defaultDataSource = createDataSource("jdbc:mysql://localhost:3306/test1", "root", "123456");
            dataSourceMap.put("defaultDataSource", defaultDataSource);
    
            // 配置Order表规则(t_order 是逻辑表名,
            // demo_ds_${0..1}.t_order_${0..1}是实际的节点数(groovy表达式))
            TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order",
                    "demo_ds_${0..1}.t_order_${0..1}");
            // 配置分库 + 分表策略
            orderTableRuleConfig.setDatabaseShardingStrategyConfig(
                    new ComplexShardingStrategyConfiguration("user_id", new MyDBComplexShardingStrategy()));
            orderTableRuleConfig.setTableShardingStrategyConfig(
                    new ComplexShardingStrategyConfiguration("order_id", new MyTableComplexShardingStrategy()));
    
            // 配置分片规则
            ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
            shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);
            // 设置默认数据源
            shardingRuleConfig.setDefaultDataSourceName("defaultDataSource");
            // 设置不分片的表
            shardingRuleConfig.setBindingTableGroups(Arrays.asList("country"));
    
            // 获取数据源对象
            DataSource createDataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig,
                    new Properties());
            return createDataSource;
        }
    
        private static BasicDataSource createDataSource(String url, String username, String password) {
            BasicDataSource defaultDataSource = new BasicDataSource();
            defaultDataSource.setDriverClassName("com.mysql.jdbc.Driver");
            defaultDataSource.setUrl(url);
            defaultDataSource.setUsername(username);
            defaultDataSource.setPassword(password);
            return defaultDataSource;
        }
    }

    分库规则如下:

    package cn.qlq.sharedjdbc.config;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    import org.apache.commons.collections4.CollectionUtils;
    import org.apache.commons.collections4.IteratorUtils;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;
    
    public class MyDBComplexShardingStrategy implements ComplexKeysShardingAlgorithm<Comparable<?>> {
    
        @Override
        public Collection<String> doSharding(Collection<String> availableTargetNames,
                ComplexKeysShardingValue<Comparable<?>> shardingValue) {
    
            System.out.println("=====MyDBComplexShardingStrategy=====");
            System.out.println(availableTargetNames);
            System.out.println(shardingValue);
    
            if (CollectionUtils.isEmpty(availableTargetNames)) {
                throw new RuntimeException("可用数据库为空");
            }
    
            List<String> result = new ArrayList<>();
    
            Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = shardingValue
                    .getColumnNameAndShardingValuesMap();
            Set<String> keySet = columnNameAndShardingValuesMap.keySet();
            for (String key : keySet) {
                if (!"user_id".equals(key)) {
                    continue;
                }
    
                Collection<Comparable<?>> collection = columnNameAndShardingValuesMap.get(key);
                Iterator<Comparable<?>> iterator = collection.iterator();
                while (iterator.hasNext()) {
                    Integer next = Integer.valueOf(iterator.next().toString());
                    Integer index = next % 2;
                    String availableTargetName = IteratorUtils.get(availableTargetNames.iterator(), index);
                    result.add(availableTargetName);
                }
            }
    
            System.out.println(result);
    
            return result;
        }
    
    }

    分表规则如下:

    package cn.qlq.sharedjdbc.config;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    import org.apache.commons.collections4.CollectionUtils;
    import org.apache.commons.collections4.IteratorUtils;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;
    
    public class MyTableComplexShardingStrategy implements ComplexKeysShardingAlgorithm<Comparable<?>> {
    
        @Override
        public Collection<String> doSharding(Collection<String> availableTargetNames,
                ComplexKeysShardingValue<Comparable<?>> shardingValue) {
    
            System.out.println("=====MyTableComplexShardingStrategy=====");
            System.out.println(availableTargetNames);
            System.out.println(shardingValue);
    
            if (CollectionUtils.isEmpty(availableTargetNames)) {
                throw new RuntimeException("可用数据表为空");
            }
    
            List<String> result = new ArrayList<>();
    
            Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = shardingValue
                    .getColumnNameAndShardingValuesMap();
            Set<String> keySet = columnNameAndShardingValuesMap.keySet();
            for (String key : keySet) {
                if (!"order_id".equals(key)) {
                    continue;
                }
    
                Collection<Comparable<?>> collection = columnNameAndShardingValuesMap.get(key);
                Iterator<Comparable<?>> iterator = collection.iterator();
                while (iterator.hasNext()) {
                    Integer next = Integer.valueOf(iterator.next().toString());
                    Integer index = next % 2;
                    String availableTargetName = IteratorUtils.get(availableTargetNames.iterator(), index);
                    result.add(availableTargetName);
                }
            }
    
            System.out.println(result);
    
            return result;
        }
    
    }

     4.测试

    (1)mapper

    package cn.qlq.sharedjdbc.client;
    
    import org.apache.ibatis.annotations.Mapper;
    
    import com.baomidou.mybatisplus.core.mapper.BaseMapper;
    
    @Mapper
    public interface TOrderMapper extends BaseMapper<TOrder> {
    
    }

    (2)controller

    package cn.qlq.sharedjdbc.client;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.commons.lang3.RandomUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import cn.qlq.utils.SnowflakeIdWorker;
    
    @RestController
    @RequestMapping("tOrder")
    public class TOrderController {
    
        private Map<String, Object> result = new HashMap<>();
    
        @Autowired
        private TOrderMapper tOrderMapper;
    
        @RequestMapping("testAdd")
        public Map<String, Object> testAdd() {
    
            for (int i = 0; i < 20; i++) {
                TOrder tOrder = new TOrder();
                tOrder.setId(new SnowflakeIdWorker(0, 0).nextId());
                tOrder.setOrder_id(RandomUtils.nextLong(0, 5000000));
                tOrder.setUser_id(RandomUtils.nextLong(0, 5000000));
                tOrder.setDescription("description " + i);
                tOrderMapper.insert(tOrder);
            }
    
            result.put("success", true);
            return result;
        }
    
        @RequestMapping("list")
        public Map<String, Object> list() {
            List<TOrder> selectList = tOrderMapper.selectList(null);
    
            result.put("data", selectList);
            return result;
        }
    
    }

    (3)雪华算法生成唯一ID

    package cn.qlq.utils;
    
    /**
     * Twitter_Snowflake<br>
     * SnowFlake的结构如下(每部分用-分开):<br>
     * 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 -
     * 000000000000 <br>
     * 1位标识,由于long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,所以id一般是正数,最高位是0<br>
     * 41位时间截(毫秒级),注意,41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截)
     * 得到的值),这里的的开始时间截,一般是我们的id生成器开始使用的时间,由我们程序来指定的(如下下面程序IdWorker类的startTime属性)。
     * 41位的时间截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>
     * 10位的数据机器位,可以部署在1024个节点,包括5位datacenterId和5位workerId<br>
     * 12位序列,毫秒内的计数,12位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生4096个ID序号<br>
     * 加起来刚好64位,为一个Long型。<br>
     * SnowFlake的优点是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由数据中心ID和机器ID作区分),并且效率较高,
     * 经测试,SnowFlake每秒能够产生26万ID左右。
     */
    public class SnowflakeIdWorker {
    
        // ==============================Fields===========================================
        /** 开始时间截 (2015-01-01) */
        private final long twepoch = 1420041600000L;
    
        /** 机器id所占的位数 */
        private final long workerIdBits = 5L;
    
        /** 数据标识id所占的位数 */
        private final long datacenterIdBits = 5L;
    
        /** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */
        private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    
        /** 支持的最大数据标识id,结果是31 */
        private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    
        /** 序列在id中占的位数 */
        private final long sequenceBits = 12L;
    
        /** 机器ID向左移12位 */
        private final long workerIdShift = sequenceBits;
    
        /** 数据标识id向左移17位(12+5) */
        private final long datacenterIdShift = sequenceBits + workerIdBits;
    
        /** 时间截向左移22位(5+5+12) */
        private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    
        /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */
        private final long sequenceMask = -1L ^ (-1L << sequenceBits);
    
        /** 工作机器ID(0~31) */
        private long workerId;
    
        /** 数据中心ID(0~31) */
        private long datacenterId;
    
        /** 毫秒内序列(0~4095) */
        private long sequence = 0L;
    
        /** 上次生成ID的时间截 */
        private long lastTimestamp = -1L;
    
        // ==============================Constructors=====================================
        /**
         * 构造函数
         * 
         * @param workerId
         *            工作ID (0~31)
         * @param datacenterId
         *            数据中心ID (0~31)
         */
        public SnowflakeIdWorker(long workerId, long datacenterId) {
            if (workerId > maxWorkerId || workerId < 0) {
                throw new IllegalArgumentException(
                        String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
            }
            if (datacenterId > maxDatacenterId || datacenterId < 0) {
                throw new IllegalArgumentException(
                        String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
            }
            this.workerId = workerId;
            this.datacenterId = datacenterId;
        }
    
        // ==============================Methods==========================================
        /**
         * 获得下一个ID (该方法是线程安全的)
         * 
         * @return SnowflakeId
         */
        public synchronized long nextId() {
            long timestamp = timeGen();
    
            // 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
            if (timestamp < lastTimestamp) {
                throw new RuntimeException(String.format(
                        "Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
            }
    
            // 如果是同一时间生成的,则进行毫秒内序列
            if (lastTimestamp == timestamp) {
                sequence = (sequence + 1) & sequenceMask;
                // 毫秒内序列溢出
                if (sequence == 0) {
                    // 阻塞到下一个毫秒,获得新的时间戳
                    timestamp = tilNextMillis(lastTimestamp);
                }
            }
            // 时间戳改变,毫秒内序列重置
            else {
                sequence = 0L;
            }
    
            // 上次生成ID的时间截
            lastTimestamp = timestamp;
    
            // 移位并通过或运算拼到一起组成64位的ID
            return ((timestamp - twepoch) << timestampLeftShift) //
                    | (datacenterId << datacenterIdShift) //
                    | (workerId << workerIdShift) //
                    | sequence;
        }
    
        /**
         * 阻塞到下一个毫秒,直到获得新的时间戳
         * 
         * @param lastTimestamp
         *            上次生成ID的时间截
         * @return 当前时间戳
         */
        protected long tilNextMillis(long lastTimestamp) {
            long timestamp = timeGen();
            while (timestamp <= lastTimestamp) {
                timestamp = timeGen();
            }
            return timestamp;
        }
    
        /**
         * 返回以毫秒为单位的当前时间
         * 
         * @return 当前时间(毫秒)
         */
        protected long timeGen() {
            return System.currentTimeMillis();
        }
    
        // ==============================Test=============================================
        /** 测试 */
        public static void main(String[] args) {
            SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0);
            for (int i = 0; i < 1000; i++) {
                long id = idWorker.nextId();
                System.out.println(Long.toBinaryString(id));
                System.out.println(id);
            }
        }
    }

    访问一次add后查看数据库表:

    (1)查看demo_ds_0.t_order_0。分片规则是user_id为偶数、order_id为偶数

    SELECT * FROM demo_ds_0.t_order_0;

    结果 

     

     (2)查看demo_ds_0.t_order_1。分片规则是user_id为偶数、order_id为奇数

    select * from demo_ds_0.t_order_1;

    结果:

     (3)查看demo_ds_1.t_order_0。分片规则是user_id为奇数、order_id为偶数

    SELECT * FROM demo_ds_1.t_order_0;

    结果:

     (4)查看demo_ds_1.t_order_1。分片规则是user_id为奇数、order_id为奇数

      至此实现了mybatis结合shardingjdbc实现分库分表。可以实现对某些表进行分库分表、也可以对不需要分片的使用默认的数据源(shardingRuleConfig.setDefaultDataSourceName("defaultDataSource");)。

      结合Mybatis使用的时候,对分片的表应当使用其逻辑表名称。

  • 相关阅读:
    vue组件常用传值
    定时器的暂停,继续,刷新
    element-ui表格带复选框使用方法及默认选中方法
    git创建分支
    CAS协议原理与代码实现(单点登录 与 单点登出的流程)
    Java-技术专区-Files类和Paths类的用法
    Java-技术专区-Java8特性-parallelStream
    ELK日志系统环境搭建-6.x系列
    Spring系列-Spring Aop基本用法
    Spring-框架系列-SpringIOC容器初始化
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/13216615.html
Copyright © 2011-2022 走看看