zoukankan      html  css  js  c++  java
  • 【ClickHouse】5:clickhouse集群部署

    背景介绍:

    有三台CentOS7服务器安装了ClickHouse

    HostName IP 安装程序 程序端口
    centf8118.sharding1.db 192.168.81.18 clickhouse-server,clickhouse-client 9000
    centf8119.sharding2.db 192.168.81.19 clickhouse-server,clickhouse-client 9000
    centf8120.sharding3.db 192.168.81.20 clickhouse-server,clickhouse-client 9000

     

     

    clickhouse集群部署简要步骤:

    1. Install ClickHouse server on all machines of the cluster
    2. Set up cluster configs in configuration files
    3. Create local tables on each instance
    4. Create a Distributed table
    1. 在群集的所有计算机上安装ClickHouse服务器
    2. 在配置文件中设置群集配置
    3. 在每个实例上创建本地表
    4. 创建一个 分布式表

    分布式表 实际上是一种 “view” 到ClickHouse集群的本地表。从分布式表中选择查询使用集群所有分片的资源执行。您可以为多个集群指定configs,并创建多个分布式表,为不同的集群提供视图。

    第一步:在群集的所有计算机上安装ClickHouse服务器

    参考文章:【ClickHouse】1:clickhouse安装 (CentOS7)

    第二步:在配置文件中设置集群配置

    2.1: 先在/etc/clickhouse-server/config.xml中新增下面一段内容。不配置的话就默认就在/etc/metrika.xml目录,我这里调整到/etc/clickhouse-server/目录。

    2.2: 再配置/etc/clickhouse-server/metrika.xml文件,配置好后同步到其他两台机器。

    <yandex>
    <!-- 集群配置 -->
    <clickhouse_remote_servers>
        <!-- 3分片1备份 -->
        <cluster_3shards_1replicas>
            <!-- 数据分片1  -->
            <shard>
                <replica>
                    <host>centf8118.sharding1.db</host>
                    <port>9000</port>
                </replica>
            </shard>
            <!-- 数据分片2  -->
            <shard>
                <replica>
                    <host>centf8119.sharding2.db</host>
                    <port> 9000</port>
                </replica>
            </shard>
            <!-- 数据分片3  -->
            <shard>
                <replica>
                    <host>centf8120.sharding3.db</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_3shards_1replicas>
    </clickhouse_remote_servers>
    </yandex>

    说明:

    • clickhouse_remote_servers与config.xml中的incl属性值对应;
    • cluster_3shards_1replicas是集群名,可以随便取名;
    • 共设置3个分片,每个分片只有1个副本;

    打开clickhouse-client,查看集群:
    centf8118.sharding1.db :) select * from system.clusters;
    
    SELECT *
    FROM system.clusters
    
    ┌─cluster───────────────────────────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name──────────────┬─host_address─┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─estimated_recovery_time─┐
    │ cluster_3shards_1replicas         │         111 │ centf8118.sharding1.db │ 192.168.81.1890001 │ default │                  │            00 │
    │ cluster_3shards_1replicas         │         211 │ centf8119.sharding2.db │ 192.168.81.1990000 │ default │                  │            00 │
    │ cluster_3shards_1replicas         │         311 │ centf8120.sharding3.db │ 192.168.81.2090000 │ default │                  │            00 │
    │ test_cluster_two_shards           │         111127.0.0.1127.0.0.190001 │ default │                  │            00 │
    │ test_cluster_two_shards           │         211127.0.0.2127.0.0.290000 │ default │                  │            00 │
    │ test_cluster_two_shards_localhost │         111 │ localhost              │ 127.0.0.190001 │ default │                  │            00 │
    │ test_cluster_two_shards_localhost │         211 │ localhost              │ 127.0.0.190001 │ default │                  │            00 │
    │ test_shard_localhost              │         111 │ localhost              │ 127.0.0.190001 │ default │                  │            00 │
    │ test_shard_localhost_secure       │         111 │ localhost              │ 127.0.0.194400 │ default │                  │            00 │
    │ test_unavailable_shard            │         111 │ localhost              │ 127.0.0.190001 │ default │                  │            00 │
    │ test_unavailable_shard            │         211 │ localhost              │ 127.0.0.110 │ default │                  │            00 │
    └───────────────────────────────────┴───────────┴──────────────┴─────────────┴────────────────────────┴──────────────┴──────┴──────────┴─────────┴──────────────────┴──────────────┴─────────────────────────┘
    
    11 rows in set. Elapsed: 0.004 sec. 

    可以看到cluster_3shards_1replicas就是我们定义的集群名称,一共有三个分片,每个分片有一份数据。

    第三步:在每个实例上创建本地表

    3.1:数据准备

    前置条件:这里用到官方提供的OnTime数据集,先下载下来,并按照文档建表。

    教程: https://clickhouse.yandex/docs/en/single/?query=internal_replication#ontime

     有两种方式:

    • import from raw data
    • download of prepared partitions

    第一种方式:import from raw data

     先下载数据

    # cd /data/clickhouse/tmp
    
    for s in `seq 1987 2018`
    do
    for m in `seq 1 12`
    do
    wget https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_${s}_${m}.zip
    done
    done

    再创建表 ontime

    CREATE TABLE `ontime` (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    ) ENGINE = MergeTree
    PARTITION BY Year
    ORDER BY (Carrier, FlightDate)
    SETTINGS index_granularity = 8192;
    View Code

    最后导入数据(Loading data)

    # cd /data/clickhouse/tmp
    for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/.00//g' | clickhouse-client --host=centf9118.sharding1.db --query="INSERT INTO ontime FORMAT CSVWithNames"; done
    

      

    第二种方式:download of prepared partitions 

    经测试,第二种下载快一点,我这里采用的第二种方法,第一种没验证过。

    $ cd /data/clickhouse/tmp/
    $ curl -O https://clickhouse-datasets.s3.yandex.net/ontime/partitions/ontime.tar
    $ tar xvf ontime.tar -C /data/clickhouse # path to ClickHouse data directory
    $ # check permissions of unpacked data, fix if required
    $ sudo service clickhouse-server restart
    $ clickhouse-client --query "select count(*) from datasets.ontime"

    tar包解压后,datasets的目录所属用户组及用户都是root,需要把所属用户组和用户改为clickhouse。然后重启服务。

    cd /data/clickhouse/data
    chown -R clickhouse:clickhouse ./datasets
    
    # 下面也要操作,一开始没改该文件夹所属组等导致遇到下面的问题。
    cd /data/clickhouse/metadata
    chown -R clickhouse:clickhouse ./datasets 

    执行完会自动创建一个新库:datasets , 库里有一张表:ontime

    查看数据总量:

    [root@centf8119 data]# clickhouse-client --query "select count(*) from datasets.ontime"
    183953732
    

      

    3.2:建数据表: ontime_local

    原始数据有了,在81.19的datasets.ontime中。

    然后在另外两个服务器(81.18,81.20)上新建datasets库。

    最后三个服务器都在datasets库上分别建表:ontime_local 。表结构和ontime完全一样。

    CREATE TABLE `ontime_local` (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    ) ENGINE = MergeTree(FlightDate, (Year, FlightDate), 8192)
    View Code

    18,20的库表都创建好了,但是在19创建ontime_local表的时候就报错了,报错如下:

    Received exception from server (version 20.6.4):
    Code: 76. DB::Exception: Received from localhost:9000. DB::Exception: Cannot open file /data/clickhouse/metadata/datasets/ontime_local.sql.tmp, errno: 13, strerror: Permission denied. 
    
    0 rows in set. Elapsed: 0.007 sec. 

    是因为下载后的tar包解压后的相关文件所属用户组和用户都是root。所以clickhouse账号操作的时候没有权限。

    [root@centf8119 metadata]# pwd
    /data/clickhouse/metadata
    [root@centf8119 metadata]# ll
    total 8
    drwxr-xr-x 2 root       root        24 Aug 27 18:25 datasets
    drwxr-x--- 2 clickhouse clickhouse  65 Aug 28 10:01 default
    -rw-r----- 1 clickhouse clickhouse  42 Aug 26 09:51 default.sql
    drwxr-x--- 2 clickhouse clickhouse 133 Aug 26 13:56 system
    drwxr-x--- 2 clickhouse clickhouse  30 Aug 26 13:59 testdb
    -rw-r----- 1 clickhouse clickhouse  41 Aug 26 13:59 testdb.sql
    

      

    解决方法如下:执行完后重启服务。

    cd /data/clickhouse/metadata
    chown -R clickhouse:clickhouse ./datasets
    

      

     如果为了杜绝这个问题,可以在导入数据之前,先把数据库dataserts建好。这样就不会有这个问题了。

     这里建表的时候会往/data/clickhouse/data/datasets/目录创建表数据,同时也会往/data/clickhouse/metedata/datasets/目录中保存建表sql。前面报错就是在这一步保存建表sql的时候没有权限。

    但是/data/clickhouse/data/datasets/目录的数据文件已经有了。如果要重新创建之前报错的表,需要在这个目录中删除相应的表名目录文件。否则会提示该表文件已存在:

    Code: 57. DB::Exception: Received from localhost:9000. DB::Exception: Directory for table data data/datasets/ontime_local/ already exists. 
    

      

    3.3:建分布表:ontime_all 

    CREATE TABLE ontime_all AS ontime_local
    ENGINE = Distributed(cluster_3shards_1replicas, datasets, ontime_local, rand());

    分布表(Distributed)本身不存储数据,相当于路由,需要指定集群名、数据库名、数据表名、分片KEY,这里分片用rand()函数,表示随机分片。

    查询分布表,会根据集群配置信息,路由到具体的数据表,再把结果进行合并。

    ontime_all与ontime在同一个节点上,方便插入数据。这里只要在81.19的datasets库创建就行了。

    centf8119.sharding2.db :) show tables;
    
    SHOW TABLES
    
    ┌─name─────────┐
    │ ontime       │
    │ ontime_all   │
    │ ontime_local │
    └──────────────┘
    
    3 rows in set. Elapsed: 0.004 sec. 
    

      

    3.4:插入数据

    INSERT INTO ontime_all SELECT * FROM ontime;
    

     

    把ontime的数据插入到ontime_all,ontime_all会随机插入到三个节点的ontime_local里。

    插入完成后,查看总数据量:

    centf8119.sharding2.db :) INSERT INTO ontime_all SELECT * FROM ontime;
    
    INSERT INTO ontime_all SELECT *
    FROM ontime
    
    Ok.
    
    0 rows in set. Elapsed: 486.390 sec. Processed 183.95 million rows, 133.65 GB (378.20 thousand rows/s., 274.78 MB/s.) 
    
    centf8119.sharding2.db :) 
    centf8119.sharding2.db :) 
    centf8119.sharding2.db :) 
    centf8119.sharding2.db :) 
    centf8119.sharding2.db :) 
    centf8119.sharding2.db :) 
    centf8119.sharding2.db :) select count(*) from ontime_all;
    
    SELECT count(*)
    FROM ontime_all
    
    ┌───count()─┐
    │ 183953732 │
    └───────────┘
    
    1 rows in set. Elapsed: 0.014 sec. 

    查看每个节点的数据量:61322750 + 61311299 + 61319683 = 183953732

    可以看到,每个节点大概有1/3的数据。

    3.5:性能对比

    对比一下分片与不分片的性能差异。

    不分片: (执行第一次耗时3.561 sec,第二次耗时1.214 sec。第二次更快可能是缓存的缘故)

    centf8119.sharding2.db :) select Carrier, count() as c, round(quantileTDigest(0.99)(DepDelay), 2) as q from ontime group by Carrier order by q desc limit 5;
    
    SELECT 
        Carrier,
        count() AS c,
        round(quantileTDigest(0.99)(DepDelay), 2) AS q
    FROM ontime
    GROUP BY Carrier
    ORDER BY q DESC
    LIMIT 5
    
    ↙ Progress: 0.00 rows, 0.00 B (0.00 rows/s., 0.00 B/s.) 
    ← Progress: 1.34 million rows, 8.05 MB (10.53 million rows/s., 63.20 MB/s.) 
    ↖ Progress: 6.59 million rows, 39.53 MB (28.92 million rows/s., 173.53 MB/s.)  3%
    ↑ Progress: 10.91 million rows, 65.45 MB (33.25 million rows/s., 199.50 MB/s.)  5%
    ↗ Progress: 15.37 million rows, 92.21 MB (35.88 million rows/s., 215.26 MB/s.)  8%
    → Progress: 20.82 million rows, 124.90 MB (39.38 million rows/s., 236.29 MB/s.) ██████████▋                                                                                      11%
    ↘ Progress: 25.34 million rows, 152.03 MB (40.29 million rows/s., 241.77 MB/s.) █████████████                                                                                    13%
    ↓ Progress: 30.17 million rows, 181.01 MB (41.38 million rows/s., 248.29 MB/s.) ███████████████▌                                                                                 16%
    ┌─Carrier─┬───────c─┬──────q─┐
    │ G4      │   80908268.21 │
    │ NK      │  559143194.38 │
    │ B6      │ 3246853194.14 │
    │ EV      │ 6396753187.97 │
    │ YV      │ 1882273179.92 │
    └─────────┴─────────┴────────┘
    
    5 rows in set. Elapsed: 3.561 sec. Processed 183.95 million rows, 1.10 GB (51.66 million rows/s., 309.97 MB/s.) 
    
    centf8119.sharding2.db :) 
    centf8119.sharding2.db :) 
    centf8119.sharding2.db :) 
    centf8119.sharding2.db :) SELECT     Carrier,     count() AS c,     round(quantileTDigest(0.99)(DepDelay), 2) AS q FROM ontime GROUP BY Carrier ORDER BY q DESC LIMIT 5;
    
    SELECT 
        Carrier,
        count() AS c,
        round(quantileTDigest(0.99)(DepDelay), 2) AS q
    FROM ontime
    GROUP BY Carrier
    ORDER BY q DESC
    LIMIT 5
    
    ┌─Carrier─┬───────c─┬──────q─┐
    │ G4      │   80908268.28 │
    │ NK      │  559143194.43 │
    │ B6      │ 3246853194.06 │
    │ EV      │ 6396753188.05 │
    │ YV      │ 1882273179.95 │
    └─────────┴─────────┴────────┘
    
    5 rows in set. Elapsed: 1.214 sec. Processed 183.95 million rows, 1.10 GB (151.48 million rows/s., 908.92 MB/s.) 

    第一次执行

     第二次执行:

    分片: (执行第一次耗时17.254 sec,第二次耗时0.892 sec。)

    centf8119.sharding2.db :) select Carrier, count() as c, round(quantileTDigest(0.99)(DepDelay), 2) as q from ontime_all group by Carrier order by q desc limit 5;
    
    SELECT 
        Carrier,
        count() AS c,
        round(quantileTDigest(0.99)(DepDelay), 2) AS q
    FROM ontime_all
    GROUP BY Carrier
    ORDER BY q DESC
    LIMIT 5
    
    ┌─Carrier─┬───────c─┬──────q─┐
    │ G4      │   80908268.15 │
    │ NK      │  559143194.49 │
    │ B6      │ 3246853193.96 │
    │ EV      │ 6396753187.98 │
    │ YV      │ 1882273179.98 │
    └─────────┴─────────┴────────┘
    
    5 rows in set. Elapsed: 17.254 sec. Processed 183.95 million rows, 1.10 GB (10.66 million rows/s., 63.97 MB/s.) 
    
    centf8119.sharding2.db :) select Carrier, count() as c, round(quantileTDigest(0.99)(DepDelay), 2) as q from ontime_all group by Carrier order by q desc limit 5;
    
    SELECT 
        Carrier,
        count() AS c,
        round(quantileTDigest(0.99)(DepDelay), 2) AS q
    FROM ontime_all
    GROUP BY Carrier
    ORDER BY q DESC
    LIMIT 5
    
    ┌─Carrier─┬───────c─┬──────q─┐
    │ G4      │   80908268.5 │
    │ NK      │  559143194.41 │
    │ B6      │ 3246853194.04 │
    │ EV      │ 6396753188.07 │
    │ YV      │ 1882273179.88 │
    └─────────┴─────────┴────────┘
    
    5 rows in set. Elapsed: 0.892 sec. Processed 183.95 million rows, 1.10 GB (206.26 million rows/s., 1.24 GB/s.) 

    为什么第一次耗时这么久,不明白。

    第一次执行:

    第二次执行:

     现在,停掉一个节点,会是神马情况? 停掉20的clickhouse-server。

    [root@centf8120 metadata]# service clickhouse-server stop;
    Stop clickhouse-server service: DONE
    

     

    然后在81.19上查询:

    centf8119.sharding2.db :) select Carrier, count() as c, round(quantileTDigest(0.99)(DepDelay), 2) as q from ontime_all group by Carrier order by q desc limit 5;
    
    SELECT 
        Carrier,
        count() AS c,
        round(quantileTDigest(0.99)(DepDelay), 2) AS q
    FROM ontime_all
    GROUP BY Carrier
    ORDER BY q DESC
    LIMIT 5
    
    ↖ Progress: 71.35 million rows, 428.13 MB (129.17 million rows/s., 775.05 MB/s.)  53%
    Received exception from server (version 20.6.4):
    Code: 279. DB::Exception: Received from localhost:9000. DB::Exception: All connection tries failed. Log: 
    
    Code: 32, e.displayText() = DB::Exception: Attempt to read after eof (version 20.6.4.44 (official build))
    Code: 210, e.displayText() = DB::NetException: Connection refused (centf8120.sharding3.db:9000) (version 20.6.4.44 (official build))
    Code: 210, e.displayText() = DB::NetException: Connection refused (centf8120.sharding3.db:9000) (version 20.6.4.44 (official build))
    
    : While executing Remote. 
    
    0 rows in set. Elapsed: 0.655 sec. Processed 71.35 million rows, 428.13 MB (108.91 million rows/s., 653.48 MB/s.) 

    报错了,看来clickhouse的处理很严格,如果一个分片不可用,就整个分布式表都不可用了。

    当然,此时如果查本地表ontime_local还是可以的。

    那么,如何解决整个问题呢?这就是前文所说的稳定性问题了,解决方案是:数据备份!

    第四步:数据备份

    说明一点,数据备份与分片没有必然联系,这是两个方面的问题。但在clickhouse中,replica是挂在shard上的,因此要用多副本,必须先定义shard。

    最简单的情况:1个分片多个副本。

    4.1:添加集群

    像之前一样,再配置一个集群,叫做cluster_1shards_2replicas,表示1分片2副本,配置信息如下:

     vim /etc/clickhouse-server/metrika.xml

    <yandex>
            <!-- 1分片2备份 -->
            <cluster_1shards_2replicas>
                <shard>
                    <internal_replication>false</internal_replication>
                    <replica>
                        <host>centf8118.sharding1.db</host>
                        <port>9000</port>
                    </replica>
                    <replica>
                        <host>centf8119.sharding2.db</host>
                        <port>9000</port>
                    </replica>
                </shard>
            </cluster_1shards_2replicas>
    </yandex>

     注意,如果配置文件没有问题,是不用重启clickhouse-server的,会自动加载!

     

    4.2:建本地数据表

     建新数据表名为ontime_local_2,在三台机器分别执行。

    CREATE TABLE `ontime_local_2` (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    ) ENGINE = MergeTree(FlightDate, (Year, FlightDate), 8192)
    View Code

    4.3:建分布表

     建新分布表名为ontime_all_2,在81.19机器建一个就行了。

    CREATE TABLE ontime_all_2 AS ontime_local_2
    ENGINE = Distributed(cluster_1shards_2replicas, datasets, ontime_local_2, rand());

    4.4:导入数据

    INSERT INTO ontime_all_2 SELECT * FROM ontime

    4.5:查询数据

    查询ontime_local_2,两个节点都有全量数据。

    关掉一个服务器,仍能查询全量数据,数据副本已经生效。

    4.6:数据一致性探究

    既然有多副本,就有个一致性的问题:加入写入数据时,挂掉一台机器,会怎样?

    我们来模拟一下:

    1:停掉centf8118.sharding1.db服务。

    server clickhouse-server stop

    2:在centf8119.sharding2.db中队ontime_all_2分布表插入几条数据。

    insert into ontime_all_2 select * from ontime limit 10;

    3:启动centf8118.sharding1.db服务。

    service clickhouse-server start

     4:查看数据验证。

     

     查看两个机器的ontime_local_2、以及ontime_all_2,发现都是总数据量都增加了100条,说明这种情况下,集群节点之间能够自动同步。

    再模拟一个复杂点的场景:(后面三台机器简称sharding1,sharding2,sharding3)

    1. 停掉sharding1;
    2. 在sharding2中对ontime_all_2插入10条数据;
    3. 查询ontime_all_2,此时数据增加了10条;
    4. 停掉sharding2;
    5. 启动sharding1,此时,整个集群不可用;
    6. 查询ontime_all_2,此时,集群恢复可用,但数据少了10条;
    7. 启动sharding2,查询ontime_all_2、ontime_local_2,数据自动同步;

    上边都是通过ontime_all_2表插入数据的,如果通过ontime_local_2表插入数据,还能同步吗?

    1. sharding1上往ontime_local_2插入10条数据;
    2. 查询sharding2,ontime_local_2数据没有同步
    综上,通过分布表写入数据,会自动同步数据;而通过数据表写入数据,不会同步;正常情况没什么大问题。

     

    更复杂的情况没有模拟出来,但是可能会存在数据不一致的问题,官方文档描述如下:

    Each shard can have the 'internal_replication' parameter defined in the config file.

    If this parameter is set to 'true', the write operation selects the first healthy replica and writes data to it. Use this alternative if the Distributed table "looks at" replicated tables. In other words, if the table where data will be written is going to replicate them itself.

    If it is set to 'false' (the default), data is written to all replicas. In essence, this means that the Distributed table replicates data itself. This is worse than using replicated tables, because the consistency of replicas is not checked, and over time they will contain slightly different data.

    翻译下:

    分片可以设置internal_replication属性,这个属性是true或者false,默认是false。

    如果设置为true,则往本地表写入数据时,总是写入到完整健康的副本里,然后由表自身完成复制,这就要求本地表是能自我复制的。

    如果设置为false,则写入数据时,是写入到所有副本中。这时,是无法保证一致性的。

    举个栗子,一条数据要insert到ontime_all_2中,假设经过rand()实际是要写入到sharding1的ontime_local_2表中,此时ontime_local_2配置了两个副本。
    如果internal_replication是false,那么就会分别往两个副本中插入这条数据。注意!!!分别插入,可能一个成功,一个失败,插入结果不检验!这就导致了不一致性;
    而如果internal_replication是true,则只往1个副本里写数据,其他副本则是由ontime_local_2自己进行同步,这样就解决了写入一致性问题。

    虽然没有模拟出数据不一致的情况,实际中可能会遇到,所以官方建议使用表自动同步的方式,也就是internal_replication为true。

    具体怎么用,下边具体介绍。



    5:自动数据备份

    自动数据备份,是表的行为,ReplicatedXXX的表支持自动同步。

    Replicated前缀只用于MergeTree系列(MergeTree是最常用的引擎),即clickhouse支持以下几种自动备份的引擎:

    ReplicatedMergeTree
    ReplicatedSummingMergeTree
    ReplicatedReplacingMergeTree
    ReplicatedAggregatingMergeTree
    ReplicatedCollapsingMergeTree
    ReplicatedGraphiteMergeTree

    再强调一遍,Replicated表自动同步与之前的集群自动同步不同,是表的行为,与clickhouse_remote_servers配置没有关系,只要有zookeeper配置就行了。


    为了说明这个问题,先不配置clickhouse_remote_servers,只添加zookeeper配置:
    vim /etc/clickhouse-server/metrika.xml
    <yandex><zookeeper-servers>
            <node index="1">
                <host>centf8118.sharding1.db</host>
                <port>2181</port>
            </node>
            <node index="2">
                <host>centf8119.sharding2.db</host>
                <port>2181</port>
            </node>
            <node index="3">
                <host>centf8120.sharding3.db</host>
                <port>2181</port>
            </node>
        </zookeeper-servers></yandex>
    建数据表:
    sharding1:
    CREATE TABLE `ontime_replica` (
      ...
    ) ENGINE = ReplicatedMergeTree('/data/clickhouse/tables/ontime', 'replica1', FlightDate, (Year, FlightDate), 8192);
    
    sharding2:
    CREATE TABLE `ontime_replica` (
      ...
    ) ENGINE = ReplicatedMergeTree('/data/clickhouse/tables/ontime', 'replica2', FlightDate, (Year, FlightDate), 8192);
    
    sharding3:
    CREATE TABLE `ontime_replica` (
      ...
    ) ENGINE = ReplicatedMergeTree('/data/clickhouse/tables/ontime', 'replica3', FlightDate, (Year, FlightDate), 8192);

    sharding1完整建表SQL:

    CREATE TABLE `ontime_replica` (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    ) ENGINE = ReplicatedMergeTree('/data/clickhouse/tables/ontime', 'replica1', FlightDate, (Year, FlightDate), 8192)
    View Code

    sharding2完整建表SQL:

    CREATE TABLE `ontime_replica` (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    ) ENGINE = ReplicatedMergeTree('/data/clickhouse/tables/ontime', 'replica2', FlightDate, (Year, FlightDate), 8192)
    View Code

    sharding3完整建表SQL:

    CREATE TABLE `ontime_replica` (
      `Year` UInt16,
      `Quarter` UInt8,
      `Month` UInt8,
      `DayofMonth` UInt8,
      `DayOfWeek` UInt8,
      `FlightDate` Date,
      `UniqueCarrier` FixedString(7),
      `AirlineID` Int32,
      `Carrier` FixedString(2),
      `TailNum` String,
      `FlightNum` String,
      `OriginAirportID` Int32,
      `OriginAirportSeqID` Int32,
      `OriginCityMarketID` Int32,
      `Origin` FixedString(5),
      `OriginCityName` String,
      `OriginState` FixedString(2),
      `OriginStateFips` String,
      `OriginStateName` String,
      `OriginWac` Int32,
      `DestAirportID` Int32,
      `DestAirportSeqID` Int32,
      `DestCityMarketID` Int32,
      `Dest` FixedString(5),
      `DestCityName` String,
      `DestState` FixedString(2),
      `DestStateFips` String,
      `DestStateName` String,
      `DestWac` Int32,
      `CRSDepTime` Int32,
      `DepTime` Int32,
      `DepDelay` Int32,
      `DepDelayMinutes` Int32,
      `DepDel15` Int32,
      `DepartureDelayGroups` String,
      `DepTimeBlk` String,
      `TaxiOut` Int32,
      `WheelsOff` Int32,
      `WheelsOn` Int32,
      `TaxiIn` Int32,
      `CRSArrTime` Int32,
      `ArrTime` Int32,
      `ArrDelay` Int32,
      `ArrDelayMinutes` Int32,
      `ArrDel15` Int32,
      `ArrivalDelayGroups` Int32,
      `ArrTimeBlk` String,
      `Cancelled` UInt8,
      `CancellationCode` FixedString(1),
      `Diverted` UInt8,
      `CRSElapsedTime` Int32,
      `ActualElapsedTime` Int32,
      `AirTime` Int32,
      `Flights` Int32,
      `Distance` Int32,
      `DistanceGroup` UInt8,
      `CarrierDelay` Int32,
      `WeatherDelay` Int32,
      `NASDelay` Int32,
      `SecurityDelay` Int32,
      `LateAircraftDelay` Int32,
      `FirstDepTime` String,
      `TotalAddGTime` String,
      `LongestAddGTime` String,
      `DivAirportLandings` String,
      `DivReachedDest` String,
      `DivActualElapsedTime` String,
      `DivArrDelay` String,
      `DivDistance` String,
      `Div1Airport` String,
      `Div1AirportID` Int32,
      `Div1AirportSeqID` Int32,
      `Div1WheelsOn` String,
      `Div1TotalGTime` String,
      `Div1LongestGTime` String,
      `Div1WheelsOff` String,
      `Div1TailNum` String,
      `Div2Airport` String,
      `Div2AirportID` Int32,
      `Div2AirportSeqID` Int32,
      `Div2WheelsOn` String,
      `Div2TotalGTime` String,
      `Div2LongestGTime` String,
      `Div2WheelsOff` String,
      `Div2TailNum` String,
      `Div3Airport` String,
      `Div3AirportID` Int32,
      `Div3AirportSeqID` Int32,
      `Div3WheelsOn` String,
      `Div3TotalGTime` String,
      `Div3LongestGTime` String,
      `Div3WheelsOff` String,
      `Div3TailNum` String,
      `Div4Airport` String,
      `Div4AirportID` Int32,
      `Div4AirportSeqID` Int32,
      `Div4WheelsOn` String,
      `Div4TotalGTime` String,
      `Div4LongestGTime` String,
      `Div4WheelsOff` String,
      `Div4TailNum` String,
      `Div5Airport` String,
      `Div5AirportID` Int32,
      `Div5AirportSeqID` Int32,
      `Div5WheelsOn` String,
      `Div5TotalGTime` String,
      `Div5LongestGTime` String,
      `Div5WheelsOff` String,
      `Div5TailNum` String
    ) ENGINE = ReplicatedMergeTree('/data/clickhouse/tables/ontime', 'replica3', FlightDate, (Year, FlightDate), 8192)
    View Code

    查看zk信息:

    [zk: localhost:2181(CONNECTED) 0] ls /data/clickhouse/tables/ontime/replicas
    [replica2, replica3, replica1]
    

      

    可以看到,zk中已经有了对应的路径和副本信息。

    插入数据:

    注意!只在一个机器上执行插入操作。(ontime数据在sharding2上,所以我在sharding2上操作)

    # sharding2 上操作
    INSERT INTO ontime_replica SELECT * FROM ontime
    centf8119.sharding2.db :) INSERT INTO ontime_replica SELECT * FROM ontime;
    
    INSERT INTO ontime_replica SELECT *
    FROM ontime
    
    Ok.
    
    0 rows in set. Elapsed: 618.798 sec. Processed 183.95 million rows, 133.65 GB (297.28 thousand rows/s., 215.98 MB/s.) 
    View Code

    查看数据:

    分别在三个机器上查询ontime_replica,都有数据,且数据完全一样。

    可以看到,这种方式与之前方式的区别,直接写入一个节点,其他节点自动同步,完全是表的行为;

    而之前的方式必须创建Distributed表,并通过Distributed表写入数据才能同步(目前我们还没有给ontime_replica创建对应的Distributed表)。

    5.1:配置集群

            <!-- 1分片3备份 -->
            <cluster_1shards_3replicas>
                <shard>
                    <internal_replication>true</internal_replication>
                    <replica>
                        <host>centf8118.sharding1.db</host>
                        <port>9000</port>
                    </replica>
                    <replica>
                        <host>centf8119.sharding2.db</host>
                        <port>9000</port>
                    </replica>
                    <replica>
                        <host>centf8120.sharding3.db</host>
                        <port>9000</port>
                    </replica>
                </shard>
            </cluster_1shards_3replicas>

    集群名字为cluster_1shards_3replicas,1个分片,3个副本;与之前不同,这次设置internal_replication为true,表示要用表自我复制功能,而不用集群的复制功能。

    5.2:建分布表

    CREATE TABLE ontime_replica_all AS ontime_replica
    ENGINE = Distributed(cluster_1shards_3replicas, datasets, ontime_replica, rand())

    表名为ontime_replica_all,使用cluster_1shards_3replicas集群,数据表为ontime_replica。

    查询分布表:

    centf8119.sharding2.db :) select count(*) from ontime_replica;
    
    SELECT count(*)
    FROM ontime_replica
    
    ┌───count()─┐
    │ 183953732 │
    └───────────┘
    
    1 rows in set. Elapsed: 0.007 sec. 

    5.3:分布表写入

    前边说了,一个节点ontime_replica写入数据时,其他节点自动同步;那如果通过分布表ontime_replica_all写入数据会如何呢?

    其实,前文已经提到过,internal_replication为true,则通过分布表写入数据时,会自动找到“最健康”的副本写入,然后其他副本通过表自身的复制功能同步数据,最终达到数据一致。

    centf8119.sharding2.db :) select count(*) from ontime_replica;
    
    SELECT count(*)
    FROM ontime_replica
    
    ┌───count()─┐
    │ 183953732 │
    └───────────┘
    
    1 rows in set. Elapsed: 0.007 sec. 
    
    centf8119.sharding2.db :) show tables;
    
    SHOW TABLES
    
    ┌─name───────────────┐
    │ ontime             │
    │ ontime_all         │
    │ ontime_all_2       │
    │ ontime_local       │
    │ ontime_local_2     │
    │ ontime_replica     │
    │ ontime_replica_all │
    └────────────────────┘
    
    7 rows in set. Elapsed: 0.004 sec. 
    
    centf8119.sharding2.db :) insert into ontime_replica select * from ontime limit 68;
    
    INSERT INTO ontime_replica SELECT *
    FROM ontime
    LIMIT 68
    
    Ok.
    
    0 rows in set. Elapsed: 0.575 sec. 
    
    centf8119.sharding2.db :) select count(*) from ontime_replica;
    
    SELECT count(*)
    FROM ontime_replica
    
    ┌───count()─┐
    │ 183953800 │
    └───────────┘
    
    1 rows in set. Elapsed: 0.008 sec. 

    6:clickhouse分片 + 备份

    分片,是为了突破单机上限(存储、计算等上限),备份是为了高可用。

    只分片,提升了性能,但是一个分片挂掉,整个服务不可用;只备份,确实高可用了,但是整体还是受限于单机瓶颈(备份1000份与备份2份没什么区别,除了更浪费机器)。

    所以,生产中这两方面需要同时满足。

    其实,只要把之前的分片和备份整合起来就行了,例如,3分片2备份的配置如下:

    ...
            <!-- 3分片2备份:使用表备份 -->
            <cluster_3shards_2replicas>
                <shard>
                    <internal_replication>true</internal_replication>
                    <replica>
                        <host>centf8118.sharding1.db</host>
                        <port>9000</port>
                    </replica>
                    <replica>
                        <host>centf8119.sharding2.db</host>
                        <port>9000</port>
                    </replica>
                </shard>
                <shard>
                    <internal_replication>true</internal_replication>
                    <replica>
                        <host>centf8120.sharding3.db</host>
                        <port>9000</port>
                    </replica>
                    <replica>
                        <host>centf8121.sharding4.db</host>
                        <port>9000</port>
                    </replica>
                </shard>
                <shard>
                    <internal_replication>true</internal_replication>
                    <replica>
                        <host>centf8122.sharding5.db</host>
                        <port>9000</port>
                    </replica>
                    <replica>
                        <host>centf8123.sharding6.db</host>
                        <port>9000</port>
                    </replica>
                </shard>
            </cluster_3shards_2replicas>
    ...
    这里一共需要建6个数据表:
     
    CREATE TABLE `ontime_replica` (
      ...
    ) ENGINE = ReplicatedMergeTree('/data/clickhouse/tables/ontime/{shard}', '{replica}', FlightDate, (Year, FlightDate), 8192);

    其中,{shard}和{replica}是macros配置(相当于环境变量),修改配置文件:


    # vi /etc/clickhose-server/metrika.xml
    
    ...
        <macros>
            <shard>01</shard>
            <replica>01</replica>
        </macros>
    ...

     每台机器的shard和replica值根据具体情况设置,例如,这里是3分片2副本,则配置比如如下:

    centf8118.sharding1.db: shard=01, replica=01
    centf8119.sharding2.db: shard=01, replica=02
    centf8120.sharding3.db: shard=02, replica=01
    centf8121.sharding4.db: shard=02, replica=02
    centf8122.sharding5.db: shard=03, replica=01
    centf8123.sharding6.db: shard=03, replica=02

    使用macros只是为了建表方便(每个机器可以使用同样的建表语句),不是必须的,只要ReplicatedMergeTree指定zk路径和replica值即可。

    由于资源有限,这里不实验了。

    需要提醒一下,每个clickhouse-server实例只能放一个分片的一个备份,也就是3分片2备份需要6台机器(6个不同的clickhouse-server)。

    之前为了节省资源,打算循环使用,把shard1的两个副本放到sharding1、sharding2两个机器上,shard2的两个副本放到sharding2、sharding3上,shard3的两个副本放到sharding3、sharding1上,结果是不行的。

    原因是shard+replica对应一个数据表,Distributed查询规则是每个shard里找一个replica,把结果合并。

    假如按照以上设置,可能一个查询解析结果为:
    取shard1的replica1,对应sharding1的ontime_replica;
    取shard2的replica2,对应sharding3的ontime_replica;
    取shard3的replica2,对应sharding1的ontime_replica;

    最后,得到的结果是sharding1的ontime_replica查询两次+sharding3的ontime_replica查询一次,结果是不正确的。



     参考文章: https://www.jianshu.com/p/20639fdfdc99

  • 相关阅读:
    第三第四周的笔记
    第一二周的笔记
    jQuery的一些笔记
    函数的执行环境与调用对象
    从click事件理解DOM事件流
    慕课编程题JS选项卡切换
    adb(11)-重新挂载 system 分区为可写
    adb(10)-屏幕截图/录制
    adb(9)-查看设备信息
    adb(8)-查看日志
  • 原文地址:https://www.cnblogs.com/DBArtist/p/clickhouse_cluster.html
Copyright © 2011-2022 走看看