The following lines contain one of the words 'select', 'insert', 'update', or 'delete':
update_info update_info_type,
batch_size NUMBER := 0,
debug_level NUMBER := 0,
processed_mode NUMBER := NULL,
lock_name VARCHAR2(128),
lock_handle VARCHAR2(128) := NULL,
worker_id NUMBER := NULL
);
-- Loads the AD_PARALLEL_UPDATES row identified by (owner, table_name,
-- script_name) into the package-level cache g_cache.update_info and flags
-- the cache as initialized once the row has been fetched.
--
-- Parameters:
--   p_owner       - matched against AD_PARALLEL_UPDATES.owner
--   p_table_name  - matched against AD_PARALLEL_UPDATES.table_name
--   p_script_name - matched against AD_PARALLEL_UPDATES.script_name
--
-- Raises:
--   NO_DATA_FOUND / TOO_MANY_ROWS propagate from the SELECT ... INTO when the
--   key does not identify exactly one row; in that case the cache remains
--   flagged as uninitialized.
PROCEDURE init_g_cache_update_info
           (p_owner       IN VARCHAR2,
            p_table_name  IN VARCHAR2,
            p_script_name IN VARCHAR2)
IS
BEGIN
  -- Invalidate first so that a failed lookup never leaves the cache marked
  -- as usable.
  g_cache.ui_initialized := FALSE;

  -- SELECT * is kept as-is: the field layout of update_info_type is not
  -- visible from this block, so the column list cannot be spelled out safely.
  SELECT *
  INTO   g_cache.update_info
  FROM   ad_parallel_updates
  WHERE  owner       = p_owner
  AND    table_name  = p_table_name
  AND    script_name = p_script_name;

  -- Only reached when exactly one row was fetched. Without this assignment
  -- the flag stays FALSE forever and the ui_initialized check in
  -- get_update_info can never succeed.
  g_cache.ui_initialized := TRUE;
