pg_shard是一个PostgreSQL的sharding extension。可以用于Shards、Replicates tables和高可用。它可以在不修改Applications的情况下无缝分配(或叫做分发?)SQL。作为一个独立的extension,pg_shard适用与很多NOSQL的应用场景。
对一个pg_shard的Cluster来说,PG的各节点分为Master node和Worker node两类。Master node的主要用来存储metadata和作为所有查询的入口。
可以挑选Cluster中的任意一个PG node作为Master,其他节点作为Workers。
最简单的测试方法是在同一台主机上同时配置Master和Worker实例。在同一台主机配置时,每个PG实例运行在不同端口,可以简单的适用“localhost”作为Worker node的节点名称。一般,可以每台主机配置一个PG实例,这样更适合生产环境的复杂情况。这种配置情况下,需要配置这些PG实例可以相互通信。(主要是配置postgresql.conf的listen_address参数和pg_hba.conf文件)。无论怎样部署,Master都必须可以通过TCP无密码连接到Workder Node。
pg_shard的环境搭建
编译安装pg_shard
修改postgresql.conf文件的shared_preload_libraries=’pg_shard’
创建pg_shard的配置文件pg_worker_list.conf文件,格式如下:
# hostname port-number
worker-1 5432
worker-2 5433
需要重新启动Master node。
对Table进行分片的步骤
在Master节点上:
CREATE EXTENSION pg_shard;
SELECT master_create_distributed_table(‘table_name’,’partition_column’);
在Worker节点上:
SELECT master_create_worker_shards(‘table_name’,’shards_num’,’rep_num’);
一旦创建了shards,就可以在Cluster上进行查询了。目前,UPDATE和DELETE需要在WHERE条件子句中包含Partition Column。
pg_shard的管理工具
pgs_distrubution_metadata.partition;
pgs_distrubution_metadata.shard;
pgs_distrubution_metadata.shard_placement;
pg_shard的使用限制
1、不支持跨shard的事务
2、不支持除了partition key和foreign key的其他column上的唯一约束
3、不支持Distrubuted Join(但pg_shard的出品公司的CITUSDB是支持的,开源版本中不支持)
pg_shard不支持的语法
1、不支持修改表,如果要修改表,需要通过脚本在每个Worker上执行
2、DROP TABLE
3、INSERT INTO ...select...
目前不支持和不完善的技术点很多,不建议在生产使用。可以在应用场景比较单一、简单的地方用。例如:
tbl_example (id integer,val jsonb);
这样的表,来模拟类似mongodb的分片。
以下为功能点实际测试部分
---------------------------------------------------------------------------
目前仅测试了部分简单、常见的功能,红色粗体为不支持,红色非粗体为ERROR信息,供参考。
测试环境:master + 3个worker
对表进行shard:
sharddb=# SELECT master_create_distributed_table('customer_reviews', 'customer_id');
sharddb=# SELECT master_create_worker_shards('customer_reviews', 16, 2);
这时,在不同节点看到的表是不同的:
master:
sharddb=# d+
List of relations
Schema | Name | Type | Owner | Size | Description
-----{}---------------{}-------+----------
public | customer_reviews | table | postgres | 32 kB |
(1 row)
worker-1:
sharddb=# d+
List of relations
Schema | Name | Type | Owner | Size | Description
-----{}---------------------{}-------+----------
public | customer_reviews_10000 | table | postgres | 32 kB |
......
public | customer_reviews_10015 | table | postgres | 32 kB |
(11 rows)
worker-2:
sharddb=# d+
List of relations
Schema | Name | Type | Owner | Size | Description
-----{}---------------------{}-------+----------
public | customer_reviews_10000 | table | postgres | 32 kB |
......
public | customer_reviews_10015 | table | postgres | 32 kB |
(11 rows)
worker-3:
sharddb=# d+
List of relations
Schema | Name | Type | Owner | Size | Description
-----{}---------------------{}-------+----------
public | customer_reviews_10001 | table | postgres | 32 kB |
......
public | customer_reviews_10014 | table | postgres | 32 kB |
(10 rows)
在三个worker节点上,所分布的表是不同的,一共有16个表,按照两两保存副本的方式分别创建了表。
简单的写入数据和查询
sharddb=# INSERT INTO customer_reviews (customer_id, review_rating) VALUES ('HN802', 5);
sharddb=# INSERT INTO customer_reviews VALUES ('HN802', '2004-01-01', 1, 10, 4, 'B00007B5DN', 'Tug of War', 133191, 'Music', 'Indie Music', 'Pop', '{}');
sharddb=# INSERT INTO customer_reviews (customer_id, review_rating) VALUES ('FA2K1', 10);
无WHERE子句的SELECT
sharddb=# select * from customer_reviews ;
customer_id | review_date | review_rating | review_votes | review_helpful_votes | product_id | product_title | product_sales_rank | product_group | product_category | product_subcategory | similar_product_ids
----------{}----------------{}------------------------{}---------------{}-----------------------{}---------------------------+------------------
HN802 | | 5 | | | | | | | | |
HN802 | 2004-01-01 | 1 | 10 | 4 | B00007B5DN | Tug of War | 133191 | Music | Indie Music | Pop | {}
FA2K1 | | 10 | | | | | | | | |
(3 rows)
无WHERE子句的avg
sharddb=# SELECT avg(review_rating) FROM customer_reviews;
avg
--------------------
5.3333333333333333
(1 row)
带有GROUP BY子句的avg
sharddb=# SELECT customer_id,avg(review_rating) from customer_reviews GROUP BY customer_id;
customer_id | avg
----------+------------------
FA2K1 | 10.0000000000000000
HN802 | 3.0000000000000000
(2 rows)
带有HAVING子句的avg
sharddb=# SELECT customer_id,avg(review_rating) as avgrating from customer_reviews GROUP BY customer_id HAVING customer_id <> 'FA2K1';
customer_id | avgrating
----------+-----------------
HN802 | 3.0000000000000000
(1 row)
无WHERE子句的avg
sharddb=# SELECT avg(review_rating) FROM customer_reviews WHERE customer_id = 'HN802';
avg
--------------------
3.0000000000000000
(1 row)
COUNT , NULL值
sharddb=# SELECT count(*) FROM customer_reviews;
count
-------
3
(1 row)
sharddb=# SELECT count(*) FROM customer_reviews WHERE review_helpful_votes <> 4;
count
-------
0
(1 row)
sharddb=# SELECT count(*) FROM customer_reviews WHERE review_helpful_votes = 4;
count
-------
1
(1 row)
sharddb=# SELECT count(*) FROM customer_reviews WHERE review_helpful_votes IS NULL;
count
-------
2
(1 row)
^
sharddb=# SELECT count(*) FROM customer_reviews WHERE review_helpful_votes IS NOT NULL;
count
-------
1
(1 row)
带有分区条件列的UPDATE操作
sharddb=# UPDATE customer_reviews SET review_votes = 10 WHERE customer_id = 'HN802';
UPDATE 2
sharddb=#
不带分区条件列的UPDATE操作:
sharddb=# UPDATE customer_reviews SET review_votes = 10 + 1 WHERE review_votes = 10;
ERROR: cannot modify multiple shards during a single query
sharddb=#
不带分区条件列的DELETE操作:
sharddb=# DELETE FROM customer_reviews WHERE review_votes <> 99;
ERROR: cannot modify multiple shards during a single query
sharddb=#
带有分区条件列和其他列的UPDATE操作
sharddb=# UPDATE customer_reviews SET review_votes = 10 + 1 WHERE customer_id = 'HN802' AND review_votes = 10;
UPDATE 2
sharddb=#
管理工具
pgs_distribution_metadata SCHEMA master节点用来存放元数据
sharddb=# dn+
List of schemas
Name | Owner | Access privileges | Description
------------------------{}--------------------+---------------------
pgs_distribution_metadata | postgres | |
public | postgres | postgres=UC/postgres+| standard public schema
| | =UC/postgres |
(2 rows)
sharddb=# SELECT * FROM pgs_distribution_metadata.partition;
relation_id | partition_method | key
----------{}----------------------
24842 | h | customer_id
(1 row)
sharddb=# SELECT * FROM pgs_distribution_metadata.shard;
id | relation_id | storage | min_value | max_value
----{}----------{}-----------------
10000 | 24842 | t | -2147483648 | -1879048194
10001 | 24842 | t | -1879048193 | -1610612739
10002 | 24842 | t | -1610612738 | -1342177284
10003 | 24842 | t | -1342177283 | -1073741829
10004 | 24842 | t | -1073741828 | -805306374
10005 | 24842 | t | -805306373 | -536870919
10006 | 24842 | t | -536870918 | -268435464
10007 | 24842 | t | -268435463 | -9
10008 | 24842 | t | -8 | 268435446
10009 | 24842 | t | 268435447 | 536870901
10010 | 24842 | t | 536870902 | 805306356
10011 | 24842 | t | 805306357 | 1073741811
10012 | 24842 | t | 1073741812 | 1342177266
10013 | 24842 | t | 1342177267 | 1610612721
10014 | 24842 | t | 1610612722 | 1879048176
10015 | 24842 | t | 1879048177 | 2147483647
(16 rows)
sharddb=# SELECT * FROM pgs_distribution_metadata.shard_placement;
id | shard_id | shard_state | node_name | node_port
--------------{}-------------
1 | 10000 | 1 | localhost | 5433
2 | 10000 | 1 | localhost | 5434
......
32 | 10015 | 1 | localhost | 5434
(32 rows)
增加表,但先写几条数据再做shard,会有以下几个严重的错误,所以一定要遵循创建表->做shard-->写数据,否则在做shard之前写入的数据都处于不可见的状态,而且毫无提示:
1)不能自动重新分发数据
2)在worker nodes中并没有成功创建表,而且没有错误提示
3)master节点查询所有shard,还有customer_detail表的信息(releation_id=24940),实际上,在drop掉这张表后,在pg系统表中该表已经被删除了
(master)
sharddb=# INSERT INTO customer_detail VALUES ('HN802','a'),('HN802','b'),('FA2K1','c');
INSERT 0 3
sharddb=#
sharddb=# SELECT master_create_distributed_table('customer_detail', 'customer_id');
master_create_distributed_table
---------------------------------
(1 row)
sharddb=#
sharddb=# SELECT master_create_worker_shards('customer_detail', 16, 2);
master_create_worker_shards
-----------------------------
(1 row)
sharddb=#
sharddb=#
sharddb=# select * from customer_detail ;
customer_id | customer_val
----------+-----------
(0 rows)
(worker nodes)
sharddb=# drop table customer_detail;
ERROR: table "customer_detail" does not exist
sharddb=#
(master)
sharddb=# SELECT * FROM pgs_distribution_metadata.shard;
id | relation_id | storage | min_value | max_value
----{}----------{}-----------------
10000 | 24842 | t | -2147483648 | -1879048194
......(略)
10031 | 24940 | t | 1879048177 | 2147483647
(32 rows)
sharddb=# CREATE TABLE tbl_detail(customer_id text, fid integer , detailval text);
CREATE TABLE
sharddb=#
sharddb=#
sharddb=# SELECT master_create_distributed_table('tbl_detail', 'customer_id');
master_create_distributed_table
---------------------------------
(1 row)
sharddb=# SELECT master_create_worker_shards('tbl_detail', 16, 2);
master_create_worker_shards
-----------------------------
(1 row)
sharddb=#
sharddb=#
sharddb=# select customer_id from customer_reviews ;
customer_id
-------------
HN802
HN802
FA2K1
(3 rows)
插入测试数据,不能使用如下语法批量插入,只能一行一行的插入
sharddb=# INSERT INTO tbl_detail VALUES('HN802',1,'a'),('HN802',2,'b'),('HN802',3,'c'),('FA2K1',4,'d');
ERROR: cannot perform distributed planning for the given query
DETAIL: Multi-row INSERTs to distributed tables are not supported.
sharddb=#
sharddb=# INSERT INTO tbl_detail VALUES('HN802',1,'a');
INSERT 0 1
sharddb=# INSERT INTO tbl_detail VALUES ('HN802',2,'b');
INSERT 0 1
sharddb=# INSERT INTO tbl_detail VALUES ('HN802',3,'c');
INSERT 0 1
sharddb=# INSERT INTO tbl_detail VALUES ('FA2K1',4,'d');
INSERT 0 1
sharddb=#
sharddb=#
sharddb=#
sharddb=# SELECT * FROM tbl_detail ;
customer_id | fid | detailval
----------{}-----------
HN802 | 1 | a
HN802 | 2 | b
HN802 | 3 | c
FA2K1 | 4 | d
(4 rows)
简单的join测试
sharddb=#
sharddb=# SELECT A.*,B.* FROM customer_reviews A, tbl_detail B WHERE A.customer_id = B.customer_id;
ERROR: cannot perform distributed planning for the given query
DETAIL: Joins are not supported in distributed queries.
sharddb=#
无法查看EXPLAIN,这是个硬伤,同时,VACUUM 、 ANALYZE 也需要单独在每个worker操作。
sharddb=# EXPLAIN SELECT * FROM tbl_detail ;
ERROR: EXPLAIN commands on distributed tables are unsupported
sharddb=#
在MASTER上创建索引,在其他worker上都是不能同步的,DROP一样对worker无效
sharddb=# CREATE INDEX CONCURRENTLY ON tbl_detail (customer_id);
CREATE INDEX
sharddb=# d+ tbl_detail
Table "public.tbl_detail"
Column | Type | Modifiers | Storage | Stats target | Description
----------{}----------{}------------+----------
customer_id | text | | extended | |
fid | integer | | plain | |
detailval | text | | extended | |
Indexes:
"tbl_detail_customer_id_idx" btree (customer_id)
sharddb=#
sharddb=# DROP INDEX tbl_detail_customer_id_idx;
DROP INDEX
sharddb=#
ALTER TABLE不会抛出错误,但是如果不在其他节点做同样操作将无法再正确的读取数据
sharddb=# ALTER TABLE tbl_detail ADD COLUMN newcolumn text DEFAULT NULL;
ALTER TABLE
sharddb=# select * from tbl_detail ;
WARNING: Bad result from localhost:5434
DETAIL: Remote message: column "newcolumn" does not exist
WARNING: Bad result from localhost:5433
DETAIL: Remote message: column "newcolumn" does not exist
ERROR: could not receive query results
sharddb=#
DROP DATABASE 需要注意顺序
在master节点存在sharddb时,在worker删除database时会报出错误,DROP掉MASTER节点上的对象后,才可以手动删除对象:
postgres=# DROP DATABASE sharddb;
ERROR: database "sharddb" is being accessed by other users
DETAIL: There is 1 other session using the database.