Running PL/SQL calls in parallel and wait for execution to finish (fork and join)

12,872

Solution 1

I solved the problem using DBMS_SCHEDULER and PIPEs for synchronization/IPC that does not rely on polling and does not need additional tables. It still wakes once per finished job, though.

It's quite some effort, so if some can propose a simpler solution please share it!

  1. Define a procedure that calls the actual procedure that can be run from a program/job and handles IPC (write message to pipe when finished).
  2. Define a program that calls this procedure and defines arguments to be passed to the procedure
  3. Define a procedure that creates a job from the program, maps parameters to job arguments and runs the job
  4. Define logic that waits for all jobs to finish: Wait until every job has sent a message on the pipe.
--
-- Define stored procedures to be executed by job
--

/** Actual method that should be run in parallel*/
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS(
    in_job IN VARCHAR2)
IS
BEGIN
   -- Imagine a lot of IF, ELSEIF, ELSE statements here
  DELETE FROM TEST_BONUS WHERE JOB=in_job;
END;
/

/** Stored procedure to be run from the job: Uses pipes for job synchronization, executes PROC_DELETE_TEST_BONUS. */
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS_CONCUR(in_pipe_name IN VARCHAR2,
    in_job IN VARCHAR2)
IS
 flag INTEGER;
BEGIN
  -- Execute actual procedure
  PROC_DELETE_TEST_BONUS(in_job);

 -- Signal completion
 -- Use the procedure to put a message in the local buffer. 
  DBMS_PIPE.PACK_MESSAGE(SYSDATE ||': Success ' ||in_job);
  -- Send message, success is a zero return value.
  flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
EXCEPTION
WHEN OTHERS THEN
  -- Signal completion
  -- Use the procedure to put a message in the local buffer. 
  DBMS_PIPE.PACK_MESSAGE(SYSDATE ||':Failed ' || in_job);
  -- Send message, success is a zero return value.
  flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
  RAISE;
END;
/


--
-- Run Jobs
--
DECLARE
  timestart NUMBER;
  duration_insert NUMBER;
  jobs_amount NUMBER := 0;
  retval INTEGER;
  message     VARCHAR2(4000);
  rows_amount NUMBER;

/** Create and define a program that calls PROG_DELETE_TEST_BONUS_CONCUR to be run as job. */
PROCEDURE create_prog_delete_test_bonus
IS  
BEGIN
  -- define new in each run in order to ease development. TODO Once it works, no need to redefine for each run!
  dbms_scheduler.drop_program(program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', force=> TRUE);

  dbms_scheduler.create_program ( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', program_action =>
  'PROC_DELETE_TEST_BONUS_CONCUR', program_type => 'STORED_PROCEDURE', number_of_arguments => 2,
  enabled => FALSE );

  dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR',
  argument_position => 1, argument_name => 'in_pipe_name', argument_type => 'VARCHAR2');
  dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name=>'PROG_DELETE_TEST_BONUS_CONCUR',
  argument_position => 2, argument_name => 'in_job', argument_type => 'VARCHAR2');

  dbms_scheduler.enable('PROG_DELETE_TEST_BONUS_CONCUR');
END;

/** "Forks" a job that runs PROG_DELETE_TEST_BONUS_CONCUR */
PROCEDURE RUN_TEST_BONUS_JOB(
    in_pipe_name IN VARCHAR2,
    in_job    IN VARCHAR2,
    io_job_amount IN OUT NUMBER)
IS
  jobname VARCHAR2(100);
BEGIN
  jobname:=DBMS_SCHEDULER.GENERATE_JOB_NAME;

  dbms_scheduler.create_job(job_name => jobname, program_name =>
  'PROG_DELETE_TEST_BONUS_CONCUR');
  dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name => 
  'in_pipe_name' , argument_value => in_pipe_name);
  dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name => 
  'in_job' , argument_value => in_job);
  dbms_output.put_line(SYSDATE || ': Running job: '|| jobname);

  dbms_scheduler.RUN_JOB(jobname, false );
  io_job_amount:= io_job_amount+1;
END;


-- Anonymous "Main" block
BEGIN 
  create_prog_delete_test_bonus;

  -- Define private pipe
  retval := DBMS_PIPE.CREATE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME, 100, FALSE);  
  dbms_output.put_line(SYSDATE || ': Created pipe: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned ' ||retval);

  timestart := dbms_utility.get_time();
  INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
  INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
  INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
  COMMIT;
  duration_insert := dbms_utility.get_time() -  timestart;
  dbms_output.put_line(SYSDATE || ': Duration (1/100s): INSERT=' || duration_insert);

  SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
  dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);

  timestart := dbms_utility.get_time();