END init_g_cache_update_info;
-- Hands the cached update row (g_cache.update_info) back to the caller.
--
-- Parameters:
--   x_update_info - OUT NOCOPY; receives g_cache.update_info when
--                   g_cache.ui_initialized is TRUE.
PROCEDURE get_update_info
(x_update_info OUT nocopy update_info_type)
IS
BEGIN
-- Copy the cached row out only when the cache has been flagged as loaded.
IF (g_cache.ui_initialized) THEN
x_update_info := g_cache.update_info;
-- NOTE(review): no ELSE branch or END IF is visible in this excerpt, so what
-- happens for an uninitialized cache cannot be determined from here.
END get_update_info;
-- Registers a parallel update for (X_owner, X_table, X_script) in
-- AD_PARALLEL_UPDATES and then reloads the package cache for that key.
--
-- Parameters:
--   X_update_type - one of the update-type constants tested below
--                   (ID_RANGE_BY_ROWID, ID_RANGE, ID_RANGE_SUB_RANGE,
--                   ID_RANGE_SUB_RANGE_SQL, ID_RANGE_SCAN_EQUI_ROWSETS,
--                   ROWID_RANGE)
--   X_owner       - owner of the table being updated
--   X_table       - name of the table being updated
--   X_script      - script name stored with the update
--   X_id_column   - unique id column; must be non-null for the ID-based types
--   X_num_workers - worker count stored with the update
--
-- Raises:
--   application error -20001 for the validation failures whose messages
--   appear below.
--
-- NOTE(review): this excerpt omits several lines (BEGIN, END IF, some
-- raise_application_error calls, any exception handlers), so the exact
-- control flow between the statements cannot be confirmed from here.
procedure create_update_record
(X_update_type in number,
X_owner in varchar2,
X_table in varchar2,
X_script in varchar2,
X_id_column in varchar2 default null,
X_num_workers in number default null)
is
l_initialized varchar2(1);
-- ID-based update types need the name of the id column.
IF (X_update_type in (ID_RANGE_BY_ROWID, ID_RANGE,
ID_RANGE_SUB_RANGE,
ID_RANGE_SUB_RANGE_SQL,
ID_RANGE_SCAN_EQUI_ROWSETS))
THEN
if (X_id_column is null) then
raise_application_error(-20001,
'Cannot get name for the unique id column');
-- Message of a second validation (worker count); its condition is not
-- visible in this excerpt.
'Cannot get number of workers for ID range updates');
-- ROWID_RANGE needs no extra validation.
ELSIF (x_update_type = ROWID_RANGE) THEN
NULL;
-- Message used for any other update type value.
'Unknown update type : ' || x_update_type);
-- Probe for an existing registration of this owner/table/script.
-- NOTE(review): presumably the insert below only runs when this probe finds
-- nothing (NO_DATA_FOUND handler not visible) - confirm.
select null
into l_initialized
from ad_parallel_updates
where owner = X_owner
and table_name = X_table
and script_name = X_script;
-- Create the registration row. object_id and avg_row_len are read from the
-- data dictionary (sys.obj$, sys.tab$, sys.user$); the row starts with
-- initialized_flag = 'N'. If the dictionary join finds no matching table,
-- this INSERT ... SELECT inserts no row.
insert into ad_parallel_updates (
update_id,
update_type,
owner, script_name, table_name,
object_id,
id_column, num_workers, creation_date,
db_block_size, avg_row_len,
initialized_flag)
select ad_parallel_updates_s.nextval,
X_update_type,
X_owner, X_script, X_table,
nvl(o.dataobj#, o.obj#),
X_id_column, X_num_workers, sysdate,
8192, t.avgrln, 'N' -- only 8k block sizes for 11i and above
from sys.obj$ o,
sys.tab$ t,
sys.user$ u
where u.name = X_owner
and o.owner# = u.user#
and o.name = X_table
and o.type# = 2 -- TABLE
and t.obj# = o.obj#;
-- Reload g_cache.update_info for this owner/table/script.
init_g_cache_update_info(x_owner, x_table, x_script);
END create_update_record;
SELECT segment_name,
partition_name,
segment_type,
data_object_id,
relative_fno, block_id, blocks
from sys.ad_extents
where owner = X_owner
and segment_name = X_table
and segment_type in ('TABLE', 'TABLE PARTITION',
'TABLE SUBPARTITION')
order by segment_name, partition_name, relative_fno, block_id;
l_update_info update_info_type;
l_statement := 'select version from v$instance';
select sid, serial#
into l_my_sid, l_my_serialid
from v$session where audsid = userenv('sessionid');
create_update_record(ROWID_RANGE,
X_owner,
X_table,
X_script);
get_update_info(l_update_info);
if (nvl(l_update_info.avg_row_len, 0) = 0) then
l_minblocks := DEFAULT_MIN_BLOCKS;
l_minblocks := round((X_batch_size*l_update_info.avg_row_len)/
l_update_info.db_block_size, -1);
select nvl(max(unit_id), 0)
into l_unit_id
from ad_parallel_update_units
where update_id = l_update_info.update_id;
INSERT INTO ad_parallel_update_units(
unit_id, update_id,
data_object_id,
relative_fno, start_block, end_block,
status
)
values( unit_id_tab(k), l_update_info.update_id,
object_id_tab(k),
relative_fno_tab(k),
start_block_tab(k),
end_block_tab(k),
UNASSIGNED_STATUS);
-- Build the dynamic queries that fetch the smallest and the largest value of
-- the id column from X_owner.X_table.
-- NOTE(review): X_id_column, X_owner and X_table are concatenated straight
-- into the SQL text. Identifiers cannot be passed as bind variables, so these
-- must come from a trusted caller or be validated (e.g. with DBMS_ASSERT) -
-- confirm how callers obtain them.
l_stmt := 'select min('||X_id_column||') min_val '||
'from '||X_owner||'.'||X_table;
-- NOTE(review): the statement that executes the min query before l_stmt is
-- reassigned is not visible in this excerpt.
l_stmt := 'select max('||X_id_column||') max_val '||
'from '||X_owner||'.'||X_table;
-- Parameter list and body fragments of a routine that fills
-- AD_PARALLEL_UPDATE_UNITS from a caller-supplied SQL statement.
-- NOTE(review): the procedure name line is not visible here; the
-- populate_id_ranges(...) call elsewhere in this file passes the same five
-- arguments, so this is presumably populate_id_ranges - confirm.
--
-- Parameters:
--   X_update_type - update type constant
--   X_update_id   - update_id written to every inserted unit row
--   X_num_workers - number of workers (not referenced in the visible lines)
--   X_batch_size  - bound to :batchsize in the first statement
--   X_SQL_Stmt    - SQL text spliced into the dynamic statements as an
--                   inline view
--
-- NOTE(review): X_SQL_Stmt is concatenated into the statement text as-is, so
-- it must come from a trusted source.
X_update_type in number,
X_update_id in number,
X_num_workers in number,
X_batch_size in number,
X_SQL_Stmt in varchar2)
is
l_status varchar2(1);
-- First dynamic INSERT: X_SQL_Stmt supplies id_value rows. Each row is ranked
-- by id_value and floor(rank / :batchsize) groups the rows into units; the
-- min/max id_value of each group become start_id/end_id, and the group
-- number plus one becomes unit_id.
' INSERT INTO ad_parallel_update_units '||
' (update_id, unit_id, start_id, end_id, status) '||
' SELECT :update_id update_id, '||
' unit_id+1 unit_id, '||
' min(id_value) start_id_value, '||
' max(id_value) end_id_value, '||
' :status status '||
' from ('||
' select id_value, '||
' floor(rank() over (order by id_value)/:batchsize) unit_id '||
' from ( '||
X_SQL_Stmt||
' ) '||
' ) '||
' group by unit_id '
-- Binds are positional: :update_id, :status, :batchsize in that order.
using X_Update_id, l_status, X_batch_size;
-- Second dynamic INSERT: here X_SQL_Stmt itself has to return the columns
-- unit_id, start_id_value and end_id_value.
'INSERT INTO ad_parallel_update_units '||
' (update_id, unit_id, start_id, end_id, status) '||
' select :update_id, unit_id, start_id_value, end_id_value, :status '||
' from ( '||
X_SQL_Stmt||
' ) ');
-- Parameter list and body fragments of a routine that registers an ID-based
-- update and creates its rows in AD_PARALLEL_UPDATE_UNITS.
-- NOTE(review): the procedure name line is not visible here; the
-- populate_id_info(...) call elsewhere in this file passes the same ten
-- arguments, so this is presumably populate_id_info - confirm.
--
-- Parameters:
--   X_update_type - update type constant
--   X_owner, X_table, X_script - key of the update
--   X_id_column   - id column passed on to create_update_record
--   X_num_workers - worker count
--   X_batch_size  - batch size
--   X_SQL_Stmt    - SQL text forwarded to populate_id_ranges
--   X_Begin_ID    - start id used for ID_RANGE_SUB_RANGE
--   X_End_ID      - end id (not referenced in the visible lines)
(X_update_type in number,
X_owner in varchar2,
X_table in varchar2,
X_script in varchar2,
X_id_column in varchar2,
X_num_workers in number,
X_batch_size in number,
X_SQL_Stmt in varchar2,
X_Begin_ID in number,
X_End_ID in number)
is
l_table_start_id number;
l_update_info update_info_type;
-- TRUE only for the ID_RANGE_BY_ROWID update type.
l_entire_range := (X_update_type = ID_RANGE_BY_ROWID);
-- Register the update, then read the cached row back to obtain update_id.
create_update_record(X_update_type,
X_owner, X_table, X_script,
X_id_column, X_num_workers);
get_update_info(l_update_info);
-- For ID_RANGE_SCAN_EQUI_ROWSETS the units are derived from X_SQL_Stmt.
if (X_update_type = ID_RANGE_SCAN_EQUI_ROWSETS)
then
populate_id_ranges(
X_update_type,
l_update_info.update_id,
X_num_workers, X_batch_size,
X_SQL_Stmt);
-- For an explicit sub-range the caller supplies the starting id.
if (X_update_type = ID_RANGE_SUB_RANGE)
then
l_table_start_id := X_Begin_ID;
-- Insert the computed units as UNASSIGNED. The *_tab collections and the
-- loop/FORALL that drives index i are not visible in this excerpt.
insert into ad_parallel_update_units(
unit_id, update_id,
start_id, end_id,
status
)
values (unit_id_tab(i), l_update_info.update_id,
start_id_tab(i), end_id_tab(i),
UNASSIGNED_STATUS);
-- Second insert of the same shape; the code between the two is not visible.
insert into ad_parallel_update_units(
unit_id, update_id,
start_id, end_id,
status
)
values (unit_id_tab(i), l_update_info.update_id,
start_id_tab(i), end_id_tab(i),
UNASSIGNED_STATUS);
end if; /* if not X_Update_type = ID_RANGE_SCAN_EQUI_ROWSETS */
-- Parameter list and body fragments of the routine that validates its
-- arguments, looks up the update registered for (X_owner, X_table, X_script),
-- populates its processing units and adjusts worker bookkeeping.
-- NOTE(review): the procedure name line is not visible here; the
-- initialize(...) calls elsewhere in this file pass thirteen positional
-- arguments matching this list, so this is presumably initialize - confirm.
--
-- Parameters:
--   X_update_type    - update type constant
--   X_owner, X_table, X_script - key of the update
--   X_ID_column      - id column for ID-based methods
--   X_worker_id      - id of the calling worker
--   X_num_workers    - requested number of workers
--   X_batch_size     - batch size
--   X_debug_level    - debug level
--   X_processed_mode - PRESERVE_PROCESSED_UNITS or DELETE_PROCESSED_UNITS
--   X_SQL_Stmt       - SQL text; required or forbidden depending on the
--                      update type (see the checks below)
--   X_Begin_ID, X_End_ID - id bounds forwarded to populate_id_info
--
-- Raises:
--   application error -20001 for each validation failure below.
--
-- NOTE(review): many lines (BEGIN, END IF, ELSE, exception handlers) are
-- missing from this excerpt; comments describe single statements only.
(X_update_type in number,
X_owner in varchar2,
X_table in varchar2,
X_script in varchar2,
X_ID_column in varchar2,
X_worker_id in number,
X_num_workers in number,
X_batch_size in number,
X_debug_level in number,
X_processed_mode in number,
X_SQL_Stmt in varchar2,
X_Begin_ID in number,
X_End_ID in number)
is
l_initialized varchar2(1);
l_update_id number;
-- Tail of the processed-mode validation; the start of the condition is not
-- visible in this excerpt.
DELETE_PROCESSED_UNITS))
then
raise_application_error(-20001,
'Incorrect mode specified for processed units. '||
'Must be either PRESERVE_PROCESSED_UNITS or '||
'DELETE_PROCESSED_UNITS. ');
-- ID_RANGE_SCAN_EQUI_ROWSETS requires a SQL statement.
if (X_update_type = ID_RANGE_SCAN_EQUI_ROWSETS
and
X_SQL_Stmt is null)
then
raise_application_error(-20001,
'You must specify a SQL statement to derive processing units.');
-- ID_RANGE_SUB_RANGE must not be given a SQL statement.
if (X_update_type = ID_RANGE_SUB_RANGE
and
X_SQL_Stmt is not null)
then
raise_application_error(-20001,
'You cannot specify a SQL statement for specific ID range.');
-- ID_RANGE_SUB_RANGE_SQL requires a SQL statement.
if (X_update_type = ID_RANGE_SUB_RANGE_SQL
and
X_SQL_Stmt is null)
then
raise_application_error(-20001,
'You must specify a SQL statement for this ID range method.');
-- Look up the existing registration for this owner/table/script.
select update_id, initialized_flag, num_workers
into l_update_id, l_initialized, l_num_workers
from ad_parallel_updates
where owner = X_owner
and table_name = X_table
and script_name = X_script;
-- NOTE(review): as shown here l_update_id is set to NULL immediately before
-- being tested for NOT NULL. Presumably the assignment sits in a handler
-- (e.g. for NO_DATA_FOUND) that this excerpt does not show - confirm.
l_update_id := null;
-- Remove any units left over for the existing update.
if (l_update_id is not null) then
delete from ad_parallel_update_units
where update_id = l_update_id;
-- ROWID_RANGE updates are populated from extent information; the
-- populate_id_info call follows (the ELSE between them is not visible).
if (X_update_type = ROWID_RANGE) then
populate_extent_info(X_owner, X_table, X_script, X_batch_size);
populate_id_info(X_update_type,
X_owner, X_table, X_script, X_ID_column,
X_num_workers, X_batch_size,
X_SQL_Stmt, X_Begin_ID, X_End_ID);
-- Mark the update as initialized and record the worker count.
update ad_parallel_updates
set initialized_flag = 'Y',
num_workers = X_num_workers
where owner = X_owner
and table_name = X_table
and script_name = X_script;
-- Existence check for units that are still unassigned or assigned.
-- NOTE(review): this SELECT ... INTO raises NO_DATA_FOUND when no such unit
-- exists; the handler is not visible in this excerpt.
select 1
into l_unproc_units_exist
from sys.dual
where exists (
select 1
from ad_parallel_update_units
where update_id = l_update_id
and status in (UNASSIGNED_STATUS, ASSIGNED_STATUS));
if (X_update_type not in (ROWID_RANGE, ID_RANGE)) then
--
-- for ID over ROWID range methods, you cannot reduce the
-- number of workers after initialization
--
if (X_num_workers < l_num_workers) then
raise_application_error(-20001,
'Cannot reduce the number of workers after initialization.');
-- Release units assigned to workers whose id exceeds the new worker count.
update ad_parallel_update_units
set status = UNASSIGNED_STATUS,
worker_id = null
where update_id = l_update_id
and status = ASSIGNED_STATUS
and worker_id > X_num_workers;
-- Reset the stored ranges of those workers.
update ad_parallel_workers
set start_unit_id = 0,
end_unit_id = 0,
start_rowid = null,
start_id = null,
end_rowid = null,
end_id = null
where update_id = l_update_id
and worker_id > X_num_workers;
-- Store the new worker count on the update.
update ad_parallel_updates
set num_workers = X_num_workers
where update_id = l_update_id;
end if; -- incomplete update
-- Tail of a condition involving the cached owner.table.script key; the start
-- of the condition (and its comparison operator) is not visible here. When
-- the condition holds, the cache is reloaded for this update.
g_cache.update_info.owner||'.'||
g_cache.update_info.table_name||'.'||
g_cache.update_info.script_name)) THEN
init_g_cache_update_info(x_owner, x_table, x_script);
-- Variant without a processed-mode argument: forwards every argument to
-- initialize_rowid_range and passes PRESERVE_PROCESSED_UNITS as
-- X_processed_mode.
-- NOTE(review): the procedure name line and the END are not visible in this
-- excerpt; presumably this is an overload of initialize_rowid_range - confirm.
--
-- Parameters:
--   X_update_type - update type constant
--   X_owner, X_table, X_script - key of the update
--   X_worker_id   - id of the calling worker
--   X_num_workers - number of workers
--   X_batch_size  - batch size
--   X_debug_level - debug level
(X_update_type in number,
X_owner in varchar2,
X_table in varchar2,
X_script in varchar2,
X_worker_id in number,
X_num_workers in number,
X_batch_size in number,
X_debug_level in number)
is
begin
initialize_rowid_range
(X_update_type => X_update_type,
X_owner => X_owner,
X_table => X_table,
X_script => X_script,
X_worker_id => X_worker_id,
X_num_workers => X_num_workers,
X_batch_size => X_batch_size,
X_debug_level => X_debug_level,
X_processed_mode => PRESERVE_PROCESSED_UNITS);
-- Variant with an explicit processed mode: logs entry through debug_info and
-- delegates to initialize, passing NULL as the fifth argument and as the
-- last three arguments.
-- NOTE(review): the procedure name line and the END are not visible in this
-- excerpt; the debug message suggests this is initialize_rowid_range.
--
-- Parameters:
--   X_update_type    - update type constant
--   X_owner, X_table, X_script - key of the update
--   X_worker_id      - id of the calling worker
--   X_num_workers    - number of workers
--   X_batch_size     - batch size
--   X_debug_level    - debug level
--   X_processed_mode - processed-units mode forwarded unchanged
(X_update_type in number,
X_owner in varchar2,
X_table in varchar2,
X_script in varchar2,
X_worker_id in number,
X_num_workers in number,
X_batch_size in number,
X_debug_level in number,
X_processed_mode in number)
is
begin
debug_info('initialize_rowid_range()+');
initialize(X_update_type,
X_owner, X_table, X_script, null,
X_worker_id, X_num_workers,
X_batch_size, X_debug_level,
X_processed_mode,
null, null, null);
-- ID-based variant: delegates to initialize with the id column, the optional
-- SQL statement and id bounds, always passing
-- ad_parallel_updates_pkg.PRESERVE_PROCESSED_UNITS as the processed mode.
-- NOTE(review): the procedure name line and the END are not visible in this
-- excerpt; presumably this is initialize_id_range - confirm.
--
-- Parameters:
--   X_update_type - update type constant
--   X_owner, X_table, X_script - key of the update
--   X_ID_column   - id column
--   X_worker_id   - id of the calling worker
--   X_num_workers - number of workers
--   X_batch_size  - batch size
--   X_debug_level - debug level
--   X_SQL_Stmt    - optional SQL text (default NULL)
--   X_Begin_ID    - optional start id (default NULL)
--   X_End_ID      - optional end id (default NULL)
(X_update_type in number,
X_owner in varchar2,
X_table in varchar2,
X_script in varchar2,
X_ID_column in varchar2,
X_worker_id in number,
X_num_workers in number,
X_batch_size in number,
X_debug_level in number,
X_SQL_Stmt in varchar2 default NULL,
X_Begin_ID in number default NULL,
X_End_ID in number default NULL)
is
begin
initialize(X_update_type,
X_owner, X_table, X_script, X_ID_column,
X_worker_id, X_num_workers,
X_batch_size, X_debug_level,
ad_parallel_updates_pkg.PRESERVE_PROCESSED_UNITS,
X_SQL_Stmt, X_Begin_ID, X_End_ID);
-- Reads the row of AD_PARALLEL_WORKERS for (X_worker_id, X_update_id) and
-- returns its stored rowid range, id range and unit-id range through the OUT
-- parameters.
-- NOTE(review): the procedure name line and the X_worker_id parameter are
-- not visible in this excerpt; the get_restart_range(...) calls elsewhere in
-- this file match this shape, so this is presumably get_restart_range.
--
-- Parameters:
--   X_update_id       - update whose worker row is read
--   X_res_start_rowid - OUT; start_rowid of the worker row
--   X_res_end_rowid   - OUT; end_rowid of the worker row
--   X_res_start_id    - OUT; start_id of the worker row
--   X_res_end_id      - OUT; end_id of the worker row
--   X_start_unit_id   - OUT; start_unit_id of the worker row
--   X_end_unit_id     - OUT; end_unit_id of the worker row
X_update_id in number,
X_res_start_rowid out nocopy rowid,
X_res_end_rowid out nocopy rowid,
X_res_start_id out nocopy number,
X_res_end_id out nocopy number,
X_start_unit_id out nocopy number,
X_end_unit_id out nocopy number)
is
begin
-- NOTE(review): SELECT ... INTO raises NO_DATA_FOUND when the worker has no
-- row yet; any handler for that is not visible in this excerpt.
select start_rowid, end_rowid,
start_id, end_id,
start_unit_id, end_unit_id
into X_res_start_rowid, X_res_end_rowid,
X_res_start_id, X_res_end_id,
X_start_unit_id, X_end_unit_id
from ad_parallel_workers
where worker_id = X_worker_id
and update_id = X_update_id;
l_update_info update_info_type;
get_update_info(l_update_info);
select start_unit_id, end_unit_id
into l_start_unit_id, l_end_unit_id
from ad_parallel_workers
where worker_id = l_worker_id
and update_id = l_update_info.update_id;
update ad_parallel_workers
set start_rowid = nvl(X_last_rowid, start_rowid),
start_id = least(nvl(X_last_id, start_id)+1, end_id+1),
rows_processed = nvl(rows_processed, 0) + nvl(X_rows_processed, 0)
where worker_id = l_worker_id
and update_id = l_update_info.update_id;
if (g_cache.processed_mode = DELETE_PROCESSED_UNITS) then
delete from ad_parallel_update_units
where update_id = l_update_info.update_id
and unit_id between l_start_unit_id and l_end_unit_id;
update ad_parallel_update_units
set status = decode(l_update_info.update_type,
ROWID_RANGE, PROCESSED_STATUS,
decode(X_last_id,
end_id, PROCESSED_STATUS,
status)),
end_date = sysdate,
rows_processed = nvl(rows_processed, 0) +
nvl(X_rows_processed, 0)
where update_id = l_update_info.update_id
and unit_id between l_start_unit_id and l_end_unit_id;
l_update_info update_info_type;
get_update_info(l_update_info);
update ad_parallel_workers
set start_unit_id = X_start_unit_id,
end_unit_id = X_end_unit_id,
start_rowid = X_start_rowid,
end_rowid = X_end_rowid,
start_id = X_start_id,
end_id = X_end_id
where worker_id = X_worker_id
and update_id = l_update_info.update_id;
insert into ad_parallel_workers (
worker_id, update_id,
start_unit_id, end_unit_id,
start_rowid, end_rowid,
start_id, end_id
)
values (X_worker_id, l_update_info.update_id,
X_start_unit_id, X_end_unit_id,
X_start_rowid, X_end_rowid,
X_start_id, X_end_id
);
update ad_parallel_update_units
set status = ASSIGNED_STATUS,
worker_id = X_worker_id,
start_date = nvl(start_date, sysdate)
where update_id = l_update_info.update_id
and unit_id between X_start_unit_id and X_end_unit_id;
l_unit_rec ad_parallel_update_units%rowtype;
l_update_info update_info_type;
cursor c_range(p_update_id number) is
select /*+ FIRST_ROWS +*/ *
from ad_parallel_update_units
where update_id = p_update_id
and status = UNASSIGNED_STATUS
for update of status
skip locked;
get_update_info(l_update_info);
get_restart_range(l_worker_id, l_update_info.update_id,
l_res_start_rowid, l_res_end_rowid,
l_res_start_id, l_res_end_id,
l_start_unit_id, l_end_unit_id);
open c_range(l_update_info.update_id);
l_update_info.object_id),
l_unit_rec.relative_fno,
l_unit_rec.start_block,
0);
l_update_info.object_id),
l_unit_rec.relative_fno,
l_unit_rec.end_block,
G_MAX_ROWS_IN_BLOCK);
l_unit_rec ad_parallel_update_units%rowtype;
l_update_info update_info_type;
cursor c_range(p_update_id in number) is
select /*+ FIRST_ROWS +*/ *
from ad_parallel_update_units
where update_id = p_update_id
and status = UNASSIGNED_STATUS
for update of status
skip locked;
get_update_info(l_update_info);
get_restart_range(l_worker_id, l_update_info.update_id,
l_res_start_rowid, l_res_end_rowid,
l_res_start_id, l_res_end_id,
l_start_unit_id, l_end_unit_id);
if (l_update_info.update_type = ID_RANGE_SCAN_EQUI_ROWSETS) then
X_end_id := l_res_end_id;
select status
into l_status
from ad_parallel_update_units
where update_id = l_update_info.update_id
and unit_id = l_start_unit_id;
if (l_update_info.update_type = ID_RANGE_SCAN_EQUI_ROWSETS) then
X_end_id := l_res_end_id;
open c_range(l_update_info.update_id);
if (l_update_info.update_type = ID_RANGE_SCAN_EQUI_ROWSETS) then
X_end_id := l_end_id;
update AD_PARALLEL_UPDATE_UNITS
set STATUS = PROCESSED_STATUS
where update_id = l_update_info.update_id
and unit_id = l_unit_rec.unit_id;
select update_id
from ad_parallel_updates p
where initialized_flag = 'Y'
and owner = nvl(upper(X_owner), owner)
and table_name = nvl(upper(X_table), table_name)
and script_name = nvl(X_script, script_name)
and not exists (
select update_id
from ad_parallel_update_units u
where u.update_id = p.update_id
and u.status in ('A', 'U'));
delete from ad_parallel_update_units
where update_id = c_rec.update_id;
delete from ad_parallel_workers
where update_id = c_rec.update_id;
-- Removes everything stored for the update identified by
-- (X_owner, X_table, X_script): its worker rows, its unit rows and finally
-- the AD_PARALLEL_UPDATES row itself.
--
-- Parameters:
--   X_update_type - update type (not referenced in the visible lines)
--   X_owner       - table owner; upper-cased before matching
--   X_table       - table name; upper-cased before matching
--   X_script      - script name; matched exactly as given
--
-- NOTE(review): BEGIN, END and any exception handler are not visible in this
-- excerpt, so behaviour when no matching update exists cannot be confirmed.
procedure delete_update_information(
X_update_type in number,
X_owner in varchar2,
X_table in varchar2,
X_script in varchar2)
is
l_update_id number;
-- Resolve the key to its update_id.
select update_id
into l_update_id
from ad_parallel_updates
where owner = upper(X_owner)
and table_name = upper(X_table)
and script_name = X_script;
-- Delete dependent rows first, then the update row.
delete from ad_parallel_workers
where update_id = l_update_id;
delete from ad_parallel_update_units
where update_id = l_update_id;
delete from ad_parallel_updates
where update_id = l_update_id;
select update_id, table_name, owner
from ad_parallel_updates pu
where owner = nvl(upper(X_owner), owner)
and table_name = nvl(upper(X_table), table_name)
and script_name = nvl(X_script, script_name)
and update_type = ROWID_RANGE
and initialized_flag = 'Y'
and exists (
select 'Unprocessed units exist'
from ad_parallel_update_units pun
where pun.update_id = pu.update_id
and pun.status in (UNASSIGNED_STATUS, ASSIGNED_STATUS));
update ad_parallel_updates
set initialized_flag = 'N'
where update_id = crec.update_id;
delete from ad_parallel_update_units
where update_id = crec.update_id;
delete from ad_parallel_workers
where update_id = crec.update_id;