  • Parallel queries in PostgreSQL

    Parallelism in PostgreSQL involves three important components: the process itself (the leader process), the Gather node, and the workers. When parallelism is not enabled, the process handles all of the data by itself. Once the planner decides that a query, or part of a query, can run in parallel, it adds a Gather node to the parallel portion of the query, with the Gather node serving as the root of that subtree.

    Query execution starts in the leader process. Once parallelism is enabled for the query, or for the part of it that supports parallelism, a Gather node and several worker processes are allocated. The relation's blocks are divided among the workers. The number of workers is controlled by PostgreSQL's configuration parameters. The workers coordinate and communicate with each other through shared memory, and once a worker finishes its share of the work, its results are passed on to the leader process.

    The workers and the leader process communicate through message queues (backed by shared memory). Each worker process has two queues: one for errors and one for tuples.
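
    While a parallel query runs, the leader and its workers are visible as separate backends. As an illustration (assuming PostgreSQL 10 or later, where pg_stat_activity has a backend_type column), they can be watched from a second session:

    select pid, backend_type, state, query
    from pg_stat_activity
    where backend_type in ('client backend', 'parallel worker');
    -- parallel workers appear with backend_type = 'parallel worker'
    -- and carry the same query text as their leader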

     

     

    Parallel sequential scan

    PostgreSQL 9.6 added support for parallel sequential scans. A sequential scan is a scan over a table in which the blocks are evaluated one after another, in order. By its very nature, a sequential scan lends itself to parallelization: the whole table is still scanned sequentially, but its blocks are divided among multiple worker processes.

    A parallel sequential scan is fast not because the data can be read in parallel, but because the data is spread across many CPU cores for processing.

    abce=# explain analyze select work_hour from hh_adds_static_day where create_time <= date '20201010'-interval '10' day;
                                                               QUERY PLAN                                                           
    --------------------------------------------------------------------------------------------------------------------------------
     Seq Scan on hh_adds_static_day  (cost=0.00..261864.36 rows=4241981 width=4) (actual time=0.012..1407.214 rows=4228109 loops=1)
       Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone)
       Rows Removed by Filter: 735600
     Planning Time: 0.108 ms
     Execution Time: 1585.835 ms
    (5 rows)
    
    abce=# 
    

    The sequential scan produces a large number of rows, but no aggregate function is applied to them, so every row would have to be handed over to the leader; the query therefore uses a single CPU core.

    After adding a sum() function, it is clear that two worker processes are used, which speeds up the query:

    abce=# explain analyze select sum(work_hour) from hh_adds_static_day where create_time <= date '20201010'-interval '10' day;
                                                                            QUERY PLAN                                                                        
    ----------------------------------------------------------------------------------------------------------------------------------------------------------
     Finalize Aggregate  (cost=231089.60..231089.61 rows=1 width=4) (actual time=749.998..751.529 rows=1 loops=1)
       ->  Gather  (cost=231089.38..231089.59 rows=2 width=4) (actual time=749.867..751.515 rows=3 loops=1)
             Workers Planned: 2
             Workers Launched: 2
             ->  Partial Aggregate  (cost=230089.38..230089.39 rows=1 width=4) (actual time=746.463..746.464 rows=1 loops=3)
                   ->  Parallel Seq Scan on hh_adds_static_day  (cost=0.00..225670.65 rows=1767492 width=4) (actual time=0.032..489.501 rows=1409370 loops=3)
                         Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone)
                         Rows Removed by Filter: 245200
     Planning Time: 0.112 ms
     Execution Time: 751.611 ms
    (10 rows)
    
    abce=# 
    

    Parallel aggregation

    Computing an aggregate is a very expensive operation in a database. Executed as a single process, it can take quite a long time. PostgreSQL 9.6 added the ability to compute aggregates in parallel by simply splitting the work into chunks (a divide-and-conquer strategy): multiple worker processes each compute part of the aggregate, and the leader then computes the final result from their partial results.

    Technically, Partial Aggregate nodes are added to the plan tree, and each Partial Aggregate node carries the output of one worker process. Those outputs are then fed to a Finalize Aggregate node, which combines the aggregates from all (of the multiple) Partial Aggregate nodes. The effective parallel plan thus has a Finalize Aggregate node at the root and a Gather node whose children are the Partial Aggregate nodes.

    The "Parallel Seq Scan" node produces rows for the partial aggregation ("Partial Aggregate").

    The "Partial Aggregate" node reduces those rows with the SUM() function. Finally, the per-worker sums are collected by the "Gather" node.

    The "Finalize Aggregate" node computes the final sum. If you use your own aggregate functions, don't forget to mark them as "parallel safe", for example as shown below.
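
    For instance, a custom sum-like aggregate can be declared parallel safe like this (a minimal sketch; my_sum and my_sum_step are hypothetical names, while int8pl is the built-in bigint addition function used here as the combine function):

    create function my_sum_step(bigint, int) returns bigint
        as 'select $1 + $2'
        language sql immutable parallel safe;

    create aggregate my_sum(int) (
        sfunc       = my_sum_step,
        stype       = bigint,
        combinefunc = int8pl,   -- merges the workers' partial states
        initcond    = '0',
        parallel    = safe      -- lets the planner use it below a Gather
    );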

    Number of workers

    The number of workers can be adjusted dynamically:

    abce=# show max_parallel_workers_per_gather;
     max_parallel_workers_per_gather 
    ---------------------------------
     2
    (1 row)
    
    abce=# alter system set max_parallel_workers_per_gather=4;
    ALTER SYSTEM
    abce=# select * from pg_reload_conf();
     pg_reload_conf 
    ----------------
     t
    (1 row)
    
    abce=# explain analyze select sum(work_hour) from hh_adds_static_day where create_time <= date '20201010'-interval '10' day;
                                                                           QUERY PLAN                                                                        
    ---------------------------------------------------------------------------------------------------------------------------------------------------------
     Finalize Aggregate  (cost=218981.25..218981.26 rows=1 width=4) (actual time=473.424..475.156 rows=1 loops=1)
       ->  Gather  (cost=218980.83..218981.24 rows=4 width=4) (actual time=473.314..475.144 rows=5 loops=1)
             Workers Planned: 4
             Workers Launched: 4
             ->  Partial Aggregate  (cost=217980.83..217980.84 rows=1 width=4) (actual time=468.769..468.770 rows=1 loops=5)
                   ->  Parallel Seq Scan on hh_adds_static_day  (cost=0.00..215329.59 rows=1060495 width=4) (actual time=0.036..306.854 rows=845622 loops=5)
                         Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone)
                         Rows Removed by Filter: 147120
     Planning Time: 0.150 ms
     Execution Time: 475.218 ms
    (10 rows)
    
    abce=# 
    

    We changed the number of workers from 2 to 4.

    How many worker processes will be used?

    First, the max_parallel_workers_per_gather parameter sets an upper limit on the number of workers per Gather node.

    Second, the query executor takes workers from a pool whose size is limited by the value of max_parallel_workers.

    Finally, the topmost limit is max_worker_processes, which defines the total number of background worker processes.

    If a worker process cannot be allocated, execution falls back to single-process mode.

    The query planner may reduce the number of worker processes based on the size of the table or index, as governed by the parameters min_parallel_table_scan_size and min_parallel_index_scan_size.

    The default settings of these parameters:

    abce=# show min_parallel_table_scan_size;
     min_parallel_table_scan_size 
    ------------------------------
     8MB
    (1 row)
    
    abce=# show min_parallel_index_scan_size ;
     min_parallel_index_scan_size 
    ------------------------------
     512kB
    (1 row)
    
    abce=# 
    

    The relationship between table size and the number of workers:

    set min_parallel_table_scan_size='8MB'
    8MB table => 1 worker
    24MB table => 2 workers
    72MB table => 3 workers
    x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker
    

    Every time the table size triples relative to min_parallel_(table|index)_scan_size, PostgreSQL adds one more worker process. The number of worker processes is not cost-based.
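
    To see where a table falls on this scale, check its on-disk size (pg_relation_size and pg_size_pretty are standard functions; the table name is taken from the examples above):

    select pg_size_pretty(pg_relation_size('hh_adds_static_day'));
    -- by the log3 rule above, a size in [72MB, 216MB) maps to 3 planned
    -- workers, before the max_parallel_workers_per_gather cap applies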

    In practice these rules are not always followed; the value can be set explicitly for a particular table with alter table ... set (parallel_workers=N).
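
    For instance, to pin the example table to four workers regardless of its size (a sketch; other limits such as max_parallel_workers_per_gather still apply):

    alter table hh_adds_static_day set (parallel_workers = 4);
    -- revert to the size-based default:
    alter table hh_adds_static_day reset (parallel_workers);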

     

    Why is parallel execution not used?

    Besides the general restrictions on parallel execution, PostgreSQL also checks the cost:

    ·parallel_setup_cost: avoids parallel execution for small queries. It models the time spent on memory setup, process startup, and initial communication.

    ·parallel_tuple_cost: communication between the leader process and the worker processes can take a long time, proportional to the number of tuples the workers send. This parameter models that communication cost. (See the sketch after this list.)
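
    If a query you expect to run in parallel does not, one quick experiment (a sketch; session-local settings only, reusing the example table from earlier) is to zero out these costs and compare the plans:

    set parallel_setup_cost = 0;
    set parallel_tuple_cost = 0;
    explain select count(*) from hh_adds_static_day;  -- should now prefer a parallel plan
    reset parallel_setup_cost;
    reset parallel_tuple_cost;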

     

    Nested loop joins

    Starting with version 9.6, PostgreSQL supports parallel execution of nested loops:

    explain (costs off) select c_custkey, count(o_orderkey)
                    from    customer left outer join orders on
                                    c_custkey = o_custkey and o_comment not like '%special%deposits%'
                    group by c_custkey;
                                          QUERY PLAN                                      
    --------------------------------------------------------------------------------------
     Finalize GroupAggregate
       Group Key: customer.c_custkey
       ->  Gather Merge
             Workers Planned: 4
             ->  Partial GroupAggregate
                   Group Key: customer.c_custkey
                   ->  Nested Loop Left Join
                         ->  Parallel Index Only Scan using customer_pkey on customer
                         ->  Index Scan using idx_orders_custkey on orders
                               Index Cond: (customer.c_custkey = o_custkey)
                               Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
    

    The gather happens in the last stage, so the "Nested Loop Left Join" runs in parallel. "Parallel Index Only Scan" has been available since version 10; it behaves much like a parallel sequential scan. The condition c_custkey = o_custkey reads one order for each customer row, so that inner index scan itself is not parallel.

     

    Hash join

    Before PostgreSQL 11, each worker built its own copy of the hash table, so more than four worker processes did not improve performance. The new implementation uses a single shared hash table, and each worker can use work_mem to build it.

    select
            l_shipmode,
            sum(case
                    when o_orderpriority = '1-URGENT'
                            or o_orderpriority = '2-HIGH'
                            then 1
                    else 0
            end) as high_line_count,
            sum(case
                    when o_orderpriority <> '1-URGENT'
                            and o_orderpriority <> '2-HIGH'
                            then 1
                    else 0
            end) as low_line_count
    from
            orders,
            lineitem
    where
            o_orderkey = l_orderkey
            and l_shipmode in ('MAIL', 'AIR')
            and l_commitdate < l_receiptdate
            and l_shipdate < l_commitdate
            and l_receiptdate >= date '1996-01-01'
            and l_receiptdate < date '1996-01-01' + interval '1' year
    group by
            l_shipmode
    order by
            l_shipmode
    LIMIT 1;
    
                                                                                                                                        QUERY PLAN                                               
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
     Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
       ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
             Group Key: lineitem.l_shipmode
             ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
                   Workers Planned: 4
                   Workers Launched: 4
                   ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                         Group Key: lineitem.l_shipmode
                         ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                               Sort Key: lineitem.l_shipmode
                               Sort Method: external merge  Disk: 2304kB
                               Worker 0:  Sort Method: external merge  Disk: 2064kB
                               Worker 1:  Sort Method: external merge  Disk: 2384kB
                               Worker 2:  Sort Method: external merge  Disk: 2264kB
                               Worker 3:  Sort Method: external merge  Disk: 2336kB
                               ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                     Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                     ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                           Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                           Rows Removed by Filter: 11934691
                                     ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                           Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                           ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
     Planning Time: 0.977 ms
     Execution Time: 7923.770 ms
    

    Here every worker helps build the shared hash table.

     

    Merge join

    Due to its very nature, a merge join does not support parallel execution. If the merge join is the last stage of query execution, you can still see parallelism below it:

    -- Query 2 from TPC-H
    explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
    from    part, supplier, partsupp, nation, region
    where
            p_partkey = ps_partkey
            and s_suppkey = ps_suppkey
            and p_size = 36
            and p_type like '%BRASS'
            and s_nationkey = n_nationkey
            and n_regionkey = r_regionkey
            and r_name = 'AMERICA'
            and ps_supplycost = (
                    select
                            min(ps_supplycost)
                    from    partsupp, supplier, nation, region
                    where
                            p_partkey = ps_partkey
                            and s_suppkey = ps_suppkey
                            and s_nationkey = n_nationkey
                            and n_regionkey = r_regionkey
                            and r_name = 'AMERICA'
            )
    order by s_acctbal desc, n_name, s_name, p_partkey
    LIMIT 100;
                                                    QUERY PLAN                                                
    ----------------------------------------------------------------------------------------------------------
     Limit
       ->  Sort
             Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
             ->  Merge Join
                   Merge Cond: (part.p_partkey = partsupp.ps_partkey)
                   Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
                   ->  Gather Merge
                         Workers Planned: 4
                          ->  Parallel Index Scan using part_pkey on part
                               Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
                   ->  Materialize
                         ->  Sort
                               Sort Key: partsupp.ps_partkey
                               ->  Nested Loop
                                     ->  Nested Loop
                                           Join Filter: (nation.n_regionkey = region.r_regionkey)
                                           ->  Seq Scan on region
                                                 Filter: (r_name = 'AMERICA'::bpchar)
                                           ->  Hash Join
                                                 Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                                 ->  Seq Scan on supplier
                                                 ->  Hash
                                                       ->  Seq Scan on nation
                                     ->  Index Scan using idx_partsupp_suppkey on partsupp
                                           Index Cond: (ps_suppkey = supplier.s_suppkey)
                   SubPlan 1
                     ->  Aggregate
                           ->  Nested Loop
                                 Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                                 ->  Seq Scan on region region_1
                                       Filter: (r_name = 'AMERICA'::bpchar)
                                 ->  Nested Loop
                                       ->  Nested Loop
                                             ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                                   Index Cond: (part.p_partkey = ps_partkey)
                                             ->  Index Scan using supplier_pkey on supplier supplier_1
                                                   Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                       ->  Index Scan using nation_pkey on nation nation_1
                                             Index Cond: (n_nationkey = supplier_1.s_nationkey)
    

    The "Merge Join" node sits above the "Gather Merge", so the merge itself does not use parallel execution. The "Parallel Index Scan" node on part_pkey still helps, though.

    Partition-wise join

    PostgreSQL 11 disables the partition-wise join feature by default, since partition-wise joins have a high planning cost. Joins between similarly partitioned tables can be done partition by partition. This allows Postgres to use smaller hash tables, and each per-partition join operation can be executed in parallel.

    tpch=# set enable_partitionwise_join=t;
    tpch=# explain (costs off) select * from prt1 t1, prt2 t2
    where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN                     
    ---------------------------------------------------
     Append
       ->  Hash Join
             Hash Cond: (t2.b = t1.a)
             ->  Seq Scan on prt2_p1 t2
                   Filter: ((b >= 0) AND (b <= 10000))
             ->  Hash
                   ->  Seq Scan on prt1_p1 t1
                         Filter: (b = 0)
       ->  Hash Join
             Hash Cond: (t2_1.b = t1_1.a)
             ->  Seq Scan on prt2_p2 t2_1
                   Filter: ((b >= 0) AND (b <= 10000))
             ->  Hash
                   ->  Seq Scan on prt1_p2 t1_1
                         Filter: (b = 0)
    tpch=# set parallel_setup_cost = 1;
    tpch=# set parallel_tuple_cost = 0.01;
    tpch=# explain (costs off) select * from prt1 t1, prt2 t2
    where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                            QUERY PLAN                         
    -----------------------------------------------------------
     Gather
       Workers Planned: 4
       ->  Parallel Append
             ->  Parallel Hash Join
                   Hash Cond: (t2_1.b = t1_1.a)
                   ->  Parallel Seq Scan on prt2_p2 t2_1
                         Filter: ((b >= 0) AND (b <= 10000))
                   ->  Parallel Hash
                         ->  Parallel Seq Scan on prt1_p2 t1_1
                               Filter: (b = 0)
             ->  Parallel Hash Join
                   Hash Cond: (t2.b = t1.a)
                   ->  Parallel Seq Scan on prt2_p1 t2
                         Filter: ((b >= 0) AND (b <= 10000))
                   ->  Parallel Hash
                         ->  Parallel Seq Scan on prt1_p1 t1
                               Filter: (b = 0)
    

    Most importantly, a partition-wise join can use parallel execution only if the partitions are large enough.

     

    Parallel Append

    You usually see this in UNION ALL queries. The drawback is limited parallelism, because each worker ends up working on a single subquery.

    Even with four workers enabled, only two are planned here:

    tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                               QUERY PLAN                                           
    ------------------------------------------------------------------------------------------------
     Gather
       Workers Planned: 2
       ->  Parallel Append
             ->  Aggregate
                   ->  Seq Scan on lineitem
                         Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
             ->  Aggregate
                   ->  Seq Scan on lineitem lineitem_1
                         Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
    

      

    Important variables

    ·work_mem

    ·max_parallel_workers_per_gather: the number of workers the executor allocates for each parallel plan node (all four settings can be queried together, as shown after this list)

    ·max_worker_processes

    ·max_parallel_workers
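
    All four can be inspected in one place through the pg_settings system view (a standard catalog view):

    select name, setting, unit
    from pg_settings
    where name in ('work_mem',
                   'max_parallel_workers_per_gather',
                   'max_worker_processes',
                   'max_parallel_workers');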

    Parallel query execution was introduced in version 9.6.

    Since version 10, parallel execution has been enabled by default.

    On heavily loaded OLTP systems, disabling parallel execution is recommended.
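
    One way to do that (a sketch, reusing the alter system pattern shown earlier; a per-gather limit of 0 keeps the planner from choosing parallel plans):

    alter system set max_parallel_workers_per_gather = 0;
    select * from pg_reload_conf();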
