PROCEDURE PROC_START_WF_JOB(IN_WF_PARAM IN OUT PROC_WF_PARAM_TYPE)
AS
    -- Starts queued workflow nodes as background DBMS_SCHEDULER jobs, up to the
    -- number of free parallel slots for the given workflow instance.
    --
    -- Parameter:
    --   IN_WF_PARAM (IN OUT)
    --     Read:    IN_WF_INSTID, IN_WF_CODE, IN_BUSI_DT.
    --     Written: IN_IMPORT_DT, IN_NODE_CODE, IN_NODE_TIER, IN_NODE_PROC are
    --              overwritten for every node that is started (the object is
    --              then passed to the job as its ANYDATA argument).
    --              On any error OUT_RET is set to 1 and OUT_RET_MSG to SQLERRM.
    --
    -- Side effects:
    --   * Drops and re-creates scheduler programs/jobs named PRO_<node> / JOB_<node>.
    --   * Updates WF_NODE_TIMELINE and COMMITs once per started node.
    --
    -- Errors are not re-raised: the first failure ends the loop, so remaining
    -- queue entries are not started in this call.

    -- Number of timeline rows currently in the "running" status.
    V_RUNCNT          NUMBER(8);
    V_PROGRAMNAME     VARCHAR2(30);
    V_JOBNAME         VARCHAR2(30);
    V_PROCNAME        VARCHAR2(50);
    V_PROGRAMCNT      PLS_INTEGER;
    V_JOBCNT          PLS_INTEGER;
    -- Wrapper procedure that every scheduler program invokes.
    V_JOB_PROCNAME    CONSTANT VARCHAR2(50) := 'PKG_WORKFLOW.PROC_EXEC_JOB_PROC_CALL';
    -- Start stamp, computed once per node so START_TIME and REMARK agree.
    V_START_STAMP     VARCHAR2(32);
    -- Number of jobs allowed to run in parallel (from the dictionary).
    V_PARALLEL_RUNCNT PLS_INTEGER := PKG_WORKFLOW.FUNC_GET_DICT_VALUE('WORKFLOW','JOB_CONTROL','PARALLEL_RUNCNT');

    -- Queue entries that are not already running, in queue order, limited to
    -- (allowed parallel jobs - currently running jobs).
    -- V_RUNCNT is read when the cursor is opened, so it must be set before the loop.
    CURSOR REC_QUEUE_JOBS IS
        SELECT NODE_CODE,
               LAYER,
               SP_NAME,
               FREQ
          FROM (SELECT T.NODE_CODE,
                       Q.LAYER,
                       Q.SP_NAME,
                       Q.FREQ
                  FROM WF_NODE_QUEUE T
                 INNER JOIN WF_NODE_CONF Q
                    ON Q.WF_CODE = T.WF_CODE
                   AND Q.CODE = T.NODE_CODE
                 WHERE T.WF_INSTID = IN_WF_PARAM.IN_WF_INSTID
                   AND T.WF_CODE = IN_WF_PARAM.IN_WF_CODE
                   AND NOT EXISTS (SELECT 1
                                     FROM WF_NODE_TIMELINE T1
                                    WHERE T1.WF_INSTID = T.WF_INSTID
                                      AND T1.WF_CODE = T.WF_CODE
                                      AND T1.NODE_CODE = T.NODE_CODE
                                      AND T1.STATUS = PKG_WORKFLOW.FUNC_GET_DICT_VALUE('WORKFLOW','NODE_STATUS','003'))
                 ORDER BY T.QUEUE_ORDER)
         WHERE ROWNUM <= V_PARALLEL_RUNCNT - V_RUNCNT;
