1 PACKAGE BODY FND_OAM_DSCRAM_UNITS_PKG as
2 /* $Header: AFOAMDSUNITB.pls 120.8 2006/06/07 17:43:32 ilawler noship $ */
3
4 ----------------------------------------
5 -- Private Body Constants
6 ----------------------------------------
7 PKG_NAME CONSTANT VARCHAR2(20) := 'DSCRAM_UNITS_PKG.';
8
9 ----------------------------------------
10 -- Private Body Variables
11 ----------------------------------------
12 CURSOR B_UNITS
13 IS
14 SELECT /*+ FIRST_ROWS(1) */ unit_id, unit_status, phase, actual_workers_allowed, workers_assigned
15 FROM fnd_oam_dscram_units
16 WHERE task_id = FND_OAM_DSCRAM_TASKS_PKG.GET_TASK_ID
17 AND unit_status in (FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_UNPROCESSED,
18 FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_PROCESSING,
19 FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_RESTARTABLE,
20 FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_FINISHING) --need to include finishing to keep from violating phases
21 AND concurrent_group_unit_id IS NULL --only select top-level units, all units with a conc_unit_id should belong to a parent unit
22 ORDER BY phase ASC, priority ASC, weight DESC;
23
24 TYPE b_unit_cache_type IS RECORD
25 (
26 initialized BOOLEAN := FALSE,
27 unit_id NUMBER(15) := NULL,
28 unit_type VARCHAR2(30) := NULL,
29 weight NUMBER := NULL,
30 unit_object_owner VARCHAR2(30) := NULL,
31 unit_object_name VARCHAR2(30) := NULL,
32 error_fatality_level VARCHAR2(30) := NULL,
33 use_splitting BOOLEAN := NULL,
34 batch_size NUMBER(15) := NULL,
35 actual_workers_allowed NUMBER(15) := NULL,
36 last_validated DATE := NULL,
37 last_validation_ret_sts VARCHAR2(6) := NULL
38 );
39 b_unit_info b_unit_cache_type;
40
41 --not part of state because it's set before assign
42 b_last_fetched_unit_id NUMBER(15) := NULL;
43 b_last_fetched_unit_phase NUMBER := NULL;
44
45 ----------------------------------------
46 -- Public/Private Procedures/Functions
47 ----------------------------------------
48 -- Public
49 FUNCTION GET_UNIT_ID
50 RETURN NUMBER
51 IS
52 BEGIN
53 IF NOT b_unit_info.initialized THEN
54 RAISE NO_DATA_FOUND;
55 END IF;
56
57 RETURN b_unit_info.unit_id;
58 END;
59
60 -- Public
61 FUNCTION GET_WORKERS_ALLOWED
62 RETURN NUMBER
63 IS
64 BEGIN
65 IF NOT b_unit_info.initialized THEN
66 RAISE NO_DATA_FOUND;
67 END IF;
68
69 RETURN b_unit_info.actual_workers_allowed;
70 END;
71
72 -- Public
73 FUNCTION GET_BATCH_SIZE
74 RETURN NUMBER
75 IS
76 BEGIN
77 IF NOT b_unit_info.initialized THEN
78 RAISE NO_DATA_FOUND;
79 END IF;
80
81 RETURN b_unit_info.batch_size;
82 END;
83
84 -- Public
85 FUNCTION GET_UNIT_OBJECT_OWNER
86 RETURN VARCHAR2
87 IS
88 BEGIN
89 IF NOT b_unit_info.initialized THEN
90 RAISE NO_DATA_FOUND;
91 END IF;
92
93 RETURN b_unit_info.unit_object_owner;
94 END;
95
96 -- Public
97 FUNCTION GET_UNIT_OBJECT_NAME
98 RETURN VARCHAR2
99 IS
100 BEGIN
101 IF NOT b_unit_info.initialized THEN
102 RAISE NO_DATA_FOUND;
103 END IF;
104
105 RETURN b_unit_info.unit_object_name;
106 END;
107
108 -- Public
109 FUNCTION CREATE_WORK_ITEM(p_priority IN NUMBER,
110 p_weight IN NUMBER,
111 p_item_type IN VARCHAR2,
112 p_item_id IN NUMBER)
113 RETURN work_item_type
114 IS
115 l_work_item work_item_type;
116 BEGIN
117 l_work_item.priority := p_priority;
118 l_work_item.weight := p_weight;
119 l_work_item.item_type := p_item_type;
120 l_work_item.item_id := p_item_id;
121 l_work_item.item_msg := NULL; -- NULL causes complete to defer to the work item cache's value
122
123 RETURN l_work_item;
124 END;
125
126 -- Public
127 PROCEDURE FETCH_NEXT_UNIT(p_requery IN BOOLEAN,
128 x_unit_id OUT NOCOPY NUMBER,
129 x_return_status OUT NOCOPY VARCHAR2,
130 x_return_msg OUT NOCOPY VARCHAR2)
131 IS
132 l_ctxt VARCHAR2(60) := PKG_NAME||'FETCH_NEXT_UNIT';
133
134 l_unit_id NUMBER(15);
135 l_status VARCHAR2(30);
136 l_phase NUMBER;
137 l_workers_allowed NUMBER(15);
138 l_workers_assigned NUMBER(15);
139 BEGIN
140 fnd_oam_debug.log(2, l_ctxt, 'ENTER');
141 x_return_status := FND_API.G_RET_STS_ERROR;
142 x_return_msg := '';
143
144 --handle closing/opening the cursor as necessary depending on p_requery
145 IF p_requery OR
146 NOT B_UNITS%ISOPEN THEN
147 --reset the last vars when doing a requery
148 b_last_fetched_unit_id := NULL;
149 b_last_fetched_unit_phase := NULL;
150
151 IF p_requery AND
152 B_UNITS%ISOPEN THEN
153 CLOSE B_UNITS;
154 END IF;
155 OPEN B_UNITS;
156 END IF;
157
158 --FETCH the next row
159 FETCH B_UNITS INTO l_unit_id, l_status, l_phase, l_workers_allowed, l_workers_assigned;
160
161 -- no rows is an empty
162 IF B_UNITS%NOTFOUND THEN
163 fnd_oam_debug.log(1, l_ctxt, 'B_UNITS empty');
164 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
165 x_return_status := FND_OAM_DSCRAM_UTILS_PKG.G_RET_STS_EMPTY;
166 CLOSE B_UNITS;
167 RETURN;
168 END IF;
169
170 -- begin a loop here to do additional fetches if we detect there isn't a spot for a worker
171 LOOP
172 fnd_oam_debug.log(1, l_ctxt, 'Unit ID(Status): '||l_unit_id||'('||l_status||')');
173
174 -- if the last_phase isn't null, make sure we're not looking at a unit in a later
175 -- phase, makes the assumption that there is no phase after null.
176 IF b_last_fetched_unit_phase IS NOT NULL AND
177 (l_phase IS NULL OR b_last_fetched_unit_phase < l_phase) THEN
178 x_return_status := FND_OAM_DSCRAM_UTILS_PKG.G_RET_STS_FULL;
179 -- close the cursor to allow the current, invalid unit to get queried up next time
180 CLOSE B_UNITS;
181 fnd_oam_debug.log(1, l_ctxt, 'Unit Phase('||l_phase||') later than last_phase('||b_last_fetched_unit_phase||')');
182 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
183 RETURN;
184 END IF;
185
186 --if the unit's finishing, we're just here to log its phase and fetch the next - we can't return this unit
187 IF l_status = FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_FINISHING THEN
188 fnd_oam_debug.log(1, l_ctxt, 'Found finishing unit, Fetching Next');
189 ELSE
190 -- see if there's space in the unit
191 IF l_workers_allowed IS NULL OR l_workers_assigned < l_workers_allowed THEN
192 --exit to return the unit
193 EXIT;
194 ELSE
195 fnd_oam_debug.log(1, l_ctxt, 'Unit Full, Fetching Next');
196 END IF;
197 END IF;
198
199 -- if we're still in the loop at this point, set this unit to the last fetched and fetch
200 -- the next
201 b_last_fetched_unit_id := l_unit_id;
202 b_last_fetched_unit_phase := l_phase;
203
204 --FETCH the next row
205 FETCH B_UNITS INTO l_unit_id, l_status, l_phase, l_workers_allowed, l_workers_assigned;
206
207 -- no rows at this point isn't an empty but a full
208 IF B_UNITS%NOTFOUND THEN
209 x_return_status := FND_OAM_DSCRAM_UTILS_PKG.G_RET_STS_FULL;
210 CLOSE B_UNITS;
211 fnd_oam_debug.log(1, l_ctxt, 'No more units to Fetch');
212 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
213 RETURN;
214 END IF;
215 END LOOP;
216
217 --cache the last unit fetched to allow a quick validation later
218 x_unit_id := l_unit_id;
219 b_last_fetched_unit_id := l_unit_id;
220 b_last_fetched_unit_phase := l_phase;
221
222 --success
223 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
224 x_return_status := FND_API.G_RET_STS_SUCCESS;
225 EXCEPTION
226 WHEN OTHERS THEN
227 IF B_UNITS%ISOPEN THEN
228 CLOSE B_UNITS;
229 END IF;
230 x_return_msg := 'Unhandled Exception: (Code('||SQLCODE||'), Message("'||SQLERRM||'"))';
231 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
232 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
233 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
234 END;
235
236 -- Private
237 -- Called by execute_unit as a sanity check on the unit_id before beginning execution.
238 FUNCTION VALIDATE_START_EXECUTION(p_unit_id IN NUMBER,
239 x_return_status OUT NOCOPY VARCHAR2,
240 x_return_msg OUT NOCOPY VARCHAR2)
241 RETURN BOOLEAN
242 IS
243 l_ctxt VARCHAR2(60) := PKG_NAME||'VALIDATE_UNIT_FOR_EXECUTION';
244
245 l_status VARCHAR2(30);
246
247 l_return_status VARCHAR2(6);
248 l_return_msg VARCHAR2(2048);
249
250 CURSOR C1
251 IS
252 SELECT unit_status
253 FROM fnd_oam_dscram_units
254 WHERE unit_id = p_unit_id;
255 BEGIN
256 fnd_oam_debug.log(2, l_ctxt, 'ENTER');
257 x_return_status := FND_API.G_RET_STS_ERROR;
258 x_return_msg := '';
259
260 -- automatically valid if unit_id same as last fetched
261 IF p_unit_id = b_last_fetched_unit_id THEN
262 x_return_status := FND_API.G_RET_STS_SUCCESS;
263 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
264 RETURN TRUE;
265 END IF;
266
267 --fetch necessary unit attributes
268 OPEN C1;
269 FETCH C1 INTO l_status;
270 IF C1%NOTFOUND THEN
271 x_return_msg := 'Invalid unit_id: ('||p_unit_id||')';
272 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
273 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
274 RETURN FALSE;
275 END IF;
276 CLOSE C1;
277
278 --check that the status executes it's executable
279 IF NOT FND_OAM_DSCRAM_UTILS_PKG.STATUS_IS_EXECUTABLE(l_status) THEN
280 -- report the true status of the unit to execute to pass on to execute's caller
281 x_return_status := FND_OAM_DSCRAM_UTILS_PKG.CONV_VALIDATE_START_STS_TO_RET(l_status);
282 IF FND_OAM_DSCRAM_UTILS_PKG.RET_STS_IS_ERROR(x_return_status) THEN
283 x_return_msg := 'Invalid task status('||l_status||')';
284 fnd_oam_debug.log(1, l_ctxt, x_return_msg);
285 END IF;
286 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
287 RETURN FALSE;
288 END IF;
289
290 --success
291 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
292 x_return_status := FND_API.G_RET_STS_SUCCESS;
293 RETURN TRUE;
294 EXCEPTION
295 WHEN OTHERS THEN
296 x_return_msg := 'Unhandled Exception: (Code('||SQLCODE||'), Message("'||SQLERRM||'"))';
297 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
298 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
299 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
300 RETURN FALSE;
301 END;
302
303 -- Public
304 -- Return Statuses:
305 -- SUCCESS, ERROR, ERROR_UNEXP, STOPPED
306 -- -Converted- : PROCESSED, STOPPED, ERROR_FATAL, ERROR_UNKNOWN
307 FUNCTION VALIDATE_CONTINUED_EXECUTION(p_force_query IN BOOLEAN,
308 p_recurse IN BOOLEAN,
309 x_return_status OUT NOCOPY VARCHAR2,
310 x_return_msg OUT NOCOPY VARCHAR2)
311 RETURN BOOLEAN
312 IS
313 l_ctxt VARCHAR2(60) := PKG_NAME||'VALIDATE_CONTINUED_EXECUTION';
314
315 l_status VARCHAR2(30) := NULL;
316 l_return_status VARCHAR2(6);
317 l_return_msg VARCHAR2(2048);
318
319 CURSOR C1
320 IS
321 SELECT unit_status
322 FROM fnd_oam_dscram_units
323 WHERE unit_id = b_unit_info.unit_id;
324 BEGIN
325 x_return_status := FND_API.G_RET_STS_ERROR;
326 x_return_msg := '';
327
328 -- make sure the state's initialized
329 IF NOT b_unit_info.initialized THEN
330 RAISE NO_DATA_FOUND;
331 END IF;
332
333 -- check if we should do work or if we can presume the cached status
334 IF (p_force_query OR
335 FND_OAM_DSCRAM_UTILS_PKG.VALIDATION_DUE(b_unit_info.last_validated)) THEN
336
337 fnd_oam_debug.log(1, l_ctxt, '>RE-QUERYING<');
338
339 -- re-init the cached fields to allow easy exit
340 b_unit_info.last_validation_ret_sts := x_return_status;
341 b_unit_info.last_validated := SYSDATE;
342
343 --otherwise, fetch necessary run attributes and evaluate
344 OPEN C1;
345 FETCH C1 INTO l_status;
346 IF C1%NOTFOUND THEN
347 --shouldn't happen since we're using the cache
348 x_return_msg := 'Invalid cached unit_id: '||b_unit_info.unit_id;
349 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
350 RETURN FALSE;
351 END IF;
352 CLOSE C1;
353
354 --make sure the unit has been marked as processing
355 IF NOT FND_OAM_DSCRAM_UTILS_PKG.STATUS_IS_PROCESSING(l_status) THEN
356 x_return_status := FND_OAM_DSCRAM_UTILS_PKG.CONV_VALIDATE_CONT_STS_TO_RET(l_status);
357 b_unit_info.last_validation_ret_sts := x_return_status;
358 IF x_return_status <> FND_OAM_DSCRAM_UTILS_PKG.G_RET_STS_PROCESSED THEN
359 x_return_msg := 'Invalid unit status('||l_status||')';
360 fnd_oam_debug.log(1, l_ctxt, x_return_msg);
361 END IF;
362 RETURN FALSE;
363 END IF;
364 x_return_status := FND_API.G_RET_STS_SUCCESS;
365 ELSE
366 x_return_status := b_unit_info.last_validation_ret_sts;
367 END IF;
368
369 -- make a recursive call to the the task if required
370 IF p_recurse THEN
371 IF NOT FND_OAM_DSCRAM_TASKS_PKG.VALIDATE_CONTINUED_EXECUTION(p_force_query,
372 TRUE,
373 l_return_status,
374 l_return_msg) THEN
375 -- the run has an invalid status, tell the execute to stop the unit
376 x_return_status := FND_OAM_DSCRAM_UTILS_PKG.G_RET_STS_STOPPED;
377 x_return_msg := '[Continued Validation of Parent(s) Failed]:(Status('||l_return_status||'), Msg('||l_return_msg||'))';
378 fnd_oam_debug.log(1, l_ctxt, x_return_msg);
379 RETURN FALSE;
380 END IF;
381 END IF;
382
383 --success
384 b_unit_info.last_validation_ret_sts := x_return_status;
385 b_unit_info.last_validated := SYSDATE;
386 RETURN (x_return_status = FND_API.G_RET_STS_SUCCESS);
387 EXCEPTION
388 WHEN OTHERS THEN
389 x_return_msg := 'Unhandled Exception: (Code('||SQLCODE||'), Message("'||SQLERRM||'"))';
390 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
391 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
392 b_unit_info.last_validation_ret_sts := x_return_status;
393 b_unit_info.last_validated := SYSDATE;
394 RETURN FALSE;
395 END;
396
397 -- Private: Autonomous Txn
398 -- Invariant:
399 -- Validate_unit must preceed this call so we can assume the p_unit_id has a valid
400 -- database row.
401 -- Before a call to execute_unit can start doing work, it needs to call this procedure
402 -- to make sure the status is set to processing and a stats row is created for it.
403 PROCEDURE ASSIGN_WORKER_TO_UNIT(p_unit_id IN NUMBER,
404 x_return_status OUT NOCOPY VARCHAR2,
405 x_return_msg OUT NOCOPY VARCHAR2)
406 IS
407 PRAGMA AUTONOMOUS_TRANSACTION;
408
409 l_ctxt VARCHAR2(60) := PKG_NAME||'ASSIGN_WORKER_TO_UNIT';
410
411 l_suggest_workers_allowed NUMBER;
412 l_actual_workers_allowed NUMBER;
413 l_workers_assigned NUMBER;
414 l_stat_id NUMBER;
415 l_unit_type VARCHAR2(30);
416 l_weight NUMBER;
417 l_unit_object_owner VARCHAR2(30);
418 l_unit_object_name VARCHAR2(30);
419 l_batch_size NUMBER;
420 l_error_fatality_level VARCHAR2(30);
421 l_suggest_disable_splitting VARCHAR2(3);
422 l_actual_disable_splitting VARCHAR2(3);
423
424 l_use_splitting BOOLEAN;
425
426 l_status VARCHAR2(30) := NULL;
427 BEGIN
428 fnd_oam_debug.log(2, l_ctxt, 'ENTER');
429 x_return_status := FND_API.G_RET_STS_ERROR;
430 x_return_msg := '';
431
432 -- Do a locking select without a pre-select since we always have to update the unit
433 -- row to add to the # of workers assigned. Also updates the status to started if it
434 -- hasn't been started yet.
435 SELECT unit_status, suggest_workers_allowed, actual_workers_allowed, workers_assigned, unit_type, weight,
436 unit_object_owner, unit_object_name, error_fatality_level, suggest_disable_splitting, actual_disable_splitting, batch_size
437 INTO l_status, l_suggest_workers_allowed, l_actual_workers_allowed, l_workers_assigned, l_unit_type, l_weight,
438 l_unit_object_owner, l_unit_object_name, l_error_fatality_level, l_suggest_disable_splitting, l_actual_disable_splitting, l_batch_size
439 FROM fnd_oam_dscram_units
440 WHERE unit_id = p_unit_id
441 FOR UPDATE;
442
443 fnd_oam_debug.log(1, l_ctxt, 'Unit Status: '||l_status);
444 fnd_oam_debug.log(1, l_ctxt, 'Workers Allow(Sug/Act), Assigned: ('||l_suggest_workers_allowed||')('||l_actual_workers_allowed||'), '||l_workers_assigned||')');
445
446 -- make sure the status is runnable after the lock
447 IF NOT FND_OAM_DSCRAM_UTILS_PKG.STATUS_IS_EXECUTABLE(l_status) THEN
448 x_return_status := FND_OAM_DSCRAM_UTILS_PKG.CONV_VALIDATE_START_STS_TO_RET(l_status);
449 IF FND_OAM_DSCRAM_UTILS_PKG.RET_STS_IS_ERROR(x_return_status) THEN
450 x_return_msg := 'Invalid unit in assign, status('||l_status||')';
451 fnd_oam_debug.log(1, l_ctxt, x_return_msg);
452 END IF;
453 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
454 ROLLBACK;
455 RETURN;
456 END IF;
457
458 -- make sure there's a spot for us to work here
459 IF ((l_actual_workers_allowed IS NOT NULL AND
460 l_workers_assigned >= l_actual_workers_allowed) OR
461 (l_actual_workers_allowed IS NULL AND
462 l_workers_assigned >= l_suggest_workers_allowed)) THEN
463 x_return_status := FND_OAM_DSCRAM_UTILS_PKG.G_RET_STS_FULL;
464 fnd_oam_debug.log(1, l_ctxt, 'No workers slot available, returning full.');
465 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
466 ROLLBACK;
467 RETURN;
468 END IF;
469
470 -- first start, create a stats entry
471 IF l_status <> FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_PROCESSING THEN
472 IF l_workers_assigned = 0 THEN
473 --create a stats entry
474 FND_OAM_DSCRAM_STATS_PKG.CREATE_ENTRY(p_source_object_type => FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_UNIT,
475 p_source_object_id => p_unit_id,
476 p_start_time => SYSDATE,
477 p_prestart_status => l_status,
478 x_stat_id => l_stat_id);
479
480 --for small units, instead of paying the cost of using the parallelization infrastructure, we
481 --can limit the number of workers to one and disable splitting to get the unit components done faster.
482 --the workers allowed count is also set to one because later components can't
483 --execute in parallel with earlier ones unless we're using the AD parallization infrastructure where they
484 --all execute at once on a chunk of rows.
485 IF ((l_weight IS NOT NULL AND
486 l_weight < FND_OAM_DSCRAM_BUNDLES_PKG.GET_MIN_PARALLEL_UNIT_WEIGHT) OR
487 (l_suggest_disable_splitting = FND_API.G_TRUE)) THEN
488 fnd_oam_debug.log(1, l_ctxt, 'Small unit or forced disable splitting, using Serial Execution.');
489
490 -- don't overwrite the actual values if they've been set by a previous start, don't want to change state after
491 -- we've already set it..
492 l_actual_workers_allowed := NVL(l_actual_workers_allowed, 1);
493 l_actual_disable_splitting := NVL(l_actual_disable_splitting, FND_API.G_TRUE);
494 END IF;
495
496 --set to suggested if still no value
497 l_actual_workers_allowed := NVL(l_actual_workers_allowed, l_suggest_workers_allowed); --acceptable to push a NULL
498 l_actual_disable_splitting := NVL(l_actual_disable_splitting, NVL(l_suggest_disable_splitting, FND_API.G_FALSE));
499
500 -- update the status and other fields
501 UPDATE fnd_oam_dscram_units
502 SET unit_status = FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_PROCESSING,
503 stats_finished = FND_API.G_FALSE,
504 actual_workers_allowed = l_actual_workers_allowed,
505 actual_disable_splitting = l_actual_disable_splitting
506 WHERE unit_id = p_unit_id;
507 ELSE
508 --the unit isn't processing but somebody's set it to processing, this shouldn't happen
509 x_return_msg := 'Unit Status ('||l_status||') is not in-progress but the number of workers assigned('||l_workers_assigned||') is nonzero.';
510 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
511 ROLLBACK;
512 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
513 RETURN;
514 END IF;
515 END IF;
516
517 -- convert our splitting decision
518 l_use_splitting := NOT(FND_OAM_DSCRAM_UTILS_PKG.FLAG_TO_BOOLEAN(l_actual_disable_splitting));
519
520 -- finally, always update the last updated fields and the # of workers assigned.
521 UPDATE fnd_oam_dscram_units
522 SET workers_assigned = workers_assigned + 1,
523 last_updated_by = fnd_global.user_id,
524 last_update_login = fnd_global.user_id,
525 last_update_date = SYSDATE
526 WHERE unit_id = p_unit_id;
527
528 -- push changes and release lock
529 COMMIT;
530
531 -- populate the unit state
532 b_unit_info.unit_id := p_unit_id;
533 b_unit_info.unit_type := l_unit_type;
534 b_unit_info.weight := l_weight;
535 b_unit_info.unit_object_owner := l_unit_object_owner;
536 b_unit_info.unit_object_name := l_unit_object_name;
537 b_unit_info.error_fatality_level := l_error_fatality_level;
538 b_unit_info.use_splitting := l_use_splitting;
539 b_unit_info.actual_workers_allowed := l_actual_workers_allowed;
540 b_unit_info.batch_size := l_batch_size;
541 b_unit_info.last_validated := NULL;
542 b_unit_info.initialized := TRUE;
543
544 --invalidate the last fetched unit since we just changed its state, we'll requery next time anyway
545 b_last_fetched_unit_id := NULL;
546 b_last_fetched_unit_phase := NULL;
547
548 x_return_status := FND_API.G_RET_STS_SUCCESS;
549 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
550 EXCEPTION
551 WHEN OTHERS THEN
552 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
553 x_return_msg := 'Unhandled Exception: (Code('||SQLCODE||'), Message("'||SQLERRM||'"))';
554 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
555 ROLLBACK;
556 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
557 END;
558
559 -- Private
560 -- Called when a unit is completed in some way. Duties include updating the
561 -- unit's status, decrementing workers_assigned, completing the stats record. The unit may also be given
562 -- a list of plsqls, dmls and other work items to complete while we're here. This update is made atomic with the unit update
563 -- to disallow the case where a second worker enters a unit after the work item has been updated but before the unit is
564 -- updated and draws a false conclusion on whether there's any work left. (6/6/06) This is not as important now because
565 -- the unit would enter the 'FINISHING' state but still be put into the proper final state by the worker aborted processing
566 -- the unit because of an errored dml. Unlike other COMPLETE_<entity> methods,
567 -- we explicitly provide the p_unit_id in case we need to complete other units in the future since unit is a composite type.
568 PROCEDURE COMPLETE_UNIT(p_unit_id IN NUMBER,
569 p_proposed_status IN VARCHAR2,
570 p_proposed_ret_sts IN VARCHAR2,
571 p_message IN VARCHAR2,
572 px_arg_context IN OUT NOCOPY FND_OAM_DSCRAM_ARGS_PKG.arg_context,
573 p_destroy_caches IN BOOLEAN,
574 px_work_queue_to_complete IN OUT NOCOPY ordered_work_queue_type,
575 x_final_ret_sts OUT NOCOPY VARCHAR2)
576 IS
577 PRAGMA AUTONOMOUS_TRANSACTION;
578
579 l_ctxt VARCHAR2(60) := PKG_NAME||'COMPLETE_UNIT';
580
581 l_proposed_status VARCHAR2(30) := p_proposed_status;
582 l_proposed_ret_sts VARCHAR2(6) := p_proposed_ret_sts;
583 l_final_status VARCHAR2(30);
584 l_final_ret_sts VARCHAR2(6);
585
586 COMPLETE_FAILED EXCEPTION;
587 l_update_context BOOLEAN := FALSE;
588 l_message VARCHAR2(2048) := p_message;
589 l_status VARCHAR2(30);
590 l_workers_assigned NUMBER(15);
591
592 k NUMBER;
593 l_return_status VARCHAR2(6);
594 l_return_msg VARCHAR2(2048);
595 BEGIN
596 fnd_oam_debug.log(2, l_ctxt, 'ENTER');
597
598 -- always lock the unit since we have to decrement the worker count
599 SELECT unit_status, workers_assigned
600 INTO l_status, l_workers_assigned
601 FROM fnd_oam_dscram_units
602 WHERE unit_id = p_unit_id
603 FOR UPDATE;
604
605 --first take care of our child work items that need concurrent completion (DMLs, PLSQLs, ...)
606 IF px_work_queue_to_complete IS NOT NULL AND px_work_queue_to_complete.COUNT > 0 THEN
607 BEGIN
608 --if we're the last worker and we think all work suceeded so far, update the un-set, writable args of all completed
609 --work items
610 IF l_workers_assigned = 1 AND
611 l_proposed_ret_sts = FND_API.G_RET_STS_SUCCESS THEN
612
613 k := px_work_queue_to_complete.FIRST;
614 WHILE k IS NOT NULL LOOP
615 CASE px_work_queue_to_complete(k).item_type
616 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_DML THEN
617 FND_OAM_DSCRAM_DMLS_PKG.UPDATE_COMP_DML_WRITABLE_ARGS(px_work_queue_to_complete(k).item_id,
618 px_arg_context,
619 b_unit_info.use_splitting,
620 l_proposed_ret_sts,
621 l_return_msg);
622
623 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_PLSQL THEN
624 FND_OAM_DSCRAM_PLSQLS_PKG.UPDATE_COMP_PLS_WRITABLE_ARGS(px_work_queue_to_complete(k).item_id,
625 px_arg_context,
626 b_unit_info.use_splitting,
627 l_proposed_ret_sts,
628 l_return_msg);
629
630 ELSE
631 --attach the error message to the unit, completing the work items with an unknown type won't get far
632 l_message := 'Work Item ID ('||px_work_queue_to_complete(k).item_id||'), invalid work item type: '||px_work_queue_to_complete(k).item_type;
633 fnd_oam_debug.log(6, l_ctxt, l_return_msg);
634 RAISE COMPLETE_FAILED;
635 END CASE;
636
637 --if the called update method failed, exit early
638 IF l_proposed_ret_sts <> FND_API.G_RET_STS_SUCCESS THEN
639 l_proposed_status := FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_ERROR_UNKNOWN;
640 --store the failure message on the item
641 px_work_queue_to_complete(k).item_msg := l_return_msg||px_work_queue_to_complete(k).item_msg;
642 --stop processing the loop
643 EXIT;
644 END IF;
645
646 k := px_work_queue_to_complete.NEXT(k);
647 END LOOP;
648
649 --see if the update worked
650 IF l_proposed_ret_sts = FND_API.G_RET_STS_SUCCESS THEN
651 --mark that the DESTROY_<type>_CACHE_ENTRY methods should update the context with values from each
652 --work item's arg list.
653 l_update_context := TRUE;
654 END IF;
655 END IF;
656
657 --always call work item's complete procedure for each work_item, message is derived from the work item's
658 --entry, which defaults to NULL to use the work item's cache message.
659 k := px_work_queue_to_complete.FIRST;
660 WHILE k IS NOT NULL LOOP
661 CASE px_work_queue_to_complete(k).item_type
662 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_DML THEN
663 FND_OAM_DSCRAM_DMLS_PKG.COMPLETE_DML(px_work_queue_to_complete(k).item_id,
664 l_proposed_ret_sts,
665 px_work_queue_to_complete(k).item_msg,
666 l_workers_assigned,
667 l_return_status,
668 l_return_msg);
669
670 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_PLSQL THEN
671 --since pl/sqls don't write the # of rows processed, they only need to be completed by the last worker
672 IF l_workers_assigned = 1 THEN
673 FND_OAM_DSCRAM_PLSQLS_PKG.COMPLETE_PLSQL(px_work_queue_to_complete(k).item_id,
674 l_proposed_ret_sts,
675 px_work_queue_to_complete(k).item_msg,
676 l_return_status,
677 l_return_msg);
678 END IF;
679 ELSE
680 l_message := 'Work Item ID ('||px_work_queue_to_complete(k).item_id||'), invalid work item type: '||px_work_queue_to_complete(k).item_type;
681 fnd_oam_debug.log(6, l_ctxt, l_message);
682 RAISE COMPLETE_FAILED;
683 END CASE;
684
685 --check if the complete_<entity> succeeded, stop processing if it did
686 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
687 RAISE COMPLETE_FAILED;
688 END IF;
689
690 k := px_work_queue_to_complete.NEXT(k);
691 END LOOP;
692
693 --only manually destroy the cache entities when the work item was successful, otherwise
694 --let the catch-all destroy_cache methods get it
695 fnd_oam_debug.log(1, l_ctxt, 'Proposed Unit(s) Ret Sts: '||l_proposed_ret_sts);
696 IF l_proposed_ret_sts = FND_API.G_RET_STS_SUCCESS THEN
697 --we don't need to clone the context here before execution because the run requires an explicit
698 --set_context to update the real context object.
699 k := px_work_queue_to_complete.FIRST;
700 WHILE k IS NOT NULL LOOP
701 CASE px_work_queue_to_complete(k).item_type
702 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_DML THEN
703 FND_OAM_DSCRAM_DMLS_PKG.DESTROY_DML_CACHE_ENTRY(px_work_queue_to_complete(k).item_id,
704 px_arg_context,
705 l_update_context,
706 l_return_status,
707 l_return_msg);
708
709 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_PLSQL THEN
710 FND_OAM_DSCRAM_PLSQLS_PKG.DESTROY_PLSQL_CACHE_ENTRY(px_work_queue_to_complete(k).item_id,
711 px_arg_context,
712 l_update_context,
713 l_return_status,
714 l_return_msg);
715
716 --skip else case, can't happen, complete would have raised an error.
717 END CASE;
718
719 --if destroy failed, log it in the unit, too late for the work item. don't keep executing
720 --destroys to keep dependent parts of the updated context from being rolled up without prior
721 --work items rolling up correctly. Catch-all destroy_cache methods will take care of cleanup.
722 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
723 l_message := l_return_msg;
724 RAISE COMPLETE_FAILED;
725 END IF;
726
727 k := px_work_queue_to_complete.NEXT(k);
728 END LOOP;
729 END IF;
730 EXCEPTION
731 WHEN OTHERS THEN
732 --if anything went wrong while processing the work units, still complete the unit but with an error
733 l_proposed_status := FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_ERROR_UNKNOWN;
734 l_proposed_ret_sts := FND_API.G_RET_STS_UNEXP_ERROR;
735 END;
736 END IF;
737
738 -- if specified, destroy the work item base caches to keep out leaks
739 IF p_destroy_caches THEN
740 BEGIN
741 --destroy the DML cache
742 FND_OAM_DSCRAM_DMLS_PKG.DESTROY_DML_CACHE(px_arg_context,
743 l_return_status,
744 l_return_msg);
745 IF l_return_status <> FND_API.G_RET_STS_SUCCESS AND
746 NOT FND_OAM_DSCRAM_UTILS_PKG.RUN_IS_NORMAL THEN
747 --only error when failing to destory if we're in a non-normal mode
748 l_message := l_return_msg;
749 RAISE COMPLETE_FAILED;
750 END IF;
751
752 --destroy the PLSQL cache
753 FND_OAM_DSCRAM_PLSQLS_PKG.DESTROY_PLSQL_CACHE(px_arg_context,
754 l_return_status,
755 l_return_msg);
756 IF l_return_status <> FND_API.G_RET_STS_SUCCESS AND
757 NOT FND_OAM_DSCRAM_UTILS_PKG.RUN_IS_NORMAL THEN
758 --only error when failing to destory if we're in a non-normal mode
759 l_message := l_return_msg;
760 RAISE COMPLETE_FAILED;
761 END IF;
762 EXCEPTION
763 WHEN OTHERS THEN
764 --if anything went wrong in the DMLs, still complete the unit, just differently
765 l_proposed_status := FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_ERROR_UNKNOWN;
766 l_proposed_ret_sts := FND_API.G_RET_STS_UNEXP_ERROR;
767 END;
768 END IF;
769
770 -- translate the new_status into a valid final status
771 FND_OAM_DSCRAM_UTILS_PKG.TRANSLATE_COMPLETED_STATUS(l_status,
772 l_workers_assigned,
773 l_proposed_status,
774 l_proposed_ret_sts,
775 l_final_status,
776 l_final_ret_sts);
777 fnd_oam_debug.log(1, l_ctxt, 'Translated status "'||l_proposed_status||'" into "'||l_final_status||'"');
778 fnd_oam_debug.log(1, l_ctxt, 'Translated Execute Ret Sts "'||l_proposed_ret_sts||'" into "'||l_final_ret_sts||'"');
779
780 --if we discovered that we're full, possibly temporarily, just decrement the worker count and leave if we're not the last worker
781 IF l_final_ret_sts = FND_OAM_DSCRAM_UTILS_PKG.G_RET_STS_FULL AND l_workers_assigned > 1 THEN
782 UPDATE fnd_oam_dscram_units
783 SET workers_assigned = workers_assigned - 1,
784 last_updated_by = fnd_global.user_id,
785 last_update_login = fnd_global.user_id,
786 last_update_date = SYSDATE
787 WHERE unit_id = p_unit_id;
788 ELSE
789 -- otherwise, update the status and workers_assigned
790 UPDATE fnd_oam_dscram_units
791 SET unit_status = l_final_status,
792 workers_assigned = workers_assigned - 1,
793 last_updated_by = fnd_global.user_id,
794 last_update_login = fnd_global.user_id,
795 last_update_date = SYSDATE
796 WHERE unit_id = p_unit_id;
797
798 --only complete stats if we changed state
799 IF l_final_status <> l_status AND
800 FND_OAM_DSCRAM_UTILS_PKG.STATUS_IS_FINAL(l_final_status) THEN
801
802 FND_OAM_DSCRAM_STATS_PKG.COMPLETE_ENTRY(p_source_object_type => FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_UNIT,
803 p_source_object_id => p_unit_id,
804 p_end_time => SYSDATE,
805 p_postend_status => l_final_status,
806 p_end_message => l_message);
807 END IF;
808 END IF;
809
810 -- commit the completed unit and any completed work items in the autonomous transaction
811 COMMIT;
812
813 --return our computed return status
814 x_final_ret_sts := l_final_ret_sts;
815
816 --after completing the unit, kill the state if we completed the topmost unit, should occur only after
817 --completing all child units.
818 IF p_unit_id = b_unit_info.unit_id THEN
819 b_unit_info.initialized := FALSE;
820
821 --also close the unit fetch cursor
822 IF B_UNITS%ISOPEN THEN
823 CLOSE B_UNITS;
824 END IF;
825 END IF;
826
827 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
828 EXCEPTION
829 WHEN OTHERS THEN
830 x_final_ret_sts := FND_API.G_RET_STS_UNEXP_ERROR;
831 fnd_oam_debug.log(6, l_ctxt, 'Unhandled Exception: (Code('||SQLCODE||'), Message("'||SQLERRM||'"))');
832 --safety rollback
833 ROLLBACK;
834 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
835 END;
836
837 -- Private wrapper for aborted unit executions
838 PROCEDURE COMPLETE_UNIT_IN_ERROR(p_unit_id IN NUMBER,
839 p_proposed_status IN VARCHAR2,
840 p_proposed_ret_sts IN VARCHAR2,
841 p_message IN VARCHAR2,
842 x_final_ret_sts OUT NOCOPY VARCHAR2)
843 IS
844 l_empty_arg_context FND_OAM_DSCRAM_ARGS_PKG.arg_context;
845 l_empty_work_queue ordered_work_queue_type;
846 BEGIN
847 --this is only called from execute_unit, which is only called once on the topmost unit
848 COMPLETE_UNIT(p_unit_id,
849 p_proposed_status,
850 p_proposed_ret_sts,
851 p_message,
852 l_empty_arg_context,
853 TRUE,
854 l_empty_work_queue,
855 x_final_ret_sts);
856 END;
857
858 -- Private wrapper for the AD initialize procedure. Autonomous to keep from
859 -- committing the parent transaction.
860 PROCEDURE INITIALIZE_AD_SPLIT(p_unit_id IN NUMBER,
861 p_object_owner IN VARCHAR2,
862 p_object_name IN VARCHAR2,
863 p_num_workers IN NUMBER,
864 p_batch_size IN NUMBER,
865 x_return_status OUT NOCOPY VARCHAR2,
866 x_return_msg OUT NOCOPY VARCHAR2)
867 IS
868 PRAGMA AUTONOMOUS_TRANSACTION;
869
870 l_ctxt VARCHAR2(60) := PKG_NAME||'INITIALIZE_AD_SPLIT';
871 BEGIN
872 x_return_status := FND_API.G_RET_STS_ERROR;
873 x_return_msg := '';
874
875 fnd_oam_debug.log(1, l_ctxt, 'Using owner.table: '||p_object_owner||'.'||p_object_name);
876
877 --Default the # of workers and batch size from the bundle if they're not defined for the unit.
878 --Values in the bundle are non-null and these fields are required for AD to function in the multi-worker case.
879 AD_PARALLEL_UPDATES_PKG.INITIALIZE_ROWID_RANGE(X_update_type => AD_PARALLEL_UPDATES_PKG.ROWID_RANGE,
880 X_owner => p_object_owner,
881 X_table => p_object_name,
882 X_script => FND_OAM_DSCRAM_UTILS_PKG.MAKE_AD_SCRIPT_KEY(p_unit_id),
883 X_worker_id => FND_OAM_DSCRAM_BUNDLES_PKG.GET_WORKER_ID,
884 X_num_workers => NVL(p_num_workers, FND_OAM_DSCRAM_BUNDLES_PKG.GET_WORKERS_ALLOWED),
885 X_batch_size => NVL(p_batch_size, FND_OAM_DSCRAM_BUNDLES_PKG.GET_BATCH_SIZE),
886 X_debug_level => 0,
887 X_processed_mode => AD_PARALLEL_UPDATES_PKG.PRESERVE_PROCESSED_UNITS);
888 --safety commit
889 COMMIT;
890
891 fnd_oam_debug.log(1, l_ctxt, 'Finished initialize.');
892
893 -- no return code indicating whether it succeeded, exceptions are thrown
894 x_return_status := FND_API.G_RET_STS_SUCCESS;
895 EXCEPTION
896 WHEN OTHERS THEN
897 x_return_msg := 'Unit ID ('||p_unit_id||'), Failed to initialize the AD table splitting infrastructure: (Code('||SQLCODE||'), Message("'||SQLERRM||'"))';
898 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
899 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
900 ROLLBACK;
901 END;
902
903 -- Private wrapper for the AD get next procedure. Autonomous to keep from
904 -- committing the parent transaction. Success is indicated by the "x_rows_found" boolean.
905 PROCEDURE GET_NEXT_AD_RANGE(x_rowid_lbound OUT NOCOPY ROWID,
906 x_rowid_ubound OUT NOCOPY ROWID,
907 x_rows_found OUT NOCOPY BOOLEAN,
908 x_return_status OUT NOCOPY VARCHAR2,
909 x_return_msg OUT NOCOPY VARCHAR2)
910 IS
911 PRAGMA AUTONOMOUS_TRANSACTION;
912
913 l_ctxt VARCHAR2(60) := PKG_NAME||'GET_NEXT_AD_RANGE';
914 BEGIN
915 x_return_status := FND_API.G_RET_STS_ERROR;
916 x_return_msg := '';
917
918 fnd_oam_debug.log(1, l_ctxt, 'Getting next range...');
919
920 AD_PARALLEL_UPDATES_PKG.GET_ROWID_RANGE(X_start_rowid => x_rowid_lbound,
921 X_end_rowid => x_rowid_ubound,
922 X_any_rows => x_rows_found,
923 X_num_rows => NULL, --unused in 120.2
924 X_restart => FALSE); --also unused in 120.2
925 --safety commit
926 COMMIT;
927
928 fnd_oam_debug.log(1, l_ctxt, 'Done.');
929
930 --make sure rows found has a value
931 IF x_rows_found IS NULL THEN
932 x_rows_found := FALSE;
933 END IF;
934
935 -- no return code indicating whether it succeeded, exceptions are thrown
936 x_return_status := FND_API.G_RET_STS_SUCCESS;
937 EXCEPTION
938 WHEN OTHERS THEN
939 x_return_msg := 'Failed to fetch next AD range: (Code('||SQLCODE||'), Message("'||SQLERRM||'"))';
940 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
941 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
942 ROLLBACK;
943 END;
944
945 -- Private wrapper for the AD complete procedure. This AD API also
946 -- calls a commit on the transaction but in this case we want it to commit
947 -- the main transaction with previously executed DMLs to keep the operation and its
948 -- AD metadata atmoic.
949 PROCEDURE COMPLETE_AD_RANGE(p_rowid_ubound IN ROWID,
950 p_rows_processed IN NUMBER)
951
952 IS
953 BEGIN
954 --Note: the rows_processed here is based on how many rows the DMLs actually interacted with, not the number
955 --in the range. This means it can be zero even for a large range.
956 AD_PARALLEL_UPDATES_pkg.PROCESSED_ROWID_RANGE(X_rows_processed => p_rows_processed,
957 X_last_rowid => p_rowid_ubound);
958 END;
959
960 --Wrapper to perform range commit autonomously. Used in non-normal modes to roll back the work
961 --but still progress the AD iterator.
962 PROCEDURE COMPLETE_AD_RANGE_AUTONOMOUSLY(p_rowid_ubound IN ROWID,
963 p_rows_processed IN NUMBER)
964
965 IS
966 PRAGMA AUTONOMOUS_TRANSACTION;
967 BEGIN
968 COMPLETE_AD_RANGE(p_rowid_ubound,
969 p_rows_processed);
970
971 --safety commit
972 COMMIT;
973 END;
974
975 -- Executor called by the various EXECUTE_<TYPE>_UNIT delegates to take care of executing
976 -- a list of work. State is read from the unit package cache. If splitting is enabled,
977 -- all work items in the queue are executed on each segment before a commit. If not, each
978 -- work item is executed serially and a commit is issued between each.
979 PROCEDURE INTERNAL_EXECUTE_WORK_QUEUE(px_work_queue IN OUT NOCOPY ordered_work_queue_type,
980 px_arg_context IN OUT NOCOPY FND_OAM_DSCRAM_ARGS_PKG.arg_context,
981 x_work_queue_to_complete OUT NOCOPY ordered_work_queue_type,
982 x_return_status OUT NOCOPY VARCHAR2,
983 x_return_msg OUT NOCOPY VARCHAR2)
984 IS
985 l_ctxt VARCHAR2(60) := PKG_NAME||'INTERNAL_EXECUTE_WORK_QUEUE';
986
987 l_rowid_lbound ROWID;
988 l_rowid_ubound ROWID;
989 l_rows_found BOOLEAN;
990 l_rows_processed NUMBER;
991 l_max_range_rows_processed NUMBER;
992
993 k NUMBER;
994 l_return_status VARCHAR2(6);
995 l_return_msg VARCHAR2(2048);
996 BEGIN
997 --fnd_oam_debug.log(2, l_ctxt, 'ENTER');
998 x_return_msg := '';
999
1000 --if we've got no work, just return
1001 IF px_work_queue IS NULL OR px_work_queue.COUNT = 0 THEN
1002 x_return_status := FND_API.G_RET_STS_SUCCESS;
1003 fnd_oam_debug.log(1, l_ctxt, 'Work queue empty, returning success.');
1004 RETURN;
1005 END IF;
1006
1007 --work to do, default the status
1008 x_return_status := FND_API.G_RET_STS_ERROR;
1009
1010 --at this point either split the work using AD or execute the dmls directly
1011 IF b_unit_info.use_splitting THEN
1012 --default the work queue to complete to our input queue
1013 x_work_queue_to_complete := px_work_queue;
1014
1015 --initialize the AD Parallel Updates infrastructure
1016 INITIALIZE_AD_SPLIT(b_unit_info.unit_id,
1017 b_unit_info.unit_object_owner,
1018 b_unit_info.unit_object_name,
1019 b_unit_info.actual_workers_allowed,
1020 b_unit_info.batch_size,
1021 l_return_status,
1022 l_return_msg);
1023 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
1024 x_return_status := l_return_status;
1025 x_return_msg := '[Error initializing AD Split]: '||l_return_msg;
1026 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
1027 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1028 RETURN;
1029 END IF;
1030
1031 --before beginning the loop, force a check of the unit since the only way a worker could get
1032 --a different set of Work Items to execute than a peer would be if the DML and UNIT were updated.
1033 --don't recurse but force a requery. Only checks root unit, assumed to be in sync with child
1034 --units.
1035 IF NOT VALIDATE_CONTINUED_EXECUTION(TRUE,
1036 FALSE,
1037 l_return_status,
1038 l_return_msg) THEN
1039 x_return_status := l_return_status;
1040 x_return_msg := l_return_msg;
1041 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1042 RETURN;
1043 END IF;
1044
1045 --loop over ranges of rows
1046 LOOP
1047 --ask AD for the next range of rowids
1048 GET_NEXT_AD_RANGE(l_rowid_lbound,
1049 l_rowid_ubound,
1050 l_rows_found,
1051 l_return_status,
1052 l_return_msg);
1053 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
1054 x_return_status := l_return_status;
1055 x_return_msg := l_return_msg;
1056 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1057 RETURN;
1058 END IF;
1059
1060 --if no work, exit the loop
1061 IF NOT l_rows_found THEN
1062 EXIT;
1063 END IF;
1064
1065 -- work found, so execute the work list in the order found
1066 l_max_range_rows_processed := 0;
1067 k := px_work_queue.FIRST;
1068 WHILE k IS NOT NULL LOOP
1069 --choose the proper execution function
1070 CASE px_work_queue(k).item_type
1071 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_DML THEN
1072 FND_OAM_DSCRAM_DMLS_PKG.EXECUTE_DML_ON_RANGE(px_work_queue(k).item_id,
1073 px_arg_context,
1074 l_rowid_lbound,
1075 l_rowid_ubound,
1076 l_rows_processed,
1077 l_return_status,
1078 l_return_msg);
1079
1080 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_PLSQL THEN
1081 --set rows processed to zero sinze pl/sql procedures don't report it
1082 l_rows_processed := 0;
1083 FND_OAM_DSCRAM_PLSQLS_PKG.EXECUTE_PLSQL_ON_RANGE(px_work_queue(k).item_id,
1084 px_arg_context,
1085 l_rowid_lbound,
1086 l_rowid_ubound,
1087 l_return_status,
1088 l_return_msg);
1089
1090 ELSE
1091 x_return_msg := 'Work Item ID('||px_work_queue(k).item_id||'), invalid work item type:'||px_work_queue(k).item_type;
1092 --skip setting the work item's message since this can only be logged at the unit level
1093 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
1094 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1095 RETURN;
1096 END CASE;
1097
1098 --check if our work suceeded, if not we quick fail because AD can't
1099 --continue fetching ranges and we can't commit only some work items for a range.
1100 --Nothing needs to be done for the AD infrastructure, the block will be re-tried
1101 --automatically next time.
1102 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
1103 ROLLBACK;
1104 x_return_status := l_return_status;
1105 --attach the failure message to the work item
1106 px_work_queue(k).item_msg := l_return_msg||px_work_queue(k).item_msg;
1107 x_work_queue_to_complete(k).item_msg := l_return_msg||x_work_queue_to_complete(k).item_msg;
1108 -- Although sometimes this message will also be logged with the DML, we pass up the message to get logged
1109 -- with the unit also in case the error occurred before the entity could get into the entity's cache and
1110 -- store such an error message locally.
1111 x_return_msg := 'Work Item with type('||px_work_queue(k).item_type||'),id('||px_work_queue(k).item_id||') failed: '||l_return_msg;
1112 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1113 RETURN;
1114 END IF;
1115
1116 --work item suceeded, see if the number of rows processed was greater than our currently seen max
1117 IF l_rows_processed > l_max_range_rows_processed THEN
1118 l_max_range_rows_processed := l_rows_processed;
1119 END IF;
1120
1121 --fetch the next work item
1122 k := px_work_queue.NEXT(k);
1123 END LOOP;
1124
1125 --if we got here, all the work items must have suceeded, complete the ad range
1126 IF FND_OAM_DSCRAM_UTILS_PKG.RUN_IS_NORMAL THEN
1127 COMPLETE_AD_RANGE(l_rowid_ubound,
1128 l_max_range_rows_processed);
1129 COMMIT;
1130 ELSE
1131 ROLLBACK;
1132 fnd_oam_debug.log(1, l_ctxt, 'Rolling back changes because of non-standard run mode');
1133 --autonomously complete the range to keep AD progressing
1134 COMPLETE_AD_RANGE_AUTONOMOUSLY(l_rowid_ubound,
1135 l_max_range_rows_processed);
1136 END IF;
1137
1138 --before getting the next range, validate the unit and above to make sure
1139 --we should keep working
1140 IF NOT VALIDATE_CONTINUED_EXECUTION(FALSE,
1141 TRUE,
1142 l_return_status,
1143 l_return_msg) THEN
1144 x_return_status := l_return_status;
1145 x_return_msg := l_return_msg;
1146 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1147 RETURN;
1148 END IF;
1149
1150 END LOOP;
1151 fnd_oam_debug.log(1, l_ctxt, 'Done splitting, ad range selection loop exhausted.');
1152 ELSE
1153 --if we're not splitting, just execute each work item serially. Here we call
1154 --the individual execute_<type> APIs so that we can do incremental complete and
1155 --commits after each one. This ensures we save our work as we go.
1156 x_work_queue_to_complete := NULL;
1157 k := px_work_queue.FIRST;
1158 WHILE k IS NOT NULL LOOP
1159 --choose the proper execution function, entity stats are created and completed within these functions
1160 CASE px_work_queue(k).item_type
1161 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_DML THEN
1162 FND_OAM_DSCRAM_DMLS_PKG.EXECUTE_DML(px_work_queue(k).item_id,
1163 px_arg_context,
1164 l_return_status,
1165 l_return_msg);
1166
1167 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_PLSQL THEN
1168 FND_OAM_DSCRAM_PLSQLS_PKG.EXECUTE_PLSQL(px_work_queue(k).item_id,
1169 px_arg_context,
1170 l_return_status,
1171 l_return_msg);
1172
1173 ELSE
1174 x_return_msg := 'Work Item ID('||px_work_queue(k).item_id||'), invalid work item type:'||px_work_queue(k).item_type;
1175 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
1176 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1177 RETURN;
1178 END CASE;
1179
1180 --see if our work suceeded or not
1181 IF l_return_status = FND_API.G_RET_STS_SUCCESS THEN
1182 --destroy the work item's cache entry exp to get its arg list pushed into the arg context
1183 CASE px_work_queue(k).item_type
1184 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_DML THEN
1185 FND_OAM_DSCRAM_DMLS_PKG.DESTROY_DML_CACHE_ENTRY(px_work_queue(k).item_id,
1186 px_arg_context,
1187 TRUE,
1188 l_return_status,
1189 l_return_msg);
1190
1191 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_TYPE_PLSQL THEN
1192 FND_OAM_DSCRAM_PLSQLS_PKG.DESTROY_PLSQL_CACHE_ENTRY(px_work_queue(k).item_id,
1193 px_arg_context,
1194 TRUE,
1195 l_return_status,
1196 l_return_msg);
1197
1198 END CASE;
1199
1200 --see if the destroy worked
1201 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
1202 ROLLBACK;
1203 x_return_status := l_return_status;
1204 --attach the failure message to the work item
1205 px_work_queue(k).item_msg := l_return_msg||px_work_queue(k).item_msg;
1206 --now return the error to the unit
1207 x_return_msg := 'Work Item with type('||px_work_queue(k).item_type||'),id('||px_work_queue(k).item_id||') failed: '||l_return_msg;
1208 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1209 RETURN;
1210 END IF;
1211
1212 --at this point, the work item was sucessfully executed and its cache entry was removed, commit the work
1213 --on success, we can commit the completed DML locally (unless we're in a non-normal mode)
1214 --to mark our incremental success since we're the only worker.
1215 IF FND_OAM_DSCRAM_UTILS_PKG.RUN_IS_NORMAL THEN
1216 COMMIT;
1217 ELSE
1218 fnd_oam_debug.log(1, l_ctxt, 'Rolling back changes because of non-standard run mode');
1219 ROLLBACK;
1220 END IF;
1221 ELSE
1222 --the work failed, don't continue executing other work units in this case to cover
1223 --for cross-unit dependencies.
1224 ROLLBACK;
1225
1226 --now return the error
1227 x_return_status := l_return_status;
1228 --also push up the message so it shows up in a list of units
1229 x_return_msg := 'Work Item with type('||px_work_queue(k).item_type||'),id('||px_work_queue(k).item_id||') failed: '||l_return_msg;
1230 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1231 RETURN;
1232 END IF;
1233
1234 --before getting the next work item, validate the unit and above to make sure
1235 --we should keep working
1236 IF NOT VALIDATE_CONTINUED_EXECUTION(FALSE,
1237 TRUE,
1238 l_return_status,
1239 l_return_msg) THEN
1240 x_return_status := l_return_status;
1241 x_return_msg := l_return_msg;
1242 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1243 RETURN;
1244 END IF;
1245
1246 --fetch the next work item
1247 k := px_work_queue.NEXT(k);
1248 END LOOP;
1249 END IF;
1250
1251 --if we got here, we're done.
1252
1253 x_return_status := FND_API.G_RET_STS_SUCCESS;
1254 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1255 EXCEPTION
1256 WHEN OTHERS THEN
1257 -- include a safety rollback since this procedure calls other procedures that
1258 -- leave results on the main transaction.
1259 ROLLBACK;
1260 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
1261 x_return_msg := 'Unhandled Exception: [Code('||SQLCODE||'), Message("'||SQLERRM||'")]';
1262 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
1263 --fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1264 END;
1265
1266 -- helper to EXECUTE_UNIT to handle the case where the topmost unit is a DML_SET type unit.
1267 -- responsibilities include querying out the dmls for the unit, preparing the structure used by
1268 -- internal_execute_work_queue to execute the dmls, and returning the results to execute_unit.
1269 PROCEDURE EXECUTE_DML_SET_UNIT(px_arg_context IN OUT NOCOPY FND_OAM_DSCRAM_ARGS_PKG.arg_context,
1270 x_work_queue_to_complete OUT NOCOPY ordered_work_queue_type,
1271 x_return_status OUT NOCOPY VARCHAR2,
1272 x_return_msg OUT NOCOPY VARCHAR2)
1273 IS
1274 l_ctxt VARCHAR2(60) := PKG_NAME||'EXECUTE_DML_SET_UNIT';
1275
1276 l_work_queue ordered_work_queue_type;
1277
1278 l_return_status VARCHAR2(6);
1279 l_return_msg VARCHAR2(2048);
1280 BEGIN
1281 fnd_oam_debug.log(2, l_ctxt, 'ENTER');
1282 x_return_status := FND_API.G_RET_STS_ERROR;
1283 x_return_msg := '';
1284
1285 --get the list of dmls as a work queue
1286 FND_OAM_DSCRAM_DMLS_PKG.FETCH_DML_IDS(b_unit_info.unit_id,
1287 l_work_queue,
1288 l_return_status,
1289 l_return_msg);
1290 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
1291 x_return_status := l_return_status;
1292 x_return_msg := l_return_msg;
1293 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1294 RETURN;
1295 END IF;
1296
1297 --execute the work queue
1298 INTERNAL_EXECUTE_WORK_QUEUE(l_work_queue,
1299 px_arg_context,
1300 x_work_queue_to_complete,
1301 x_return_status,
1302 x_return_msg);
1303
1304 --return status of the internal execute
1305 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1306 EXCEPTION
1307 WHEN OTHERS THEN
1308 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
1309 x_return_msg := 'Unhandled Exception: [Code('||SQLCODE||'), Message("'||SQLERRM||'")]';
1310 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
1311 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1312 END;
1313
1314 -- helper to EXECUTE_UNIT to handle the case where the topmost unit is a DML_SET type unit.
1315 -- responsibilities include querying out the dmls for the unit, preparing the structure used by
1316 -- internal_execute_work_queue to execute the dmls, and returning the results to execute_unit.
1317 PROCEDURE EXECUTE_PLSQL_SET_UNIT(px_arg_context IN OUT NOCOPY FND_OAM_DSCRAM_ARGS_PKG.arg_context,
1318 x_work_queue_to_complete OUT NOCOPY ordered_work_queue_type,
1319 x_return_status OUT NOCOPY VARCHAR2,
1320 x_return_msg OUT NOCOPY VARCHAR2)
1321 IS
1322 l_ctxt VARCHAR2(60) := PKG_NAME||'EXECUTE_PLSQL_SET_UNIT';
1323
1324 l_work_queue ordered_work_queue_type;
1325
1326 l_return_status VARCHAR2(6);
1327 l_return_msg VARCHAR2(2048);
1328 BEGIN
1329 fnd_oam_debug.log(2, l_ctxt, 'ENTER');
1330 x_return_status := FND_API.G_RET_STS_ERROR;
1331 x_return_msg := '';
1332
1333 --get the list of plsqls as a work queue
1334 FND_OAM_DSCRAM_PLSQLS_PKG.FETCH_PLSQL_IDS(b_unit_info.unit_id,
1335 l_work_queue,
1336 l_return_status,
1337 l_return_msg);
1338 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
1339 x_return_status := l_return_status;
1340 x_return_msg := l_return_msg;
1341 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1342 RETURN;
1343 END IF;
1344
1345 --execute the work queue
1346 INTERNAL_EXECUTE_WORK_QUEUE(l_work_queue,
1347 px_arg_context,
1348 x_work_queue_to_complete,
1349 x_return_status,
1350 x_return_msg);
1351
1352 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1353 EXCEPTION
1354 WHEN OTHERS THEN
1355 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
1356 x_return_msg := 'Unhandled Exception: [Code('||SQLCODE||'), Message("'||SQLERRM||'")]';
1357 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
1358 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1359 END;
1360
1361 -- standard work item comparator, -1 if left scheduled earlier than right, 0 if same, 1 if right scheduled earlier.
1362 -- takes into account the priority and then the weight with priority overriding weight. Null priorites come last,
1363 -- Null weights come first.
1364 FUNCTION COMPARE_WORK_ITEMS(p_left IN OUT NOCOPY work_item_type,
1365 p_right IN OUT NOCOPY work_item_type)
1366 RETURN INTEGER
1367 IS
1368 BEGIN
1369 -- priorities are sorted in an ascending order. null priorities are after non-null
1370 IF p_left.priority < p_right.priority THEN
1371 RETURN -1;
1372 ELSIF p_left.priority > p_right.priority THEN
1373 RETURN 1;
1374 ELSIF p_left.priority IS NOT NULL AND p_right.priority IS NULL THEN
1375 RETURN -1;
1376 ELSIF p_left.priority IS NULL AND p_right.priority IS NOT NULL THEN
1377 RETURN 1;
1378 ELSE
1379 --priorities either both null or both not null and equal, move on to weight which is sorted in a descending order.
1380 --null weights are before non-null
1381 IF p_left.weight > p_right.weight THEN
1382 RETURN -1;
1383 ELSIF p_left.weight < p_right.weight THEN
1384 RETURN 1;
1385 ELSIF p_left.weight IS NULL AND p_right.weight IS NOT NULL THEN
1386 RETURN -1;
1387 ELSIF p_left.weight IS NOT NULL AND p_right.weight IS NULL THEN
1388 RETURN 1;
1389 ELSE
1390 --when priorities and weights are the same, they're comparatively the same
1391 RETURN 0;
1392 END IF;
1393 END IF;
1394 END;
1395
1396 -- Private debug procedure
1397 PROCEDURE PRINT_WORK_QUEUE(p_work_queue IN OUT NOCOPY ordered_work_queue_type)
1398 IS
1399 l_ctxt VARCHAR2(60) := PKG_NAME||'PRINT_WORK_QUEUE';
1400
1401 k NUMBER;
1402 BEGIN
1403 k := p_work_queue.FIRST;
1404 WHILE k IS NOT NULL LOOP
1405 fnd_oam_debug.log(1, l_ctxt, 'Q['||k||']: (P:'||p_work_queue(k).priority||')(W:'||p_work_queue(k).weight||') '||p_work_queue(k).item_id||'('||p_work_queue(k).item_type||')');
1406 k := p_work_queue.NEXT(k);
1407 END LOOP;
1408 END;
1409
1410 -- Private helper to EXECUTE_UNIT to handle the case where the topmost unit is a concurrent group meta unit.
1411 -- Responsibilities include querying out all child units, constructing an ordered work queue from the unit priority,
1412 -- work item priority and work item weight and calling INTERNAL_EXECUTE_WORK_QUEUE.
1413 -- The only fields we interact with in a child unit are: unit_id, task_id, concurrent_group_unit_id, unit_type, priority and weight.
1414 -- All other fields are left untouched and unread. Of the above, only the unit_id, unit type and priority are used in code,
1415 -- the rest participate only in the child unit select.
1416 PROCEDURE EXECUTE_CONC_GROUP_UNIT(px_arg_context IN OUT NOCOPY FND_OAM_DSCRAM_ARGS_PKG.arg_context,
1417 x_work_queue_to_complete OUT NOCOPY ordered_work_queue_type,
1418 x_return_status OUT NOCOPY VARCHAR2,
1419 x_return_msg OUT NOCOPY VARCHAR2)
1420 IS
1421 l_ctxt VARCHAR2(60) := PKG_NAME||'EXECUTE_CONC_GROUP_UNIT';
1422
1423 l_master_pos NUMBER;
1424 l_master_pos_finished_ubound NUMBER := 0;
1425 l_master_work_queue ordered_work_queue_type := ordered_work_queue_type();
1426 l_work_queue ordered_work_queue_type;
1427
1428 l_unit_ids DBMS_SQL.NUMBER_TABLE;
1429 l_types DBMS_SQL.VARCHAR2_TABLE;
1430 l_priorities DBMS_SQL.NUMBER_TABLE;
1431
1432 l_priority NUMBER;
1433 l_last_unit_priority NUMBER;
1434
1435 j NUMBER;
1436 k NUMBER;
1437 m NUMBER;
1438 l_return_status VARCHAR2(6);
1439 l_return_msg VARCHAR2(2048);
1440 BEGIN
1441 fnd_oam_debug.log(2, l_ctxt, 'ENTER');
1442 x_return_status := FND_API.G_RET_STS_ERROR;
1443 x_return_msg := '';
1444
1445 --first we need to fetch the component units of this meta-unit
1446 SELECT unit_id, unit_type, priority
1447 BULK COLLECT INTO l_unit_ids, l_types, l_priorities
1448 FROM fnd_oam_dscram_units
1449 WHERE task_id = FND_OAM_DSCRAM_TASKS_PKG.GET_TASK_ID
1450 AND concurrent_group_unit_id = b_unit_info.unit_id
1451 ORDER BY priority ASC, weight DESC;
1452
1453 --now go through the unit list to create the master work queue
1454 k := l_unit_ids.FIRST;
1455 WHILE k IS NOT NULL LOOP
1456 --based on the type, call a different API to get the work queue for this child unit
1457 CASE l_types(k)
1458 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_UNIT_TYPE_DML_SET THEN
1459 FND_OAM_DSCRAM_DMLS_PKG.FETCH_DML_IDS(l_unit_ids(k),
1460 l_work_queue,
1461 l_return_status,
1462 l_return_msg);
1463 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_UNIT_TYPE_PLSQL_SET THEN
1464 FND_OAM_DSCRAM_PLSQLS_PKG.FETCH_PLSQL_IDS(l_unit_ids(k),
1465 l_work_queue,
1466 l_return_status,
1467 l_return_msg);
1468 ELSE
1469 --unknown unit type
1470 x_return_msg := 'Unknown unit type: '||l_types(k);
1471 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
1472 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1473 RETURN;
1474 END CASE;
1475
1476 --see if the fetch suceeded, if not quick exit
1477 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
1478 x_return_status := l_return_status;
1479 x_return_msg := l_return_msg;
1480 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1481 RETURN;
1482 END IF;
1483
1484 --debug
1485 IF fnd_oam_debug.test(1) THEN
1486 PRINT_WORK_QUEUE(l_work_queue);
1487 END IF;
1488
1489 --merge the work items into the master ordered queue
1490 l_priority := l_priorities(k);
1491 IF l_master_work_queue.COUNT = 0 THEN
1492 fnd_oam_debug.log(1, l_ctxt, 'Replace');
1493 --master's empty, just replace it
1494 l_master_work_queue := l_work_queue;
1495 ELSE
1496 --if this unit has a later priority, just append
1497 IF ((l_last_unit_priority IS NOT NULL AND l_priority IS NULL) OR
1498 (l_last_unit_priority < l_priority)) THEN
1499
1500 fnd_oam_debug.log(1, l_ctxt, 'Append');
1501 --since we're traversing the child units in order, set our finished_ubound to
1502 --the current end point, we can't find later units that need to insert before this point
1503 l_master_pos_finished_ubound := l_master_work_queue.COUNT;
1504
1505 --extend our queue by N entries
1506 l_master_work_queue.EXTEND(l_work_queue.COUNT);
1507
1508 --insert each entry
1509 j := l_work_queue.FIRST;
1510 WHILE j IS NOT NULL LOOP
1511 l_master_work_queue(l_master_pos_finished_ubound + j) := l_work_queue(j);
1512 j := l_work_queue.NEXT(j);
1513 END LOOP;
1514 ELSE
1515 --this is the ugly part where we have to merge the work items of two or more units
1516 --with the same unit priority. Since there's no nice way of doing an insert, proceed from the
1517 --tail of both queues to minimize number of bumps from an in-list insert. Since the new work
1518 --queue is also sorted, we only need to perform one pass on both lists. Also, all entries before
1519 --the finishined_ubound are off limits since they are in an earlier unit priority.
1520 l_master_pos := l_master_work_queue.COUNT;
1521 j := l_work_queue.LAST;
1522 fnd_oam_debug.log(1, l_ctxt, 'Merge: finished_ubound('||l_master_pos_finished_ubound||')');
1523 WHILE j IS NOT NULL LOOP
1524 fnd_oam_debug.log(1, l_ctxt, 'Item('||j||'), initial master_pos('||l_master_pos||')');
1525 --loop till we find that the work item at master_pos is less than the current item, then set our pos
1526 --to be one past this previous item, stop when we go past the logical(possibly real) end of the master queue
1527 WHILE l_master_pos > l_master_pos_finished_ubound LOOP
1528 IF COMPARE_WORK_ITEMS(l_master_work_queue(l_master_pos),
1529 l_work_queue(j)) <= 0 THEN
1530 l_master_pos := l_master_pos + 1;
1531 fnd_oam_debug.log(1, l_ctxt, 'Found position: '||l_master_pos);
1532 EXIT;
1533 END IF;
1534 l_master_pos := l_master_work_queue.PRIOR(l_master_pos);
1535 END LOOP;
1536
1537 --if we didn't determine a spot, master pos must be reset to the first available slot
1538 IF l_master_pos <= l_master_pos_finished_ubound THEN
1539 l_master_pos := l_master_pos_finished_ubound + 1;
1540 END IF;
1541 fnd_oam_debug.log(1, l_ctxt, 'Final position: '||l_master_pos);
1542
1543 --insert the work queue's item at l_master_pos by first moving everything from pos to count up one index
1544 m := l_master_work_queue.COUNT;
1545 l_master_work_queue.EXTEND(1);
1546 WHILE m >= l_master_pos LOOP
1547 l_master_work_queue(m+1) := l_master_work_queue(m);
1548 m := l_master_work_queue.PRIOR(m);
1549 END LOOP;
1550 --and then inserting the object at this point
1551 l_master_work_queue(l_master_pos) := l_work_queue(j);
1552
1553 --since we know the work queue is sorted, skip comparing the next item to this position.
1554 --move the master_pos down one if we can
1555 IF l_master_pos > l_master_pos_finished_ubound+1 THEN
1556 l_master_pos := l_master_pos - 1;
1557 END IF;
1558
1559 j := l_work_queue.PRIOR(j);
1560 END LOOP;
1561 END IF;
1562 END IF;
1563
1564 l_last_unit_priority := l_priority;
1565
1566 k := l_unit_ids.NEXT(k);
1567 END LOOP;
1568
1569 --debug
1570 IF fnd_oam_debug.test(1) THEN
1571 fnd_oam_debug.log(1, l_ctxt, 'Master Work Queue:');
1572 PRINT_WORK_QUEUE(l_master_work_queue);
1573 END IF;
1574
1575 --at this point we have the finished work queue, just execute it
1576 INTERNAL_EXECUTE_WORK_QUEUE(l_master_work_queue,
1577 px_arg_context,
1578 x_work_queue_to_complete,
1579 x_return_status,
1580 x_return_msg);
1581
1582 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1583 EXCEPTION
1584 WHEN OTHERS THEN
1585 x_return_status := FND_API.G_RET_STS_UNEXP_ERROR;
1586 x_return_msg := 'Unhandled Exception: [Code('||SQLCODE||'), Message("'||SQLERRM||'")]';
1587 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
1588 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1589 END;
1590
1591 -- Public, called once for the topmost unit
1592 PROCEDURE EXECUTE_UNIT(p_unit_id IN NUMBER,
1593 x_return_status OUT NOCOPY VARCHAR2,
1594 x_return_msg OUT NOCOPY VARCHAR2)
1595 IS
1596 l_ctxt VARCHAR2(60) := PKG_NAME||'EXECUTE_UNIT';
1597
1598 l_completed_status VARCHAR2(30);
1599 l_arg_context FND_OAM_DSCRAM_ARGS_PKG.arg_context;
1600
1601 l_work_queue_to_complete ordered_work_queue_type;
1602
1603 l_return_status VARCHAR2(6);
1604 l_return_msg VARCHAR2(2048);
1605 l_ignore VARCHAR2(2048);
1606 BEGIN
1607 fnd_oam_debug.log(2, l_ctxt, 'ENTER');
1608 x_return_status := FND_API.G_RET_STS_ERROR;
1609 x_return_msg := '';
1610
1611 --make sure the unit's ok to start execution
1612 IF NOT VALIDATE_START_EXECUTION(p_unit_id,
1613 l_return_status,
1614 l_return_msg) THEN
1615 x_return_status := l_return_status;
1616 x_return_msg := '[Unit Validation Failed]:('||l_return_msg||')';
1617 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1618 RETURN;
1619 END IF;
1620
1621 -- attempt to assign this invocation as a worker for the unit
1622 ASSIGN_WORKER_TO_UNIT(p_unit_id,
1623 l_return_status,
1624 l_return_msg);
1625 IF l_return_status <> FND_API.G_RET_STS_SUCCESS THEN
1626 x_return_status := l_return_status;
1627 IF l_return_status <> FND_OAM_DSCRAM_UTILS_PKG.G_RET_STS_FULL THEN
1628 x_return_msg := '[Unit Worker Assignment Failed]:('||l_return_msg||')';
1629 fnd_oam_debug.log(1, l_ctxt, x_return_msg);
1630 END IF;
1631 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1632 RETURN;
1633 END IF;
1634
1635 --before proceeding after the assign, check our parent objects to make sure
1636 --their state suggests we should continue
1637 IF NOT FND_OAM_DSCRAM_TASKS_PKG.VALIDATE_CONTINUED_EXECUTION(FALSE,
1638 TRUE,
1639 l_return_status,
1640 l_return_msg) THEN
1641 --we don't care why a parent is invalid, just knowing so forces us to
1642 --stop our work
1643 COMPLETE_UNIT_IN_ERROR(b_unit_info.unit_id,
1644 FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_STOPPED,
1645 FND_OAM_DSCRAM_UTILS_PKG.G_RET_STS_STOPPED,
1646 l_return_msg,
1647 x_return_status);
1648 x_return_msg := '[Post-Assignment Parent Validation Failed]:('||l_return_msg||')';
1649 fnd_oam_debug.log(1, l_ctxt, x_return_msg);
1650 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1651 RETURN;
1652 END IF;
1653
1654 -- after assign we can start using stuff from the unit_info package state. First, we need to
1655 -- see what type of unit we've got so we delegate to the right subfunction.
1656 l_return_status := FND_API.G_RET_STS_SUCCESS;
1657 fnd_oam_debug.log(1, l_ctxt, 'Executing Unit...');
1658
1659 -- for now, just use the arg context of the run for execution, later we may want to introduce
1660 -- a specific task context that layers on top of the run context that lets units communicate
1661 -- values without affecting other tasks
1662 FND_OAM_DSCRAM_RUNS_PKG.GET_RUN_ARG_CONTEXT(l_arg_context);
1663
1664 -- in the lowest debug level, print the arg context
1665 IF fnd_oam_debug.test(1) THEN
1666 FND_OAM_DSCRAM_ARGS_PKG.PRINT_ARG_CONTEXT(l_arg_context);
1667 END IF;
1668
1669 --delegate the work to different procedures based on the unit type
1670 CASE b_unit_info.unit_type
1671 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_UNIT_TYPE_DML_SET THEN
1672 EXECUTE_DML_SET_UNIT(l_arg_context,
1673 l_work_queue_to_complete,
1674 l_return_status,
1675 l_return_msg);
1676
1677 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_UNIT_TYPE_PLSQL_SET THEN
1678 EXECUTE_PLSQL_SET_UNIT(l_arg_context,
1679 l_work_queue_to_complete,
1680 l_return_status,
1681 l_return_msg);
1682
1683 WHEN FND_OAM_DSCRAM_UTILS_PKG.G_UNIT_TYPE_CONC_GROUP THEN
1684 EXECUTE_CONC_GROUP_UNIT(l_arg_context,
1685 l_work_queue_to_complete,
1686 l_return_status,
1687 l_return_msg);
1688 ELSE
1689 l_return_msg := 'Unhandled Type:'||b_unit_info.unit_type;
1690 fnd_oam_debug.log(6, l_ctxt, l_return_msg);
1691 END CASE;
1692
1693 --determine the status to apply to the unit from the execute's return status
1694 IF l_return_status = FND_API.G_RET_STS_SUCCESS THEN
1695 --unit was sucessful
1696 l_completed_status := FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_PROCESSED;
1697 ELSE
1698 --determine what status the unit should have
1699 l_completed_status := FND_OAM_DSCRAM_UTILS_PKG.CONV_RET_STS_TO_COMPL_STATUS(l_return_status);
1700
1701 --take the fatality level into account to error out parent objects if need be
1702 IF l_completed_status <> FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_PROCESSED AND
1703 b_unit_info.error_fatality_level IS NOT NULL THEN
1704
1705 --update the corresponding parent unit
1706 FND_OAM_DSCRAM_UTILS_PKG.PROPOGATE_FATALITY_LEVEL(b_unit_info.error_fatality_level);
1707
1708 --also change our status to error_fatal
1709 l_completed_status := FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_ERROR_FATAL;
1710 END IF;
1711 END IF;
1712
1713 --finished processing the unit
1714 fnd_oam_debug.log(1, l_ctxt, 'Finished Unit with status: '||l_completed_status||'('||l_return_status||')');
1715 COMPLETE_UNIT(b_unit_info.unit_id,
1716 l_completed_status,
1717 l_return_status,
1718 l_return_msg,
1719 l_arg_context,
1720 TRUE,
1721 l_work_queue_to_complete,
1722 x_return_status);
1723
1724 -- in the lowest debug level, print the arg context
1725 IF fnd_oam_debug.test(1) THEN
1726 FND_OAM_DSCRAM_ARGS_PKG.PRINT_ARG_CONTEXT(l_arg_context);
1727 END IF;
1728
1729 --if sucessfull, set the run arg context to our local context, can't modify object by reference
1730 IF x_return_status = FND_API.G_RET_STS_SUCCESS THEN
1731 FND_OAM_DSCRAM_RUNS_PKG.SET_RUN_ARG_CONTEXT(l_arg_context);
1732 END IF;
1733
1734 fnd_oam_debug.log(2, l_ctxt, 'EXIT');
1735 EXCEPTION
1736 WHEN OTHERS THEN
1737 x_return_msg := 'Unhandled Exception: [Code('||SQLCODE||'), Message("'||SQLERRM||'")]';
1738 fnd_oam_debug.log(6, l_ctxt, x_return_msg);
1739 --safety rollback
1740 ROLLBACK;
1741 COMPLETE_UNIT_IN_ERROR(p_unit_id,
1742 FND_OAM_DSCRAM_UTILS_PKG.G_STATUS_ERROR_UNKNOWN,
1743 FND_API.G_RET_STS_UNEXP_ERROR,
1744 x_return_msg,
1745 x_return_status);
1746 END;
1747
1748 END FND_OAM_DSCRAM_UNITS_PKG;