The following lines contain the word 'select', 'insert', 'update' or 'delete':
* insert for parallelisation.
*/
type range_info_r is record
(
rows_in_range number,
range_number number,
range_start number,
range_end number
);
range_ins_dbg boolean, /* information about range inserts */
range_proc_dbg boolean, /* info on processing ranges */
get_id_dbg boolean, /* log info from failing get_id functions */
conc_file_dbg boolean, /* send messages to concurrent log file */
batch_line_info boolean /* show batch_line information */
);
select parameter_value
into p_para_value
from pay_action_parameters
where parameter_name = p_para_name;
select grp.business_group_id
, grp.security_group_id
, pbh.batch_name
, pbh.batch_status
, pbh.business_group_name
, nvl(upper(pbh.atomic_linked_calls), 'N')
into p_business_group_id
, p_security_group_id
, p_batch_name
, p_batch_status
, p_business_group_name
, p_atomic_linked_calls
from per_business_groups_perf grp
, hr_pump_batch_headers pbh
where pbh.batch_id = p_batch_id
and grp.name (+) = pbh.business_group_name
for update of pbh.batch_status
;
update hr_pump_batch_headers h
set h.batch_status = 'P'
where h.batch_id = p_batch_id;
* Outputs information about range inserts.
*/
procedure range_ins_debug
(
p_info in range_info_r
) is
l_rows varchar2(80);
* updated correctly.
* p_overmsg is an override message e.g. in validate mode
* the code saves error text before it rolls back any
* processing it performed. The saved error text is used
* as the override message in a call to post_error after the
* rollback.
*/
function post_error
(
p_sqlcode in number,
p_errmsg in varchar2,
p_overmsg in varchar2,
p_level in varchar2,
p_type in varchar2,
p_id in number,
p_processing in boolean default false
) return varchar2 is
l_exception_text hr_pump_batch_exceptions.exception_text%type;
update hr_pump_batch_headers pbh
set pbh.batch_status = 'E'
where pbh.batch_id = p_id;
update hr_pump_batch_lines pbl
set pbl.line_status = 'E'
where pbl.batch_line_id = p_id;
insert into hr_pump_batch_exceptions (
exception_sequence,
exception_level,
source_id,
source_type,
format,
exception_text)
values (hr_pump_batch_exceptions_s.nextval,
p_level,
p_id,
p_type,
'TRANSLATED',
l_exception_text);
* Inserts a pump requests row.
* Gets the concurrent request from profile.
* Does not perform commit.
*/
procedure ins_pump_request
(
p_batch_id in number,
p_process_type in varchar2
) is
l_request_id number;
insert into hr_pump_requests (
batch_id,
request_id,
process_type)
values (p_batch_id,
l_request_id,
p_process_type);
* Deletes a pump request row for the
* current request_id.
* Does not perform commit.
*/
procedure del_pump_request
(
p_batch_id number
) is
l_request_id number;
delete from hr_pump_requests hpr
where hpr.batch_id = p_batch_id
and hpr.request_id = l_request_id;
* Procedure to select important startup
* data for the master process.
*/
procedure get_startup_info
(
p_batch_id in number,
p_pap_group_id in number,
p_env out nocopy master_env_r,
p_batch_status out nocopy varchar2
) is
l_found boolean;
* Procedure to insert a row in the ranges table.
* Before insert, it checks that the range
* is not empty.
*/
procedure insert_range
(
p_batch_id in number,
p_range_info in range_info_r
) is
begin
hr_data_pump.entry('insert_range');
insert into hr_pump_ranges (
batch_id,
range_number,
range_status,
starting_process_sequence,
ending_process_sequence)
values (p_batch_id,
p_range_info.range_number,
'U',
p_range_info.range_start,
p_range_info.range_end);
hr_data_pump.exit('insert_range');
end insert_range;
* Procedure to insert range rows for parallelisation.
* Note that these rows are inserted afresh on every
* run on the process. This function should not be
* called if there are rows in existence.
*/
function process_ranges
(
p_env in master_env_r,
p_batch_id in number
) return number is
-- Cursor returning rows to process.
cursor c1 is
select pbl.batch_line_id,
pbl.link_value
from hr_pump_batch_lines pbl
where pbl.batch_id = p_batch_id
and pbl.line_status <> 'C'
order by nvl(pbl.user_sequence, pbl.batch_line_id);
* link_value are inserted into the same
* range.
*/
loop
fetch c1 into c1rec;
update hr_pump_batch_lines pbl
set pbl.process_sequence = l_proc_seq
where pbl.batch_line_id = c1rec.batch_line_id;
insert_range(p_batch_id, l_range_info);
* Updates results for a range. Used at after the API have been executed
* to:
* DELETE existing rows from HR_PUMP_BATCH_EXCEPTIONS.
* UPDATE HR_PUMP_BATCH_LINES LINE_STATUS.
* INSERT rows into HR_PUMP_BATCH_EXCEPTIONS.
*/
procedure update_range_results
(p_failed_lines in dbms_sql.number_table
,p_exc_ids in dbms_sql.number_table
,p_exc_text in dbms_sql.varchar2_table
,p_ls_ids in dbms_sql.number_table
,p_ls_statuses in dbms_sql.varchar2s
) is
lbound binary_integer;
hr_data_pump.entry('update_range_results');
delete from hr_pump_batch_exceptions e
where e.source_id = p_failed_lines(i)
and e.source_type = 'BATCH_LINE'
;
update hr_pump_batch_lines bl
set bl.line_status = p_ls_statuses(i)
where bl.batch_line_id = p_ls_ids(i);
insert into hr_pump_batch_exceptions
(exception_sequence
,exception_level
,source_id
,source_type
,format
,exception_text
)
values
(hr_pump_batch_exceptions_s.nextval
,'F'
,p_exc_ids(i)
,'BATCH_LINE'
,'TRANSLATED'
,p_exc_text(i)
);
hr_data_pump.exit('update_range_results');
end update_range_results;
select pbl.batch_line_id
, pbl.line_status
, ham.api_module_id
, ham.module_package
, ham.module_name
, pbl.link_value
, grp.business_group_id
, grp.security_group_id
, pbl.business_group_name
from hr_pump_batch_lines pbl
, hr_api_modules ham
, per_business_groups_perf grp
where pbl.batch_id = p_batch_id
and ham.api_module_id = pbl.api_module_id
and pbl.process_sequence between
p_range_start and p_range_end
and pbl.line_status <> 'C'
and grp.name (+)= pbl.business_group_name
order by pbl.process_sequence;
select business_group_id
, security_group_id
from per_business_groups_perf
where name = p_business_group_name
;
l_failed_lines.delete(l_failed_lines.count);
l_ls_ids.delete(l_ls_pos);
l_ls_statuses.delete(l_ls_pos);
update_range_results
(p_failed_lines => l_failed_lines
,p_exc_ids => l_exc_ids
,p_exc_text => l_exc_text
,p_ls_ids => l_ls_ids
,p_ls_statuses => l_ls_statuses
);
select hpr.rowid,
hpr.starting_process_sequence,
hpr.ending_process_sequence
from hr_pump_ranges hpr,
hr_pump_batch_headers pbh
where hpr.batch_id = p_batch_id
and hpr.range_status = 'U'
and pbh.batch_id = hpr.batch_id
and pbh.batch_status <> 'E'
order by
hpr.starting_process_sequence
for update of
hpr.starting_process_sequence, pbh.batch_status
;
select hpr.rowid,
hpr.starting_process_sequence,
hpr.ending_process_sequence
from hr_pump_ranges hpr,
hr_pump_batch_headers pbh
where hpr.batch_id = p_batch_id
and hpr.range_status = 'U'
and pbh.batch_id = hpr.batch_id
and pbh.batch_status <> 'E'
and rownum < 2 -- only get one row
for update of
hpr.starting_process_sequence, pbh.batch_status
;
update hr_pump_ranges hpr
set hpr.range_status = 'P'
where hpr.rowid = l_range_rowid;
delete from hr_pump_ranges hpr
where hpr.rowid = l_range_rowid;
* to exit. Therefore, attempt to update the batch status
* as appropriate to what has happened.
*/
-- Attempt to lock the row.
select pbh.batch_status
into l_batch_status
from hr_pump_batch_headers pbh
where pbh.batch_id = p_batch_id
for update of pbh.batch_status;
select count(*)
into l_range_count
from hr_pump_ranges hpr
where hpr.batch_id = p_batch_id;
update hr_pump_batch_headers pbh
set pbh.batch_status = 'C'
where pbh.batch_id = p_batch_id;
delete from hr_pump_ranges hpr
where hpr.rowid = l_range_rowid;
and inserts a pump request row - both of which would already
have been done if the slave is called directly from the master.
*/
procedure slave
(
errbuf out nocopy varchar2,
retcode out nocopy number,
p_business_group_id in number,
p_security_group_id in number,
p_batch_id in number,
p_max_errors in binary_integer,
p_validate in varchar2 default 'N'
,p_pap_group_id in number default null
) is
begin
--
-- Set action_parameter_group_id. This must be done before any
-- code that accesses PAY_ACTION_PARAMETERS.
--
pay_core_utils.set_pap_group_id(p_pap_group_id => p_pap_group_id);
delete from hr_pump_requests hpr
where hpr.batch_id = p_batch_id;
delete from hr_pump_batch_exceptions e
where e.source_id = p_batch_id
and e.source_type = 'BATCH_HEADER';
delete from hr_pump_ranges where batch_id = p_batch_id;
delete
from hr_pump_batch_lines l
where l.batch_id = p_batch_id
and l.line_status = p_line_status
and rownum <= p_chunk_size
;
delete
from hr_pump_batch_line_user_keys uk
where uk.batch_line_id is null
and uk.user_key_id between
p_lower_bound and p_upper_bound
and rownum <= p_chunk_size
;
,p_delete_header in boolean
,p_error_message out nocopy varchar2
) is
l_work_to_do boolean;
update hr_pump_batch_line_user_keys uk
set uk.batch_line_id = null
where uk.batch_line_id in
(
select bl.batch_line_id
from hr_pump_batch_lines bl
where bl.batch_id = p_batch_id
)
and rownum <= p_chunk_size
;
delete
from hr_pump_batch_line_user_keys uk
where uk.batch_line_id in
(
select l.batch_line_id
from hr_pump_batch_lines l
where l.batch_id = p_batch_id
)
and rownum <= p_chunk_size
;
delete
from hr_pump_batch_exceptions e
where (
(
e.source_id = p_batch_id and
e.source_type = 'BATCH_HEADER'
) or
(
e.source_id in
(
select l.batch_line_id
from hr_pump_batch_lines l
where l.batch_id = p_batch_id
) and
e.source_type = 'BATCH_LINE'
)
)
and rownum <= p_chunk_size;
delete
from hr_pump_ranges r
where r.batch_id = p_batch_id
and rownum <= p_chunk_size
;
delete from hr_pump_requests
where batch_id = p_batch_id;
if p_delete_header then
--
-- Only do the delete if there are no remaining batch lines.
--
select count(*)
into l_count
from hr_pump_batch_lines
where batch_id = p_batch_id
;
delete
from hr_pump_batch_headers
where batch_id = p_batch_id
;
update hr_pump_batch_headers
set batch_status = 'C'
where batch_id = p_batch_id;
update hr_pump_batch_headers
set batch_status = 'C'
where batch_id = p_batch_id;
update hr_pump_batch_headers
set batch_status = 'E'
where batch_id = p_batch_id;
,p_delete_header in varchar2 default 'Y'
,p_chunk_size in number
,p_thread_number in number
,p_threads in number
,p_pap_group_id in number
,p_lower_bound in number
,p_upper_bound in number
) is
l_all_batches boolean;
l_delete_header boolean;
select h.batch_id
from hr_pump_batch_headers h
where h.batch_id between
p_lower_bound and p_upper_bound
and h.batch_status <> 'P'
;
l_delete_header := upper(p_delete_header) = 'Y';
,p_delete_header => l_delete_header
,p_error_message => l_error_message
);
,p_delete_header => l_delete_header
,p_error_message => l_error_message
);
,p_delete_header in varchar2 default 'Y'
,p_pap_group_id in number default null
) is
l_chunk_size number;
select A.maximum
, B.minimum
from (select max(batch_id) maximum from hr_pump_batch_headers) A
, (select min(batch_id) minimum from hr_pump_batch_headers) B
;
select A.maximum
, B.minimum
from (select max(user_key_id) maximum from hr_pump_batch_line_user_keys) A
, (select min(user_key_id) minimum from hr_pump_batch_line_user_keys) B
;
hr_data_pump.message('P_DELETE_HEADER:' || p_delete_header);
,argument7 => p_delete_header
,argument8 => to_char(l_chunk_size)
,argument9 => to_char(thread)
,argument10 => to_char(l_threads)
,argument11 => to_char(p_pap_group_id)
,argument12 => to_char(l_lower)
,argument13 => to_char(l_upper)
);
,p_delete_header => p_delete_header
,p_chunk_size => l_chunk_size
,p_thread_number => 0
,p_threads => l_threads
,p_pap_group_id => p_pap_group_id
,p_lower_bound => l_lower
,p_upper_bound => l_upper
);