BEGIN
    -- Count the jobs currently running for this workflow instance.
    SELECT COUNT(1)
      INTO V_RUNCNT
      FROM WF_NODE_TIMELINE T
     WHERE T.WF_INSTID = IN_WF_PARAM.IN_WF_INSTID
       AND T.WF_CODE = IN_WF_PARAM.IN_WF_CODE
       AND T.STATUS = PKG_WORKFLOW.FUNC_GET_DICT_VALUE('WORKFLOW','NODE_STATUS','003');

    FOR REC_JOBS IN REC_QUEUE_JOBS
    LOOP
        -- Scheduler object names are derived from the node code, truncated to 30 chars.
        V_PROGRAMNAME := SUBSTR('PRO_' || REC_JOBS.NODE_CODE, 1, 30);
        V_JOBNAME     := SUBSTR('JOB_' || REC_JOBS.NODE_CODE, 1, 30);

        -- Check whether a job / program with the same name already exists.
        SELECT COUNT(1)
          INTO V_JOBCNT
          FROM USER_SCHEDULER_JOBS T
         WHERE T.JOB_NAME = V_JOBNAME;

        SELECT COUNT(1)
          INTO V_PROGRAMCNT
          FROM USER_SCHEDULER_PROGRAMS T
         WHERE T.PROGRAM_NAME = V_PROGRAMNAME;

        -- Drop the job first, then the program it references.
        IF V_JOBCNT > 0 THEN
            DBMS_SCHEDULER.DROP_JOB(V_JOBNAME);
        END IF;
        IF V_PROGRAMCNT > 0 THEN
            DBMS_SCHEDULER.DROP_PROGRAM(V_PROGRAMNAME);
        END IF;

        -- ODS/IN layer nodes go through a common loader; others use the configured procedure.
        IF REC_JOBS.LAYER IN ('ODS','IN') THEN
            V_PROCNAME := 'PKG_WORKFLOW.PROC_EXEC_ODS_IN_JOB_CALL';
        ELSE
            V_PROCNAME := REC_JOBS.SP_NAME;
        END IF;

        -- Initialise the per-node fields of the parameter object handed to the job.
        IN_WF_PARAM.IN_IMPORT_DT := FUNC_GET_IMPORT_DATE(IN_WF_PARAM.IN_BUSI_DT, REC_JOBS.FREQ);
        IN_WF_PARAM.IN_NODE_CODE := REC_JOBS.NODE_CODE;
        IN_WF_PARAM.IN_NODE_TIER := REC_JOBS.LAYER;
        IN_WF_PARAM.IN_NODE_PROC := V_PROCNAME;

        -- 1. Create the program for the job and define its single ANYDATA argument.
        DBMS_SCHEDULER.CREATE_PROGRAM(PROGRAM_NAME        => V_PROGRAMNAME,
                                      PROGRAM_TYPE        => 'STORED_PROCEDURE',
                                      PROGRAM_ACTION      => V_JOB_PROCNAME,
                                      NUMBER_OF_ARGUMENTS => 1,
                                      ENABLED             => FALSE);
        DBMS_SCHEDULER.DEFINE_ANYDATA_ARGUMENT(PROGRAM_NAME      => V_PROGRAMNAME,
                                               ARGUMENT_POSITION => 1,
                                               ARGUMENT_NAME     => 'IN_WF_PARAM',
                                               ARGUMENT_TYPE     => 'SYS.ANYDATA',
                                               DEFAULT_VALUE     => SYS.ANYDATA.CONVERTOBJECT(IN_WF_PARAM));

        -- 2. Create a one-time job: no start date and no repeat interval are
        --    given, and it is dropped automatically after it has run.
        DBMS_SCHEDULER.CREATE_JOB(JOB_NAME     => V_JOBNAME,
                                  PROGRAM_NAME => V_PROGRAMNAME,
                                  JOB_CLASS    => 'DEFAULT_JOB_CLASS',
                                  ENABLED      => FALSE,
                                  AUTO_DROP    => TRUE);

        -- 3. Enable the program and the job; the job then runs in the
        --    background (it is intentionally not run in the current session).
        -- NOTE(review): the job may start before the timeline row below is
        -- marked as running - confirm the job procedure tolerates that.
        DBMS_SCHEDULER.ENABLE(V_PROGRAMNAME);
        DBMS_SCHEDULER.ENABLE(V_JOBNAME);

        -- Mark the node as running on the timeline.
        -- The previous mask 'YYYYMMDDHH24MISSSSI' is read by Oracle as
        -- MI SS SS I (seconds twice + last digit of the ISO year); FF3 yields
        -- the same 17-character width with real milliseconds.
        V_START_STAMP := TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF3');

        UPDATE WF_NODE_TIMELINE T
           SET T.STATUS     = PKG_WORKFLOW.FUNC_GET_DICT_VALUE('WORKFLOW','NODE_STATUS','003'),
               T.START_TIME = V_START_STAMP,
               T.REMARK     = 'Running at ' || V_START_STAMP
         WHERE T.WF_INSTID = IN_WF_PARAM.IN_WF_INSTID
           AND T.WF_CODE = IN_WF_PARAM.IN_WF_CODE
           AND T.NODE_CODE = REC_JOBS.NODE_CODE;
        COMMIT;

        -- 4. Disable the job and the program again.
        -- NOTE(review): DISABLE without FORCE can raise if the job is still
        -- running, has already been auto-dropped, or still references the
        -- program; such an error lands in the handler below and ends the loop.
        -- Confirm this step is still wanted.
        DBMS_SCHEDULER.DISABLE(V_JOBNAME);
        DBMS_SCHEDULER.DISABLE(V_PROGRAMNAME);
    END LOOP;
EXCEPTION
    WHEN OTHERS THEN
        -- Report the failure through the parameter object instead of re-raising.
        IN_WF_PARAM.OUT_RET := 1;
        IN_WF_PARAM.OUT_RET_MSG := SQLERRM;
END;