  • Parallel query in PostgreSQL

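    Parallelism in PostgreSQL involves three important components: the process itself (the leader process), Gather, and workers. Without parallelism, the process handles all of the data itself. Once the planner decides that a query, or part of it, can run in parallel, it adds a Gather node to the parallel portion of the plan and makes that Gather node the root of the subtree.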

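    Query execution starts in the leader process. When the whole query or part of it runs in parallel, a Gather node and several worker processes are allocated. The blocks of the relation are divided among the workers, and PostgreSQL configuration parameters control how many workers there are. Workers coordinate and communicate with one another through shared memory, and each worker passes the rows it produces back to the leader process.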

    Workers and the leader communicate through message queues that are built on shared memory. Each worker has two queues: an error queue and a tuple queue.
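    The leader/worker arrangement above can be sketched as a small model. This is a simplified illustration and not PostgreSQL's implementation. It uses Python threads and queue.Queue in place of worker processes and shared-memory message queues, and all names (SharedScanState, worker, run_gather) are hypothetical. Participants claim blocks one at a time from a shared "next block" counter, similar in spirit to how a parallel sequential scan hands out blocks. Each participant streams matching rows to its own tuple queue and reports failures on a separate error queue.

```python
import queue
import threading

DONE = object()  # end-of-stream marker a worker puts on its tuple queue


class SharedScanState:
    """Hypothetical shared state: hands out block numbers one at a time."""

    def __init__(self, nblocks):
        self.nblocks = nblocks
        self.next_block = 0
        self.lock = threading.Lock()

    def claim_block(self):
        with self.lock:
            if self.next_block >= self.nblocks:
                return None  # no blocks left to scan
            blk = self.next_block
            self.next_block += 1
            return blk


def worker(state, table, predicate, tuple_q, error_q):
    try:
        while (blk := state.claim_block()) is not None:
            for row in table[blk]:
                if predicate(row):
                    tuple_q.put(row)  # stream each matching row to the leader
    except Exception as exc:
        error_q.put(exc)  # errors travel on their own queue
    finally:
        tuple_q.put(DONE)


def run_gather(table, predicate, nworkers):
    """Leader side: start the workers, then collect rows from every tuple queue."""
    state = SharedScanState(len(table))
    tuple_qs = [queue.Queue() for _ in range(nworkers)]
    error_qs = [queue.Queue() for _ in range(nworkers)]
    threads = [
        threading.Thread(target=worker, args=(state, table, predicate, tq, eq))
        for tq, eq in zip(tuple_qs, error_qs)
    ]
    for t in threads:
        t.start()
    rows = []
    for tq in tuple_qs:  # queues are unbounded, so draining them in order is safe
        while (row := tq.get()) is not DONE:
            rows.append(row)
    for t in threads:
        t.join()
    for eq in error_qs:
        if not eq.empty():
            raise eq.get()  # re-raise a worker's error in the leader
    return rows
```

    For example, with `table` as a list of blocks (each a list of rows), `run_gather(table, lambda r: r % 7 == 0, 3)` returns every matching row. The rows come back in no guaranteed order, just as a plain Gather promises no order.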

     

     

    Parallel sequential scan

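    PostgreSQL 9.6 added support for parallel sequential scans. A sequential scan reads a table block by block, evaluating one block after another. By its nature, a sequential scan lends itself to parallelization: the whole table is still scanned sequentially, but the work is shared among multiple worker processes.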

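    A parallel sequential scan is fast not because the reads happen in parallel, but because the processing of the data is spread across multiple CPUs.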

    abce=# explain analyze select work_hour from hh_adds_static_day where create_time <= date '20201010'-interval '10' day;
                                                               QUERY PLAN                                                           
    --------------------------------------------------------------------------------------------------------------------------------
     Seq Scan on hh_adds_static_day  (cost=0.00..261864.36 rows=4241981 width=4) (actual time=0.012..1407.214 rows=4228109 loops=1)
       Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone)
       Rows Removed by Filter: 735600
     Planning Time: 0.108 ms
     Execution Time: 1585.835 ms
    (5 rows)
    
    abce=# 
    

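    The sequential scan returns a large number of rows and uses no aggregate function, so the query runs on a single CPU core. Every returned row would have to be sent from a worker to the leader, and for millions of rows that transfer costs more than parallelism would save.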

    After adding a sum() call, the plan clearly uses two workers, and the query runs faster:

    abce=# explain analyze select sum(work_hour) from hh_adds_static_day where create_time <= date '20201010'-interval '10' day;
                                                                            QUERY PLAN                                                                        
    ----------------------------------------------------------------------------------------------------------------------------------------------------------
     Finalize Aggregate  (cost=231089.60..231089.61 rows=1 width=4) (actual time=749.998..751.529 rows=1 loops=1)
       ->  Gather  (cost=231089.38..231089.59 rows=2 width=4) (actual time=749.867..751.515 rows=3 loops=1)
             Workers Planned: 2
             Workers Launched: 2
             ->  Partial Aggregate  (cost=230089.38..230089.39 rows=1 width=4) (actual time=746.463..746.464 rows=1 loops=3)
                   ->  Parallel Seq Scan on hh_adds_static_day  (cost=0.00..225670.65 rows=1767492 width=4) (actual time=0.032..489.501 rows=1409370 loops=3)
                         Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone)
                         Rows Removed by Filter: 245200
     Planning Time: 0.112 ms
     Execution Time: 751.611 ms
    (10 rows)
    
    abce=# 
    

    Parallel aggregation

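    Computing aggregates is an expensive operation in a database, and running it in a single process can take a long time. PostgreSQL 9.6 added the ability to compute aggregates in parallel by splitting the work into chunks (a divide-and-conquer strategy). Several workers each compute part of the aggregate, and the leader then computes the final result from their outputs.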

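    Technically, a Partial Aggregate node is added to the plan tree, and each participating process runs it over its own share of the rows. Those partial results are sent to a Finalize Aggregate node, which combines the aggregates from all Partial Aggregate outputs. The resulting parallel plan therefore has a Finalize Aggregate node at the root, with a Gather node beneath it whose child is the Partial Aggregate node.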

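    The "Parallel Seq Scan" node produces the rows for the partial aggregation ("Partial Aggregate").

    The "Partial Aggregate" node reduces those rows with SUM(). Finally, the "Gather" node collects each worker's partial sum.

    The "Finalize Aggregate" node computes the final sum. If you use your own aggregate functions, remember to mark them PARALLEL SAFE and to give them a combine function, which partial aggregation requires.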
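    The Partial Aggregate / Finalize Aggregate split can be illustrated with a small sketch. It is a hypothetical Python model, not PostgreSQL code. It uses avg() rather than sum() because per-worker averages cannot simply be added together. The sketch therefore carries a transition state of (sum, count) and needs a combine step. In PostgreSQL's CREATE AGGREGATE, that combine step corresponds to the COMBINEFUNC option, and the PARALLEL = SAFE option is the marking mentioned above.

```python
from functools import reduce

INIT_STATE = (0, 0)  # (running sum, row count)


def avg_transition(state, value):
    """Per-row step, run inside each Partial Aggregate."""
    total, count = state
    return (total + value, count + 1)


def avg_combine(a, b):
    """Merge two partial states; this is the job of a combine function."""
    return (a[0] + b[0], a[1] + b[1])


def avg_final(state):
    total, count = state
    return total / count if count else None  # avg() over no rows is NULL


def partial_aggregate(rows):
    return reduce(avg_transition, rows, INIT_STATE)


def finalize_aggregate(partial_states):
    return avg_final(reduce(avg_combine, partial_states, INIT_STATE))


def parallel_avg(rows, nprocs):
    # each participant sees a different share of the rows (round-robin here)
    shares = [rows[i::nprocs] for i in range(nprocs)]
    partial_states = [partial_aggregate(share) for share in shares]
    return finalize_aggregate(partial_states)
```

    For instance, `parallel_avg(list(range(1, 101)), 3)` gives 50.5, the same as a serial average. This works because only the (sum, count) states are merged, never the per-worker averages.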

    Number of workers

    The number of workers can be adjusted on the fly:

    abce=# show max_parallel_workers_per_gather;
     max_parallel_workers_per_gather 
    ---------------------------------
     2
    (1 row)
    
    abce=# alter system set max_parallel_workers_per_gather=4;
    ALTER SYSTEM
    abce=# select * from pg_reload_conf();
     pg_reload_conf 
    ----------------
     t
    (1 row)
    
    abce=# explain analyze select sum(work_hour) from hh_adds_static_day where create_time <= date '20201010'-interval '10' day;
                                                                           QUERY PLAN                                                                        
    ---------------------------------------------------------------------------------------------------------------------------------------------------------
     Finalize Aggregate  (cost=218981.25..218981.26 rows=1 width=4) (actual time=473.424..475.156 rows=1 loops=1)
       ->  Gather  (cost=218980.83..218981.24 rows=4 width=4) (actual time=473.314..475.144 rows=5 loops=1)
             Workers Planned: 4
             Workers Launched: 4
             ->  Partial Aggregate  (cost=217980.83..217980.84 rows=1 width=4) (actual time=468.769..468.770 rows=1 loops=5)
                   ->  Parallel Seq Scan on hh_adds_static_day  (cost=0.00..215329.59 rows=1060495 width=4) (actual time=0.036..306.854 rows=845622 loops=5)
                         Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone)
                         Rows Removed by Filter: 147120
     Planning Time: 0.150 ms
     Execution Time: 475.218 ms
    (10 rows)
    
    abce=# 
    

    We changed the number of workers from 2 to 4.

    How many worker processes are used

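    First, max_parallel_workers_per_gather sets the maximum number of workers for each Gather node.

    Second, the number of workers the executor can take from the pool is limited by max_parallel_workers.

    Finally, the top-level limit is max_worker_processes, which sets the total number of background worker processes.

    If no worker processes can be allocated, execution falls back to the leader process alone.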
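    The three limits above stack on top of each other. The function below is a simplified, hypothetical model of that arithmetic, and PostgreSQL's real bookkeeping is more involved. The plan's worker count is capped by max_parallel_workers_per_gather. At run time, the Gather gets only as many workers as are still free under max_parallel_workers and max_worker_processes. A result of 0 means the leader runs the plan by itself. The default values used (2, 8 and 8) are those of PostgreSQL 10 and later.

```python
def workers_launched(planned, max_parallel_workers_per_gather=2,
                     max_parallel_workers=8, max_worker_processes=8,
                     parallel_workers_busy=0, bg_workers_busy=0):
    """Simplified model of how many workers a Gather node actually gets.

    parallel_workers_busy: parallel workers already used by other queries.
    bg_workers_busy: all background workers already running (includes the above).
    """
    wanted = min(planned, max_parallel_workers_per_gather)
    free_parallel = max(0, max_parallel_workers - parallel_workers_busy)
    free_bg = max(0, max_worker_processes - bg_workers_busy)
    return min(wanted, free_parallel, free_bg)
```

    With the defaults, a plan that would like 4 workers gets 2. With `max_parallel_workers_per_gather=4` it gets 4, unless other queries already hold most of the pool.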

    The planner may also reduce the number of workers based on the size of the table or index. This is governed by the parameters min_parallel_table_scan_size and min_parallel_index_scan_size.

    Default settings of these parameters:

    abce=# show min_parallel_table_scan_size;
     min_parallel_table_scan_size 
    ------------------------------
     8MB
    (1 row)
    
    abce=# show min_parallel_index_scan_size ;
     min_parallel_index_scan_size 
    ------------------------------
     512kB
    (1 row)
    
    abce=# 
    

    How table size maps to workers:

    set min_parallel_table_scan_size='8MB'
    8MB table => 1 worker
    24MB table => 2 workers
    72MB table => 3 workers
    x => floor(log(x / min_parallel_table_scan_size) / log(3)) + 1 workers
    

    Starting from one worker at min_parallel_(table|index)_scan_size, PostgreSQL adds another worker process each time the table size triples. The number of workers is not based on cost.

    In practice these rules do not always fit, so the worker count can be set for a specific table with alter table ... set (parallel_workers=N).
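    The size rule above can be written as a small loop. This is a sketch under several assumptions. It models the behaviour described in this section rather than copying PostgreSQL's source, and it works in bytes where the planner actually works in pages. It covers only the table-size rule. A parallel_workers value set with alter table ... set (parallel_workers=N) replaces the size-based count. Either way, the result is capped by max_parallel_workers_per_gather.

```python
MB = 1024 * 1024


def planned_workers(table_bytes, min_parallel_table_scan_size=8 * MB,
                    max_parallel_workers_per_gather=2, parallel_workers=None):
    """Size-based worker count for a parallel table scan (simplified model)."""
    if parallel_workers is not None:
        # per-table override from ALTER TABLE ... SET (parallel_workers = N)
        workers = parallel_workers
    elif table_bytes < min_parallel_table_scan_size:
        return 0  # table too small for a parallel scan
    else:
        workers = 1
        threshold = min_parallel_table_scan_size
        while table_bytes >= threshold * 3:  # one more worker per tripling
            workers += 1
            threshold *= 3
    return min(workers, max_parallel_workers_per_gather)
```

    With a high enough cap, this reproduces the table above: 8MB gives 1 worker, 24MB gives 2, 72MB gives 3, and 216MB gives 4. This matches floor(log3(x / 8MB)) + 1. With the default cap of 2, anything from 24MB up still gets only 2.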

     

    Why isn't parallel execution used?

    Besides the restrictions on parallel execution, PostgreSQL also checks cost:

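    ·parallel_setup_cost: keeps small queries from using parallel execution. It models the time spent setting up memory, starting processes, and the initial communication.

    ·parallel_tuple_cost: communication between the leader and the workers can take a long time, in proportion to the number of tuples the workers send. This parameter models that communication cost.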
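    A rough back-of-the-envelope check shows how these two costs decide between the plans at the top of this article. It assumes the defaults parallel_setup_cost = 1000 and parallel_tuple_cost = 0.1. It approximates a Gather's total cost as the child's total cost, plus parallel_setup_cost, plus parallel_tuple_cost for each row passed through the Gather. That approximation reproduces the Gather costs in the sum() plans (230089.39 + 1000 + 0.1 × 2 = 231089.59). For the plain select, the parallel plan is hypothetical and not one PostgreSQL printed. The sketch reuses the Parallel Seq Scan cost from the 2-worker plan and assumes all ~4.24 million estimated rows must pass through the Gather.

```python
PARALLEL_SETUP_COST = 1000.0  # default parallel_setup_cost
PARALLEL_TUPLE_COST = 0.1     # default parallel_tuple_cost


def gather_total_cost(child_total_cost, rows_through_gather,
                      setup=PARALLEL_SETUP_COST, per_tuple=PARALLEL_TUPLE_COST):
    """Approximate total cost of a Gather node on top of a parallel subplan."""
    return child_total_cost + setup + per_tuple * rows_through_gather


# sum() with 2 workers: Partial Aggregate total 230089.39, Gather rows=2
agg_2 = gather_total_cost(230089.39, 2)
# sum() with 4 workers: Partial Aggregate total 217980.84, Gather rows=4
agg_4 = gather_total_cost(217980.84, 4)

# plain select: serial Seq Scan total cost vs. a hypothetical parallel plan
serial_scan = 261864.36
parallel_scan = gather_total_cost(225670.65, 4241981)

print(f"{agg_2:.2f} {agg_4:.2f}")  # 231089.59 218981.24
print(f"{parallel_scan:.2f} > {serial_scan:.2f}: {parallel_scan > serial_scan}")
# 650868.75 > 261864.36: True
```

    Once millions of rows have to be forwarded, the per-tuple term alone (about 424,198) exceeds the whole serial scan cost. That is consistent with the planner keeping the first query serial.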

     

    Nested loop joins

    Starting with version 9.6, PostgreSQL can run a "Nested loop" in parallel:

    explain (costs off) select c_custkey, count(o_orderkey)
                    from    customer left outer join orders on
                                    c_custkey = o_custkey and o_comment not like '%special%deposits%'
                    group by c_custkey;
                                          QUERY PLAN                                      
    --------------------------------------------------------------------------------------
     Finalize GroupAggregate
       Group Key: customer.c_custkey
       ->  Gather Merge
             Workers Planned: 4
             ->  Partial GroupAggregate
                   Group Key: customer.c_custkey
                   ->  Nested Loop Left Join
                         ->  Parallel Index Only Scan using customer_pkey on customer
                         ->  Index Scan using idx_orders_custkey on orders
                               Index Cond: (customer.c_custkey = o_custkey)
                               Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
    

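    The gather happens at the last stage, so the "Nested Loop Left Join" is a parallel operation. "Parallel Index Only Scan" is available starting with version 10 and behaves much like a parallel sequential scan. The inner side works differently. Because of the condition c_custkey = o_custkey, each process looks up the orders of every customer row it receives with a plain Index Scan, so that scan is not parallel.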
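    A toy model of this plan shape may help. It is a hypothetical sketch, not PostgreSQL code. Customers are split among participants, standing in for the Parallel Index Only Scan. Each participant looks up every one of its customers in a dict that plays the role of idx_orders_custkey. Because the customer shares do not overlap, the per-participant counts can simply be merged. Only the outer side is divided; the inner lookup is repeated in full for every outer row.

```python
import re
from collections import defaultdict

# stand-in for: o_comment NOT LIKE '%special%deposits%'
SPECIAL = re.compile(r"special.*deposits", re.DOTALL)


def build_orders_index(orders):
    """Hypothetical stand-in for idx_orders_custkey: custkey -> [(orderkey, comment)]."""
    index = defaultdict(list)
    for orderkey, custkey, comment in orders:
        index[custkey].append((orderkey, comment))
    return index


def participant(customer_share, orders_index):
    """Nested Loop Left Join + Partial GroupAggregate for one participant."""
    counts = {}
    for custkey in customer_share:        # outer side: this participant's share
        counts[custkey] = 0               # left join: keep customers with no orders
        for _orderkey, comment in orders_index.get(custkey, []):  # inner index lookup
            if not SPECIAL.search(comment):
                counts[custkey] += 1
    return counts


def parallel_order_counts(customers, orders, nprocs):
    index = build_orders_index(orders)
    shares = [customers[i::nprocs] for i in range(nprocs)]
    result = {}
    for partial in (participant(share, index) for share in shares):
        result.update(partial)            # shares are disjoint, so merging is safe
    return dict(sorted(result.items()))   # Gather Merge keeps c_custkey order
```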

     

    Hash join

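    Before PostgreSQL 11, each worker built its own hash table, so going beyond four workers did not improve performance. The new implementation uses one shared hash table, and each worker can use work_mem to help build it.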

    select
            l_shipmode,
            sum(case
                    when o_orderpriority = '1-URGENT'
                            or o_orderpriority = '2-HIGH'
                            then 1
                    else 0
            end) as high_line_count,
            sum(case
                    when o_orderpriority <> '1-URGENT'
                            and o_orderpriority <> '2-HIGH'
                            then 1
                    else 0
            end) as low_line_count
    from
            orders,
            lineitem
    where
            o_orderkey = l_orderkey
            and l_shipmode in ('MAIL', 'AIR')
            and l_commitdate < l_receiptdate
            and l_shipdate < l_commitdate
            and l_receiptdate >= date '1996-01-01'
            and l_receiptdate < date '1996-01-01' + interval '1' year
    group by
            l_shipmode
    order by
            l_shipmode
    LIMIT 1;
    
                                                                                                                                        QUERY PLAN                                               
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
     Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
       ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
             Group Key: lineitem.l_shipmode
             ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
                   Workers Planned: 4
                   Workers Launched: 4
                   ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                         Group Key: lineitem.l_shipmode
                         ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                               Sort Key: lineitem.l_shipmode
                               Sort Method: external merge  Disk: 2304kB
                               Worker 0:  Sort Method: external merge  Disk: 2064kB
                               Worker 1:  Sort Method: external merge  Disk: 2384kB
                               Worker 2:  Sort Method: external merge  Disk: 2264kB
                               Worker 3:  Sort Method: external merge  Disk: 2336kB
                               ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                     Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                     ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                           Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                           Rows Removed by Filter: 11934691
                                     ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                           Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                           ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
     Planning Time: 0.977 ms
     Execution Time: 7923.770 ms
    

    Here each worker helps build a single shared hash table.
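    The shared hash table can be sketched as follows. This is a simplified, hypothetical model using Python threads, not PostgreSQL's Parallel Hash. Each participant inserts only its share of the inner relation (orders in the plan above) into one shared table. All participants wait until the build is finished, and then each probes the table with its own share of the outer relation (lineitem). Batching, work_mem limits and spilling to disk are ignored.

```python
import threading
from collections import defaultdict


def parallel_hash_join(outer, inner, outer_key, inner_key, nprocs):
    shared_table = defaultdict(list)        # one hash table for all participants
    table_lock = threading.Lock()
    build_done = threading.Barrier(nprocs)  # no probing until every share is inserted
    matches = [[] for _ in range(nprocs)]

    def participant(i):
        # build phase: insert this participant's share of the inner rows
        for row in inner[i::nprocs]:
            with table_lock:
                shared_table[inner_key(row)].append(row)
        build_done.wait()
        # probe phase: look up this participant's share of the outer rows
        for row in outer[i::nprocs]:
            for match in shared_table.get(outer_key(row), ()):
                matches[i].append((row, match))

    threads = [threading.Thread(target=participant, args=(i,)) for i in range(nprocs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [pair for part in matches for pair in part]  # what Gather would collect
```

    By contrast, the pre-11 approach described above gives every worker a private copy of the whole inner table. The build work therefore grows with the number of workers instead of being shared.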

     

    Merge join

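    Because of the way merge join works, the join itself is not parallelized: its inner side is always a non-parallel plan that every participating process would have to run in full. Even so, if the merge join is the last stage of query execution, you can still see parallel execution below it.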

    -- Query 2 from TPC-H
    explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
    from    part, supplier, partsupp, nation, region
    where
            p_partkey = ps_partkey
            and s_suppkey = ps_suppkey
            and p_size = 36
            and p_type like '%BRASS'
            and s_nationkey = n_nationkey
            and n_regionkey = r_regionkey
            and r_name = 'AMERICA'
            and ps_supplycost = (
                    select
                            min(ps_supplycost)
                    from    partsupp, supplier, nation, region
                    where
                            p_partkey = ps_partkey
                            and s_suppkey = ps_suppkey
                            and s_nationkey = n_nationkey
                            and n_regionkey = r_regionkey
                            and r_name = 'AMERICA'
            )
    order by s_acctbal desc, n_name, s_name, p_partkey
    LIMIT 100;
                                                    QUERY PLAN                                                
    ----------------------------------------------------------------------------------------------------------
     Limit
       ->  Sort
             Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
             ->  Merge Join
                   Merge Cond: (part.p_partkey = partsupp.ps_partkey)
                   Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
                   ->  Gather Merge
                         Workers Planned: 4
                         ->  Parallel Index Scan using part_pkey on part
                               Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
                   ->  Materialize
                         ->  Sort
                               Sort Key: partsupp.ps_partkey
                               ->  Nested Loop
                                     ->  Nested Loop
                                           Join Filter: (nation.n_regionkey = region.r_regionkey)
                                           ->  Seq Scan on region
                                                 Filter: (r_name = 'AMERICA'::bpchar)
                                           ->  Hash Join
                                                 Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                                 ->  Seq Scan on supplier
                                                 ->  Hash
                                                       ->  Seq Scan on nation
                                     ->  Index Scan using idx_partsupp_suppkey on partsupp
                                           Index Cond: (ps_suppkey = supplier.s_suppkey)
                   SubPlan 1
                     ->  Aggregate
                           ->  Nested Loop
                                 Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                                 ->  Seq Scan on region region_1
                                       Filter: (r_name = 'AMERICA'::bpchar)
                                 ->  Nested Loop
                                       ->  Nested Loop
                                             ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                                   Index Cond: (part.p_partkey = ps_partkey)
                                             ->  Index Scan using supplier_pkey on supplier supplier_1
                                                   Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                       ->  Index Scan using nation_pkey on nation nation_1
                                             Index Cond: (n_nationkey = supplier_1.s_nationkey)
    

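    The "Merge Join" node sits above the "Gather Merge", so the merge itself does not run in parallel. The "Parallel Index Scan" on part_pkey, however, still benefits from parallel execution.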
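    The division of labour in this plan can be modelled in a few lines. In this hypothetical sketch, each worker's share of part keys is already sorted, as the output of an index scan is. The leader merges those sorted streams the way Gather Merge does and then runs an ordinary, single-process merge join against the sorted inner side. heapq.merge is Python's standard-library k-way merge; everything else is illustrative.

```python
import heapq


def gather_merge(sorted_streams, key=None):
    """Merge per-worker outputs that are each sorted, keeping the global order."""
    return list(heapq.merge(*sorted_streams, key=key))


def merge_join(outer, inner, outer_key, inner_key):
    """Single-process merge join; both inputs sorted ascending on the join key."""
    out = []
    j = 0
    for o in outer:
        k = outer_key(o)
        while j < len(inner) and inner_key(inner[j]) < k:
            j += 1                     # skip inner rows with smaller keys
        m = j
        while m < len(inner) and inner_key(inner[m]) == k:
            out.append((o, inner[m]))  # emit every inner row with an equal key
            m += 1
    return out
```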

    Partition-wise join

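    PostgreSQL 11 disables partition-wise join by default, because planning a partition-wise join is expensive. A join between similarly partitioned tables can be performed partition by partition. This lets Postgres use smaller hash tables, and each per-partition join can run in parallel.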

    tpch=# set enable_partitionwise_join=t;
    tpch=# explain (costs off) select * from prt1 t1, prt2 t2
    where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN                     
    ---------------------------------------------------
     Append
       ->  Hash Join
             Hash Cond: (t2.b = t1.a)
             ->  Seq Scan on prt2_p1 t2
                   Filter: ((b >= 0) AND (b <= 10000))
             ->  Hash
                   ->  Seq Scan on prt1_p1 t1
                         Filter: (b = 0)
       ->  Hash Join
             Hash Cond: (t2_1.b = t1_1.a)
             ->  Seq Scan on prt2_p2 t2_1
                   Filter: ((b >= 0) AND (b <= 10000))
             ->  Hash
                   ->  Seq Scan on prt1_p2 t1_1
                         Filter: (b = 0)
    tpch=# set parallel_setup_cost = 1;
    tpch=# set parallel_tuple_cost = 0.01;
    tpch=# explain (costs off) select * from prt1 t1, prt2 t2
    where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                            QUERY PLAN                         
    -----------------------------------------------------------
     Gather
       Workers Planned: 4
       ->  Parallel Append
             ->  Parallel Hash Join
                   Hash Cond: (t2_1.b = t1_1.a)
                   ->  Parallel Seq Scan on prt2_p2 t2_1
                         Filter: ((b >= 0) AND (b <= 10000))
                   ->  Parallel Hash
                         ->  Parallel Seq Scan on prt1_p2 t1_1
                               Filter: (b = 0)
             ->  Parallel Hash Join
                   Hash Cond: (t2.b = t1.a)
                   ->  Parallel Seq Scan on prt2_p1 t2
                         Filter: ((b >= 0) AND (b <= 10000))
                   ->  Parallel Hash
                         ->  Parallel Seq Scan on prt1_p1 t1
                               Filter: (b = 0)
    

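    Most importantly, a partition-wise join can use parallel execution only when the partitions are large enough. In the example above, the parallel plan appeared only after parallel_setup_cost and parallel_tuple_cost were lowered.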
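    A minimal sketch of the idea follows. It assumes both tables are split into matching partitions on the join key, using a hypothetical key % 3 scheme rather than the range partitions of prt1/prt2. Partition k of one table is joined only with partition k of the other, and each pair builds its own small hash table. Because the pairs are independent, they can be handed to separate workers; here that role is played by a ThreadPoolExecutor from Python's standard library.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def hash_join(probe_rows, build_rows, probe_key, build_key):
    table = defaultdict(list)
    for row in build_rows:                    # small, per-partition hash table
        table[build_key(row)].append(row)
    return [(p, b) for p in probe_rows for b in table.get(probe_key(p), ())]


def partition(rows, key, nparts):
    parts = [[] for _ in range(nparts)]
    for row in rows:
        parts[key(row) % nparts].append(row)  # hypothetical partitioning scheme
    return parts


def partitionwise_join(t1_parts, t2_parts, t1_key, t2_key, nworkers=4):
    """Join matching partition pairs independently, then append the results."""
    def join_pair(pair):
        p1, p2 = pair
        return hash_join(p2, p1, t2_key, t1_key)  # probe with t2, hash t1 (as in the plan)

    with ThreadPoolExecutor(max_workers=nworkers) as pool:
        pieces = pool.map(join_pair, zip(t1_parts, t2_parts))
        return [row for piece in pieces for row in piece]
```

    The result matches a single hash join over the unpartitioned tables. Each hash table, however, only ever holds one partition's rows.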

     

    Parallel Append

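    You usually see this in UNION ALL queries. The drawback is limited parallelism, because each worker ends up working on a single subquery.

    Even though four workers are enabled, only two are planned.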

    tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                               QUERY PLAN                                           
    ------------------------------------------------------------------------------------------------
     Gather
       Workers Planned: 2
       ->  Parallel Append
             ->  Aggregate
                   ->  Seq Scan on lineitem
                         Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
             ->  Aggregate
                   ->  Seq Scan on lineitem lineitem_1
                         Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
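    The limitation above can be illustrated with a hypothetical model. Here each subplan is non-partial, like the two Aggregate branches in the plan, so it is claimed and run by exactly one participant. Any participant beyond the number of subplans finds nothing to do. The subplans are plain Python callables, and the claiming scheme is an illustration rather than PostgreSQL's algorithm.

```python
import threading


def parallel_append(subplans, nworkers):
    """Run non-partial subplans; each one is executed by a single participant."""
    next_plan = 0
    lock = threading.Lock()
    results = [None] * len(subplans)
    ran = {}  # subplan index -> participant that executed it

    def participant(worker_id):
        nonlocal next_plan
        while True:
            with lock:
                if next_plan >= len(subplans):
                    return  # nothing left: this participant sits idle
                idx = next_plan
                next_plan += 1
            results[idx] = subplans[idx]()  # the whole subplan runs here
            ran[idx] = worker_id

    threads = [threading.Thread(target=participant, args=(w,)) for w in range(nworkers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, ran
```

    With two subplans and four participants, at most two participants ever do any work, which mirrors the two-branch UNION ALL above.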
    

      

    Important parameters

    ·work_mem

    ·max_parallel_workers_per_gather: the maximum number of workers the executor assigns to each Gather node

    ·max_worker_processes

    ·max_parallel_workers

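    Parallel query execution was introduced in version 9.6;

    Parallel execution is enabled by default starting with version 10 (max_parallel_workers_per_gather defaults to 2, versus 0 in 9.6);

    On heavily loaded OLTP systems, disabling parallel execution is recommended.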

  • Original article: https://www.cnblogs.com/abclife/p/13952833.html