--  -- Process sequentially
-- PROC_DELETE_TEST_BONUS('A');
-- PROC_DELETE_TEST_BONUS('B');
-- PROC_DELETE_TEST_BONUS('C');

  -- start concurrent processing 
  RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'A', jobs_amount);
  RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'B', jobs_amount);
  RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'C', jobs_amount);

  -- "Barrier": Wait for all jobs to finish
  for i in 1 .. jobs_amount loop
    -- Reset the local buffer.
    DBMS_PIPE.RESET_BUFFER;

    -- Wait and receive message. Timeout after an hour.
    retval := SYS.DBMS_PIPE.RECEIVE_MESSAGE(SYS.DBMS_PIPE.UNIQUE_SESSION_NAME, 3600);
    -- Handle errors: timeout, etc.
    IF retval != 0 THEN
      raise_application_error(-20000, 'Error: '||to_char(retval)||' receiving on pipe. See Job Log in table user_scheduler_job_run_details');
    END IF;
    -- Read message from local buffer.
    DBMS_PIPE.UNPACK_MESSAGE(message);
    dbms_output.put_line(SYSDATE || ': Received message on '''|| DBMS_PIPE.UNIQUE_SESSION_NAME ||''' (Status='|| retval ||'): ' || message); 
  end loop;

  dbms_output.put(SYSDATE || ': Duration (1/100s): DELETE=');
  dbms_output.put_line(dbms_utility.get_time() -  timestart);

  SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
  dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);

  retval :=DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
  dbms_output.put_line(systimestamp || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
EXCEPTION
WHEN OTHERS THEN
  dbms_output.put_line(SYSDATE ||  SUBSTR(SQLERRM, 1, 1000) || ' ' ||
                                          SUBSTR(DBMS_UTILITY.FORMAT_ERROR_BACKTRACE, 1, 1000));
  retval := DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
  dbms_output.put_line(SYSDATE || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);

  -- Clean up in case of error
  PROC_DELETE_TEST_BONUS('A');
  PROC_DELETE_TEST_BONUS('B');
  PROC_DELETE_TEST_BONUS('C');
  RAISE;
END;
/

You should always keep in mind that the changes executed within the job are committed in a separate transaction.

Just to get a feeling for what this concurrency achieves, here a some averaged measured values: The sequential code in the question takes about 60s to complete, the parallel one about 40s.
It would be an interesting further investigation how this turns out when there are more than the three jobs running in parallel.

PS
A helpful query to find out about the status of the jobs is the following

SELECT job_name,
     destination,
     TO_CHAR(actual_start_date) AS actual_start_date,
     run_duration,
     TO_CHAR((ACTUAL_START_DATE+run_duration)) AS actual_end_date,
     status,
     error#,
     ADDITIONAL_INFO
FROM   user_scheduler_job_run_details
ORDER BY actual_start_date desc;

Solution 2

Just wanted to add a few notes about DBMS_PARALLEL_EXECUTE package from Oracle.

This can be used to do more than update a table, although many of the examples show this simple use case.

The trick is to use an anonymous block instead of a DML statement, and the rest of the examples are still relevant. So, instead of this:

l_sql_stmt := 'update EMPLOYEES e 
      SET e.salary = e.salary + 10
      WHERE manager_id between :start_id and :end_id';

We might have this:

l_sql_stmt := 'BEGIN my_package.some_procedure(:start_id, :end_id); END;';

The rest of the example can be found in the "Chunk by User-Provided SQL" example section

You will still need to tell Oracle the start/end ids for each process(using CREATE_CHUNKS_BY_SQL), I typically store them in a separate lookup table (if pre-defined) or you can provide a SQL query that returns a set of start/end values. For the latter approach, try using NTILE. For example, using 8 chunks:

select min(id) as start_id, max(id) as end_id
from (
  select id, ntile(8) over (order by 1) bucket
  from some_table
  where some_clause...
)
group by bucket
order by bucket;

Hope that helps

Share:
12,872
schnatterer
Author by

schnatterer

I'm building and running things with people and computers. Tech lead infra team @cloudogu, occasional author and speaker. GitOps, cloud, security, … He/him.

Updated on June 23, 2022

Comments

  • schnatterer
    schnatterer almost 2 years

    In a legacy system there is some PL/SQL procedure that calls the another procedure mutliple times with different parameters. The procedure contains a lot of PL/SQL logic (if, then, else).

    As the execution of this procedure takes very long, we thought about using concurrency to speed things up without even touching the actual logic.

    I understand that there are several ways of running (PL/)SQL in parallel on oracle (see bellow).

    However, I wasn't able to find a way to pass different arguments/parameters to a PL/SQL procedure, execute them in parallel and wait until all procedures are finished executing (i.e. I'm looking for mechanism to join all threads or for a barrier mechanism in oracle).

    Let's use the following simplified example on the SCOTT Schema:

    DECLARE
    
    PROCEDURE DELETE_BONUS(
        in_job IN VARCHAR2)
    IS
    BEGIN
      -- Imagine a lot of IF, ELSEIF, ELSE statements here
      DELETE FROM BONUS WHERE JOB=in_job;
    END;
    
    BEGIN 
     INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
     INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
     INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
    
     -- TODO execute those in parallel
     DELETE_BONUS('A');
     DELETE_BONUS('B');
     DELETE_BONUS('C');
     -- TODO wait for all procedures to finish
    
    EXCEPTION
    WHEN OTHERS THEN
      RAISE;
    END;
    /
    

    Here's what I found so far:

    1. DBMS_JOB (deprecated)
    2. DBMS_SCHEDULER (how to wait for jobs to finish? LOCKS?)
    3. DBMS_SCHEDULER CHAINS (passing parameters/arguments is not really possible?!)
    4. DBMS_PARALLEL_EXECUTE (can be used to run SQL queries in parallel but not PL/SQL procedures)

    Can one of these approaches be used to fork and join the procedure calls? Or is there yet another approach that can?