zoukankan      html  css  js  c++  java
  • Oracle 11g: DBMS_PARALLEL_EXECUTE

    DBMS_PARALLEL_EXECUTE

    The DBMS_PARALLEL_EXECUTE package allows a workload associated with a base table to be broken down into smaller chunks which can be run in parallel. This process involves several distinct stages. At the end of the article there are some complete examples, using some of the techniques discussed below.
    The user controlling the process needs the CREATE JOB privilege.
    CONN / AS SYSDBA
    GRANT CREATE JOB TO test;
    The examples used in this article require the following table to be created and populated.
    CONN test/test
    
    DROP TABLE test_tab;
    
    CREATE TABLE test_tab (
      id          NUMBER,
      description VARCHAR2(50),
      num_col     NUMBER,
      CONSTRAINT test_tab_pk PRIMARY KEY (id)
    );
    
    INSERT /*+ APPEND */ INTO test_tab
    SELECT level,
           'Description for ' || level,
           CASE
             WHEN MOD(level, 5) = 0 THEN 10
             WHEN MOD(level, 3) = 0 THEN 20
             ELSE 30
           END
    FROM   dual
    CONNECT BY level <= 500000;
    COMMIT;
    
    EXEC DBMS_STATS.gather_table_stats(USER, 'TEST_TAB', cascade => TRUE);
    
    SELECT num_col, COUNT(*)
    FROM   test_tab
    GROUP BY num_col
    ORDER BY num_col;
    
       NUM_COL   COUNT(*)
    ---------- ----------
            10     100000
            20     133333
            30     266667
    
    SQL>

    Create a task

    The CREATE_TASK procedure is used to create a new task. It requires a task name to be specified, but can also include an optional task comment.
    BEGIN
      DBMS_PARALLEL_EXECUTE.create_task (task_name => 'test_task');
    END;
    /
    Information about existing tasks is displayed using the [DBA|USER]_PARALLEL_EXECUTE_TASKS views.
    COLUMN task_name FORMAT A10
    SELECT task_name,
           status
    FROM   user_parallel_execute_tasks;
    
    TASK_NAME  STATUS
    ---------- -------------------
    test_task  CREATED
    
    SQL>
    The GENERATE_TASK_NAME function returns a unique task name if you do not want to name the task manually.
    SELECT DBMS_PARALLEL_EXECUTE.generate_task_name
    FROM   dual;
    
    GENERATE_TASK_NAME
    --------------------------------------------------------------------------------
    TASK$_726
    
    SQL>

    Split the workload into chunks

    The workload is associated with a base table, which can be split into subsets or chunks of rows. There are three methods of splitting the workload into chunks. The chunks associated with a task can be dropped using the DROP_CHUNKS procedure.
    CREATE_CHUNKS_BY_ROWID
    The CREATE_CHUNKS_BY_ROWID procedure splits the data by rowid into chunks specified by the CHUNK_SIZE parameter. If the BY_ROW parameter is set to TRUE, the CHUNK_SIZE refers to the number of rows, otherwise it refers to the number of blocks.
    BEGIN
      DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name   => 'test_task',
                                                   table_owner => 'TEST',
                                                   table_name  => 'TEST_TAB',
                                                   by_row      => TRUE,
                                                   chunk_size  => 10000);
    END;
    /
    Once the operation is complete the task status is changed to 'CHUNKED'.
    COLUMN task_name FORMAT A10
    
    SELECT task_name,
           status
    FROM   user_parallel_execute_tasks;
    
    TASK_NAME  STATUS
    ---------- -------------------
    test_task  CHUNKED
    
    SQL>
    The [DBA|USER]_PARALLEL_EXECUTE_CHUNKS views display information about the individual chunks.
    SELECT chunk_id, status, start_rowid, end_rowid
    FROM   user_parallel_execute_chunks
    WHERE  task_name = 'test_task'
    ORDER BY chunk_id;
    
      CHUNK_ID STATUS               START_ROWID        END_ROWID
    ---------- -------------------- ------------------ ------------------
           287 UNASSIGNED           AAASjoAAEAAAAIwAAA AAASjoAAEAAAAI3CcP
           288 UNASSIGNED           AAASjoAAEAAAAI4AAA AAASjoAAEAAAAI/CcP
    ...
           450 UNASSIGNED           AAASjoAAEAAAAIIAAA AAASjoAAEAAAAIPCcP
           451 UNASSIGNED           AAASjoAAEAAAAIoAAA AAASjoAAEAAAAIvCcP
    
    88 rows selected.
    
    SQL>
    CREATE_CHUNKS_BY_NUMBER_COL
    The CREATE_CHUNKS_BY_NUMBER_COL procedure divides the workload up based on a number column. It uses the specified columns min and max values along with the chunk size to split the data into approximately equal chunks. For the chunks to be equally sized the column must contain a continuous sequence of numbers, like that generated by a sequence.
    BEGIN
      DBMS_PARALLEL_EXECUTE.create_chunks_by_number_col(task_name    => 'test_task',
                                                        table_owner  => 'TEST',
                                                        table_name   => 'TEST_TAB',
                                                        table_column => 'ID',
                                                        chunk_size   => 10000);
    END;
    /
    The [DBA|USER]_PARALLEL_EXECUTE_CHUNKS views display information about the individual chunks.
    SELECT chunk_id, status, start_id, end_id
    FROM   user_parallel_execute_chunks
    WHERE  task_name = 'test_task'
    ORDER BY chunk_id;
    
      CHUNK_ID STATUS                 START_ID     END_ID
    ---------- -------------------- ---------- ----------
           600 UNASSIGNED                    1      10000
           601 UNASSIGNED                10001      20000
     ...
           648 UNASSIGNED               480001     490000
           649 UNASSIGNED               490001     500000
    
    50 rows selected.
    
    SQL>
    CREATE_CHUNKS_BY_SQL
    The CREATE_CHUNKS_BY_SQL procedure divides the workload based on a user-defined query. If the BY_ROWID parameter is set to TRUE, the query must return a series of start and end rowids. If it's set to FALSE, the query must return a series of start and end IDs.
    DECLARE
      l_stmt CLOB;
    BEGIN
      l_stmt := 'SELECT DISTINCT num_col, num_col FROM test_tab';
    
      DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => 'test_task',
                                                 sql_stmt  => l_stmt,
                                                 by_rowid  => FALSE);
    END;
    /
    The [DBA|USER]_PARALLEL_EXECUTE_CHUNKS views display information about the individual chunks.
    SELECT chunk_id, status, start_id, end_id
    FROM   user_parallel_execute_chunks
    WHERE  task_name = 'test_task'
    ORDER BY chunk_id;
    
      CHUNK_ID STATUS                 START_ID     END_ID
    ---------- -------------------- ---------- ----------
           650 UNASSIGNED                   10         10
           651 UNASSIGNED                   30         30
           652 UNASSIGNED                   20         20
    
    3 rows selected.
    
    SQL>

    Run the task

    Running a task involves running a specific statement for each defined chunk of work. The documentation only shows examples using updates of the base table, but this is not the only use of this functionality. The statement associated with the task can be a procedure call, as shown in one of the examples at the end of the article.
    There are two ways to run a task and several procedures to control a running task.

    RUN_TASK

    The RUN_TASK procedure runs the specified statement in parallel by scheduling jobs to process the workload chunks. The statement specifying the actual work to be done must include a reference to the ':start_id' and ':end_id', which represent a range of rowids or column IDs to be processed, as specified in the chunk definitions. The degree of parallelism is controlled by the number of scheduled jobs, not the number of chunks defined. The scheduled jobs take an unassigned workload chunk, process it, then move on to the next unassigned chunk.
    DECLARE
      l_sql_stmt VARCHAR2(32767);
    BEGIN
      l_sql_stmt := 'UPDATE /*+ ROWID (dda) */ test_tab t 
                     SET    t.num_col = t.num_col + 10
                     WHERE rowid BETWEEN :start_id AND :end_id';
    
      DBMS_PARALLEL_EXECUTE.run_task(task_name      => 'test_task',
                                     sql_stmt       => l_sql_stmt,
                                     language_flag  => DBMS_SQL.NATIVE,
                                     parallel_level => 10);
    END;
    /
    The RUN_TASK procedure waits for the task to complete. On completion, the status of the task must be assessed to know what action to take next.

    User-defined framework

    The DBMS_PARALLEL_EXECUTE package allows you to manually code the task run. The GET_ROWID_CHUNK and GET_NUMBER_COL_CHUNK procedures return the next available unassigned chunk. You can than manually process the chunk and set its status. The example below shows the processing of a workload chunked by rowid.
    DECLARE
      l_sql_stmt    VARCHAR2(32767);
      l_chunk_id    NUMBER;
      l_start_rowid ROWID;
      l_end_rowid   ROWID;
      l_any_rows    BOOLEAN;
    BEGIN
      l_sql_stmt := 'UPDATE /*+ ROWID (dda) */ test_tab t 
                     SET    t.num_col = t.num_col + 10
                     WHERE rowid BETWEEN :start_id AND :end_id';
     
      LOOP
        -- Get next unassigned chunk.
        DBMS_PARALLEL_EXECUTE.get_rowid_chunk(task_name   => 'test_task',
                                              chunk_id    => l_chunk_id,
                                              start_rowid => l_start_rowid,
                                              end_rowid   => l_end_rowid,
                                              any_rows    => l_any_rows);
    
        EXIT WHEN l_any_rows = FALSE;
     
        BEGIN
          -- Manually execute the work.
          EXECUTE IMMEDIATE l_sql_stmt USING l_start_rowid, l_end_rowid;
    
          -- Set the chunk status as processed.
          DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task',
                                                 chunk_id  => l_chunk_id,
                                                 status    => DBMS_PARALLEL_EXECUTE.PROCESSED);
          EXCEPTION
            WHEN OTHERS THEN
              -- Record chunk error.
              DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task',
                                                     chunk_id  => l_chunk_id,
                                                     status    => DBMS_PARALLEL_EXECUTE.PROCESSED_WITH_ERROR,
                                                     err_num   => SQLCODE,
                                                     err_msg   => SQLERRM);
        END;
    
        -- Commit work.
        COMMIT;
      END LOOP;
    END;
    /

    Task control

    A running task can be stopped and restarted using the STOP_TASK and RESUME_TASK procedures respectively.
    The PURGE_PROCESSED_CHUNKS procedure deletes all chunks with a status of 'PROCESSED' or 'PROCESSED_WITH_ERROR'.
    The ADM_DROP_CHUNKS, ADM_DROP_TASK, ADM_TASK_STATUS and ADM_STOP_TASK routines have the same function as their namesakes, but they allow the operations to performed on tasks owned by other users. In order to use these routines the user must have been granted the ADM_PARALLEL_EXECUTE_TASK role.

    Check the task status

    The simplest way to check the status of a task is to use the TASK_STATUS function. After execution of the task, the only possible return values are the 'FINISHED' or 'FINISHED_WITH_ERROR' constants. If the status is not 'FINISHED', then the task can be resumed using the RESUME_TASK procedure.
    DECLARE
      l_try NUMBER;
      l_status NUMBER;
    BEGIN
      -- If there is error, RESUME it for at most 2 times.
      l_try := 0;
      l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task');
      WHILE(l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED) 
      Loop
        l_try := l_try + 1;
        DBMS_PARALLEL_EXECUTE.resume_task('test_task');
        l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task');
      END LOOP;
    END;
    /
    The status of the task and the chunks can also be queried.
    COLUMN task_name FORMAT A10
    SELECT task_name,
           status
    FROM   user_parallel_execute_tasks;
    
    TASK_NAME  STATUS
    ---------- -------------------
    test_task  FINISHED
    
    SQL>
    If there were errors, the chunks can be queried to identify the problems.
    SELECT status, COUNT(*)
    FROM   user_parallel_execute_chunks
    GROUP BY status
    ORDER BY status;
    
    STATUS                 COUNT(*)
    -------------------- ----------
    PROCESSED                    88
    
    SQL>
    The [DBA|USER]_PARALLEL_EXECUTE_TASKS views contain a record of the JOB_PREFIX used when scheduling the chunks of work.
    SELECT job_prefix
    FROM   user_parallel_execute_tasks
    WHERE  task_name = 'test_task';
    
    JOB_PREFIX
    ------------------------------
    TASK$_368
    
    SQL>
    This value can be used to query information about the individual jobs used during the process. The number of jobs scheduled should match the degree of parallelism specified in the RUN_TASK procedure.
    COLUMN job_name FORMAT A20
    
    SELECT job_name, status
    FROM   user_scheduler_job_run_details
    WHERE  job_name LIKE (SELECT job_prefix || '%'
                          FROM   user_parallel_execute_tasks
                          WHERE  task_name = 'test_task');
    
    JOB_NAME             STATUS
    -------------------- ------------------------------
    TASK$_368_1          SUCCEEDED
    TASK$_368_6          SUCCEEDED
    TASK$_368_2          SUCCEEDED
    TASK$_368_9          SUCCEEDED
    TASK$_368_10         SUCCEEDED
    TASK$_368_8          SUCCEEDED
    TASK$_368_7          SUCCEEDED
    TASK$_368_4          SUCCEEDED
    TASK$_368_5          SUCCEEDED
    TASK$_368_3          SUCCEEDED
    
    10 rows selected.
    
    SQL>

    Drop the task

    Once the job is complete you can drop the task, which will drop the associated chunk information also.
    BEGIN
      DBMS_PARALLEL_EXECUTE.drop_task('test_task');
    END;
    /

    Complete examples

    The following example shows the processing of a workload chunked by rowid.
    DECLARE
      l_task     VARCHAR2(30) := 'test_task';
      l_sql_stmt VARCHAR2(32767);
      l_try      NUMBER;
      l_status   NUMBER;
    BEGIN
      DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task);
    
      DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name   => l_task,
                                                   table_owner => 'TEST',
                                                   table_name  => 'TEST_TAB',
                                                   by_row      => TRUE,
                                                   chunk_size  => 10000);
    
      l_sql_stmt := 'UPDATE /*+ ROWID (dda) */ test_tab t 
                     SET    t.num_col = t.num_col + 10
                     WHERE rowid BETWEEN :start_id AND :end_id';
    
      DBMS_PARALLEL_EXECUTE.run_task(task_name      => l_task,
                                     sql_stmt       => l_sql_stmt,
                                     language_flag  => DBMS_SQL.NATIVE,
                                     parallel_level => 10);
    
      -- If there is error, RESUME it for at most 2 times.
      l_try := 0;
      l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
      WHILE(l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED) 
      Loop
        l_try := l_try + 1;
        DBMS_PARALLEL_EXECUTE.resume_task(l_task);
        l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
      END LOOP;
    
      DBMS_PARALLEL_EXECUTE.drop_task(l_task);
    END;
    /
    The following example shows the processing of a workload chunked by a number column. Notice that the workload is actually a stored procedure in this case.
    CREATE OR REPLACE PROCEDURE process_update (p_start_id IN NUMBER, p_end_id IN NUMBER) AS
    BEGIN
      UPDATE /*+ ROWID (dda) */ test_tab t 
      SET    t.num_col = t.num_col + 10
      WHERE id BETWEEN p_start_id AND p_end_id;
    END;
    /
    
    DECLARE
      l_task     VARCHAR2(30) := 'test_task';
      l_sql_stmt VARCHAR2(32767);
      l_try      NUMBER;
      l_status   NUMBER;
    BEGIN
      DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task);
    
      DBMS_PARALLEL_EXECUTE.create_chunks_by_number_col(task_name    => l_task,
                                                        table_owner  => 'TEST',
                                                        table_name   => 'TEST_TAB',
                                                        table_column => 'ID',
                                                        chunk_size   => 10000);
    
      l_sql_stmt := 'BEGIN process_update(:start_id, :end_id); END;';
    
      DBMS_PARALLEL_EXECUTE.run_task(task_name      => l_task,
                                     sql_stmt       => l_sql_stmt,
                                     language_flag  => DBMS_SQL.NATIVE,
                                     parallel_level => 10);
    
      -- If there is error, RESUME it for at most 2 times.
      l_try := 0;
      l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
      WHILE(l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED) 
      Loop
        l_try := l_try + 1;
        DBMS_PARALLEL_EXECUTE.resume_task(l_task);
        l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
      END LOOP;
    
      DBMS_PARALLEL_EXECUTE.drop_task(l_task);
    END;
    /
    The following example shows a workload chunked by an SQL statement and processed by a user-defined framework.
    DECLARE
      l_task     VARCHAR2(30) := 'test_task';
      l_stmt     CLOB;
      l_sql_stmt VARCHAR2(32767);
      l_chunk_id NUMBER;
      l_start_id NUMBER;
      l_end_id   NUMBER;
      l_any_rows BOOLEAN;
    BEGIN
      DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task);
    
      l_stmt := 'SELECT DISTINCT num_col, num_col FROM test_tab';
    
      DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => l_task,
                                                 sql_stmt  => l_stmt,
                                                 by_rowid  => FALSE);
    
      l_sql_stmt := 'UPDATE /*+ ROWID (dda) */ test_tab t 
                     SET    t.num_col = t.num_col
                     WHERE num_col BETWEEN :start_id AND :end_id';
    
      LOOP
        -- Get next unassigned chunk.
        DBMS_PARALLEL_EXECUTE.get_number_col_chunk(task_name => 'test_task',
                                                   chunk_id    => l_chunk_id,
                                                   start_id    => l_start_id,
                                                   end_id      => l_end_id,
                                                   any_rows    => l_any_rows);
    
        EXIT WHEN l_any_rows = FALSE;
     
        BEGIN
          -- Manually execute the work.
          EXECUTE IMMEDIATE l_sql_stmt USING l_start_id, l_end_id;
    
          -- Set the chunk status as processed.
          DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task',
                                                 chunk_id  => l_chunk_id,
                                                 status    => DBMS_PARALLEL_EXECUTE.PROCESSED);
          EXCEPTION
            WHEN OTHERS THEN
              -- Record chunk error.
              DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task',
                                                     chunk_id  => l_chunk_id,
                                                     status    => DBMS_PARALLEL_EXECUTE.PROCESSED_WITH_ERROR,
                                                     err_num   => SQLCODE,
                                                     err_msg   => SQLERRM);
        END;
    
        -- Commit work.
        COMMIT;
      END LOOP;
    
      DBMS_PARALLEL_EXECUTE.drop_task(l_task);
    END;
    /
  • 相关阅读:
    LeetCode 面试题32
    LeetCode 102. 二叉树的层序遍历
    LeetCode 面试题32
    LeetCode 面试题32
    LeetCode 面试题31. 栈的压入、弹出序列
    LeetCode 946. 验证栈序列
    LeetCode 50. Pow(x, n)
    LeetCode 572. 另一个树的子树
    LeetCode 面试题50. 第一个只出现一次的字符
    LeetCode 面试题37. 序列化二叉树
  • 原文地址:https://www.cnblogs.com/tracy/p/2076324.html
Copyright © 2011-2022 走看看