1 PACKAGE BODY WF_QUEUE AS
2 /* $Header: wfqueb.pls 120.5 2007/06/11 14:09:45 dgadhira ship $ */
3
4 --
5 -- Exceptions
6 --
7 dequeue_timeout exception;
8 pragma EXCEPTION_INIT(dequeue_timeout, -25228);
9
10 dequeue_disabled exception;
11 pragma EXCEPTION_INIT(dequeue_disabled, -25226);
12
13 dequeue_outofseq exception;
14 pragma EXCEPTION_INIT(dequeue_outofseq, -25237);
15
16 no_queue exception;
17 pragma EXCEPTION_INIT(no_queue, -24010);
18
19 shutdown_pending exception;
20
21 no_savepoint exception;
22 pragma EXCEPTION_INIT(no_savepoint, -1086);
23
24 msgid_notfound exception;
25 pragma EXCEPTION_INIT(msgid_notfound, -25263);
26
27 -- Bug 4005674
28 -- private global variables to store the item_type:item_key:actid at dequeue time
29 g_dequeue_item_type varchar2(8);
30 g_dequeue_item_key varchar2(240);
31 g_dequeue_actid number;
32 g_background_begin_date date;
33 g_Key number;
34
35 TYPE ActivityHistoryREC IS RECORD (
36 ITEM_TYPE VARCHAR2(8),
37 ITEM_KEY VARCHAR2(240),
38 ACTID NUMBER,
39 HISTORY_COUNT NUMBER );
40
41 TYPE ActivityHistoryCountTAB IS TABLE OF ActivityHistoryREC INDEX BY BINARY_INTEGER;
42 g_ActivityHistoryCount ActivityHistoryCountTAB;
43
44 -- ====================================================================
45 --
46 -- Private Routine to check for shutdown
47 --
48 -- ====================================================================
49
50 function check_instance return boolean
51 as
52 shutdown varchar2(3);
53 begin
54
55 select shutdown_pending into shutdown from v$instance;
56
57 if shutdown = 'YES' then
58 return(TRUE);
59 else
60 return(FALSE);
61 end if;
62
63 end;
64
65 -- ====================================================================
66 --
67 -- Private Procedure which processes the payload dequeued off the
68 -- Inbound queue .
69 --
70 -- ====================================================================
71
72 -- Process_Inbound_Event (PRIVATE)
73 -- Executes the payload dequeued off the inbound queue
74 -- IN
75 -- itemtype - itemtype,itemkey,actid to uniquely identify the
76 -- itemkey - activity
77 -- actid -
78 -- message_handle - pointer to queue message
79 -- p_payload - the message payload . Lets have it as in/out parameter
80 -- so that if callback (for which it is in/out) changes something
81 -- we can have it.
82
83 procedure Process_Inbound_Event(itemtype in varchar2,
84 itemkey in varchar2,
85 actid in number,
86 message_handle in raw,
87 p_payload in out nocopy system.wf_payload_t)
88 as
89 colon number;
90 status varchar2(30);
91
92 plist varchar2(4000);
93 attrpair varchar2(4000);
94 delimiter number;
95 aname varchar2(40);
96 avalue varchar2(4000);
97 lcorrelation varchar2(80);
98
99 nvalue number; --required but not used by wf_engine.CB
100 dvalue date; --required but not used by wf_engine.CB
101
102 begin
103
104 --process the parameter list.
105 plist:= p_payload.param_list;
106
107 if plist is not null then
108 loop
109 -- if plist is null then EXIT; end if;
110 delimiter:=instr(plist,'^');
111
112 if delimiter = 0 then
113 attrpair:=plist;
114 else
115 attrpair:=substr(plist,1,delimiter-1);
116 end if;
117
118 aname := upper(substr(attrpair,1,instr(attrpair,'=')-1));
119 avalue := substr(attrpair,instr(attrpair,'=')+1);
120
121 begin
122 --Set the value for the attribute
123 wf_engine.SetItemAttrText(itemtype, itemkey,
124 aname, avalue);
125 exception when others then
126 if ( wf_core.error_name = 'WFENG_ITEM_ATTR' ) then
127 --If the attribute does not exist first create it
128 --and then add the value
129 Wf_Engine.AddItemAttr(itemtype, itemkey, aname);
130 Wf_Engine.SetItemAttrText(itemtype, itemkey, aname, avalue);
131 else
132 raise;
133 end if;
134 end;
135
136 exit when delimiter = 0;
137
138 plist := substr(plist,delimiter+1);
139
140 end loop;
141 end if;
142
143 --if payload contains a colon, then its ERROR else its COMPLETE status
144
145 colon:= instr(p_payload.result,':');
146 if colon=0 or p_payload.result is null then
147 -- check if activity is already complete
148 wf_item_activity_status.status(itemtype,itemkey,actid,status);
149 if (status is not null)
150 and (status <> 'COMPLETE') then
151 -- mark activity as Complete:<result>
152 wf_engine.CB(command => 'COMPLETE',
153 context =>itemtype||':'||
154 itemkey ||':'||
155 actid,
156 text_value => p_payload.result,
157 number_value => nvalue,
158 date_value => dvalue);
159 end if;
160 else
161 -- at the moment we only accept :ERROR:<error text> (may add other statuses later)
162 if substr(p_payload.result,colon+1,5) = 'ERROR' then
163
164 begin
165 wf_core.clear;
166 -- set the function name for courtesy.
167 wf_core.token('FUNCTION_NAME',
168 Wf_Activity.activity_function(itemtype,
169 itemkey,actid));
170 wf_core.raise('WF_EXT_FUNCTION');
171 exception when others then null;
172 end;
173 --function name on payload is upto 200 char so use it to record error
174 wf_core.error_stack := p_payload.function_name;
175
176 wf_engine.CB(command => 'ERROR',
177 context =>itemtype||':'||
178 itemkey ||':'||
179 actid,
180 text_value => p_payload.result,
181 number_value => nvalue,
182 date_value => dvalue);
183 end if;
184 end if;
185
186 --If we came successfully till here let us purge off the
187 --data from the Q
188 wf_queue.PurgeEvent(wf_queue.InboundQueue, message_handle, FALSE);
189
190 exception
191 when others then
192 Wf_Core.Context('Wf_Queue', 'Process_Inbound_Event', itemtype,itemkey);
193
194 raise;
195
196 end Process_Inbound_Event;
197
198 -- ====================================================================
199 -- Queue Setup Functions (PUBLIC)
200 -- ====================================================================
201 function DeferredQueue return varchar2
202 as
203 begin
204 if (wf_queue.name_init = FALSE) then
205 wf_queue.set_queue_names;
206 end if;
207 return (wf_queue.deferred_queue_name);
208 exception
209 when others then
210 Wf_Core.Context('Wf_Queue', 'DeferredQueue');
211 raise;
212 end;
213
214 function OutboundQueue return varchar2
215 as
216 begin
217 if (wf_queue.name_init = FALSE) then
218 wf_queue.set_queue_names;
219 end if;
220 return (wf_queue.outbound_queue_name);
221 exception
222 when others then
223 Wf_Core.Context('Wf_Queue', 'OutboundQueue');
224 raise;
225 end;
226
227 function InboundQueue return varchar2
228 as
229 begin
230 if (wf_queue.name_init = FALSE) then
231 wf_queue.set_queue_names;
232 end if;
233 return (wf_queue.inbound_queue_name);
234 exception
235 when others then
236 Wf_Core.Context('Wf_Queue', 'InboundQueue');
237 raise;
238 end;
239
240 -- NAME: Set_queue_names (PRIVATE)
241 -- called once at the beginning of a session to set up queue names
242 -- when AQ supports db synonyms, remove this and use synonyms instead
243 procedure set_queue_names as
244
245 schema_name varchar2(320);
246
247 begin
248
249 --dont bother re-executing this if already initialized.
250 if wf_queue.name_init then
251 return;
252 end if;
253
254 schema_name := wf_core.translate('WF_SCHEMA');
255
256 -- Do not determine account name by STANDALONE vs. EMBEDDED any more
257
258 -- Current_schema is the schema in effect.
259 -- Sys_context is an 8i feature. Below allows us to tag on the
260 -- intended schema whether the install is with invoker's right or
261 -- definer's right (default).
262 begin
263 select sys_context('USERENV', 'CURRENT_SCHEMA')
264 into wf_queue.account_name
265 from sys.dual;
266 exception
267 when OTHERS then
268 wf_queue.account_name := NULL;
269 end;
270
271 wf_queue.deferred_queue_name := schema_name||'.WF_DEFERRED_QUEUE_M';
272 wf_queue.outbound_queue_name := schema_name||'.WF_OUTBOUND_QUEUE';
273 wf_queue.inbound_queue_name := schema_name||'.WF_INBOUND_QUEUE';
274 wf_queue.name_init := TRUE;
275 exception
276 when others then
277 Wf_Core.Context('Wf_Queue', 'Set_queue_names');
278 raise;
279 end set_queue_names;
280
281
282 -- ====================================================================
283 -- Public routines
284 -- ====================================================================
285
286 -- NAME: PurgeEvent
287 -- removes the event from the specified queue WITHOUT PROCESSING
288 -- queuename - the queue to purge
289 -- message_handle - the specific event to purge
290 --
291 procedure PurgeEvent(queuename in varchar2,
292 message_handle in raw,
293 multiconsumer in boolean default FALSE) as
294
295 event system.wf_payload_t;
296 dequeue_options dbms_aq.dequeue_options_t;
297 message_properties dbms_aq.message_properties_t;
298 msg_id raw(16);
299 begin
300 if message_handle is not null then
301
302 dequeue_options.dequeue_mode := dbms_aq.REMOVE;
303 dequeue_options.msgid := message_handle;
304 dequeue_options.wait := dbms_aq.NO_WAIT;
305 dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
306
307 -- check if we need to have a consumer
308 if (multiconsumer) then
309 dequeue_options.consumer_name := wf_queue.account_name;
310 end if;
311
312 dbms_aq.dequeue
313 (
314 queue_name => queuename,
315 dequeue_options => dequeue_options,
316 message_properties => message_properties,
317 payload => event,
318 msgid => msg_id
319 );
320
321 end if;
322
323 exception
324 when dequeue_timeout then
325 null; -- not found on queue so must already be removed.
326
327 when msgid_notfound then
328 null; -- Already purged from the queue.
329
330 when others then
331 Wf_Core.Context('Wf_Queue', 'PurgeEvent', queuename,
332 rawtohex(message_handle));
333 raise;
334
335 end PurgeEvent;
336
337 -- NAME: PurgeItemtype
338 -- removes all events belonging to an itemtype from the specified queue
339 -- ** WARNING ** IT DOES NOT PROCESS THE EVENT
340 -- queuename - the queue to purge
341 -- itemtype - the itemtype to purge
342 --
343 procedure PurgeItemtype(queuename in varchar2,
344 itemtype in varchar2 default null,
345 correlation in varchar2 default null )
346 as
347 event system.wf_payload_t;
348 dequeue_options dbms_aq.dequeue_options_t;
349 message_properties dbms_aq.message_properties_t;
350 msg_id raw(16);
351
352 begin
353 dequeue_options.dequeue_mode := dbms_aq.REMOVE;
354 dequeue_options.wait := dbms_aq.NO_WAIT;
355 dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
356 wf_queue.set_queue_names;
357
358 if correlation is not null then
359 dequeue_options.correlation := correlation;
360 else
361 dequeue_options.correlation := wf_queue.account_name||nvl(itemtype,'%');
362 end if;
363
364 LOOP
365 dbms_aq.dequeue
366 (
367 queue_name => queuename,
368 dequeue_options => dequeue_options,
369 message_properties => message_properties,
370 payload => event,
371 msgid => msg_id
372 );
373
374 END LOOP;
375
376 exception
377 when dequeue_timeout then
378 null; -- nothing left on queue to remove
379 when others then
380 Wf_Core.Context('Wf_Queue', 'PurgeItemtype', queuename, itemtype);
381 raise;
382 end PurgeItemtype;
383
384 -- ProcessDeferredEvent (PRIVATE)
385 -- Executes the event payload dequeued off the deferred queue
386 -- IN
387 -- itemtype - itemtype,itemkey,actid to uniquely identify the
388 -- itemkey - activity
389 -- actid -
390 -- message_handle - pointer to queue message
391 -- minthreshold - threshold levels of the background engine
392 -- maxthreshold
393 --
394 procedure ProcessDeferredEvent(itemtype in varchar2,
395 itemkey in varchar2,
396 actid in number,
397 message_handle in raw,
398 minthreshold in number,
399 maxthreshold in number)
400 as
401 begin
402 Wf_Item_Activity_Status.Create_Status(itemtype, itemkey, actid,
403 wf_engine.eng_active, null, null, null);
404
405 -- Continue processing on activity
406 begin
407
408 begin
409 begin
410
411 savepoint wf_savepoint;
412
413 Wf_Engine_Util.Process_Activity(itemtype, itemkey, actid,
414 maxthreshold, TRUE);
415
416 -- we successfully processed the activity so dequeue it.
417 wf_queue.PurgeEvent(wf_queue.DeferredQueue, message_handle, TRUE);
418
419
420 Exception
421 when others then
422 -- In the unlikely event this process thread raises an exception:
423 -- 1. rollback any work in this process thread
424 -- raise an error for the next excption handler to complete
425 -- remaining steps.
426
427 rollback to wf_savepoint;
428 raise;
429 end;
430 exception
431 when NO_SAVEPOINT then
432 -- Catch any savepoint error in case of a commit happened.
433 Wf_Core.Token('ACTIVITY', Wf_Engine.GetActivityLabel(actid));
434 Wf_Core.Raise('WFENG_COMMIT_IN_PROCESS');
435 end;
436 exception
437 when OTHERS then
438 -- Remaining steps for proces thread raises an exception:
439 -- 2. set this activity to error status
440 -- 3. execute the error process (if any)
441 -- 4. clear the error to continue with next activity
442 -- **note the error stack will refer to the actid that has been
443 -- rolled back!
444 Wf_Core.Context('Wf_Queue', 'ProcessDeferredEvent', itemtype,
445 to_char(minthreshold), to_char(maxthreshold));
446 Wf_Item_Activity_Status.Set_Error(itemtype, itemkey, actid,
447 wf_engine.eng_exception, FALSE);
448 Wf_Engine_Util.Execute_Error_Process(itemtype, itemkey,
449 actid, wf_engine.eng_exception);
450 Wf_Core.Clear;
451 end;
452
453 -- Commit work to insure this activity thread doesn't interfere
454 -- with others.
455 commit;
456
457 Fnd_Concurrent.Set_Preferred_RBS;
458
459 exception
460 when others then
461 Wf_Core.Context('Wf_Queue', 'ProcessDeferredEvent', itemtype,
462 to_char(minthreshold), to_char(maxthreshold));
463 raise;
464 end ProcessDeferredEvent;
465
466
467 --Name: EnqueueInbound (PUBLIC)
468 --Enqueues the result from an outbound event onto
469 --the inbound queue. Wf will mark this as complete with the
470 --given result when it processes the queue.
471
472 procedure EnqueueInbound(
473 itemtype in varchar2,
474 itemkey in varchar2,
475 actid in number,
476 result in varchar2 default null,
477 attrlist in varchar2 default null,
478 correlation in varchar2 default null,
479 error_stack in varchar2 default null)
480
481 as
482 handle raw(16);
483 lcorrelation varchar2(80);
484 lresult varchar2(30);
485 begin
486
487
488 if correlation is not null then
489 lcorrelation := correlation;
490 else
491 wf_queue.set_queue_names;
492 lcorrelation := wf_queue.account_name||itemtype;
493 end if;
494
495 -- if error stack is defined then set result to ERROR.
496 if error_stack is null then
497 lresult := result;
498 else
499 lresult := ':ERROR';
500 end if;
501
502
503 wf_queue.Enqueue_Event(queuename =>wf_queue.InboundQueue,
504 itemtype =>enqueueInbound.itemtype,
505 itemkey =>enqueueInbound.itemkey,
506 actid =>enqueueInbound.actid,
507 funcname =>enqueueInbound.error_stack,
508 correlation =>lcorrelation,
509 paramlist =>enqueueInbound.attrlist,
510 result =>lresult,
511 message_handle =>handle);
512 exception
513 when others then
514 Wf_Core.Context('Wf_Queue', 'EnqueueInbound', itemtype,
515 itemkey, actid);
516 raise;
517 end EnqueueInbound;
518
519
520 function Get_param_list (itemtype in varchar2,
521 itemkey in varchar2,
522 actid in number) return varchar2
523
524 as
525
526 startdate date;
527 paramlist varchar2(4000);
528 lvalue varchar2(4000);
529
530 cursor attr_list is
531 select aa.name,
532 aa.value_type, -- CONSTANT or ITEMATTR
533 aa.type, -- NUMBER/TEXT/DATE etc
534 aa.format,
535 av.TEXT_VALUE,
536 av.NUMBER_VALUE,
537 av.DATE_VALUE
538 from wf_activity_attr_values av,
539 wf_activity_attributes aa,
540 wf_activities a,
541 wf_process_activities pa
542 where pa.activity_item_type = a.item_type
543 and pa.activity_name = a.name
544 and pa.instance_id=actid
545 and a.begin_date< startdate and nvl(a.end_date,startdate) >= startdate
546 and a.item_type = aa.activity_item_type
547 and a.name = aa.activity_name
548 and a.version = aa.activity_version
549 and av.process_activity_id = actid
550 and av.name=aa.name
551 order by aa.sequence;
552
553 begin
554 paramlist:=null;
555 startdate:=wf_item.active_date(itemtype,itemkey);
556
557 for attr_row in attr_list loop
558 if (attr_row.value_type = 'ITEMATTR' and
559 attr_row.text_value is not null) then
560 -- null itemattr text_value means null value, not an error
561 lvalue := wf_engine.GetItemAttrText(itemtype,itemkey,
562 attr_row.text_value);
563 else --must be CONSTANT
564 if (attr_row.type = 'NUMBER') then
565 if (attr_row.format is null) then
566 lvalue := to_char(attr_row.NUMBER_VALUE);
567 else
568 lvalue := to_char(attr_row.NUMBER_VALUE, attr_row.format);
569 end if;
570 elsif (attr_row.type = 'DATE') then
571 if (attr_row.format is null) then
572 lvalue := to_char(attr_row.DATE_VALUE);
573 else
574 lvalue := to_char(attr_row.DATE_VALUE, attr_row.format);
575 end if;
576 else
577 lvalue := attr_row.text_value;
578 end if;
579 end if;
580
581 if paramlist is not null then
582 -- Overflow, cannot hold anymore attributes.
583 if (lengthb(paramlist||'^') > 4000) then
584 exit;
585 else
586 paramlist := paramlist||'^';
587 end if;
588 end if;
589
590 if (lengthb(paramlist||attr_row.name||'='||lvalue) > 4000) then
591 -- Overflow, cannot hold anymore attributes.
592 paramlist:=substrb(paramlist||attr_row.name||'='||lvalue, 1, 4000);
593 exit;
594 else
595 paramlist := paramlist||attr_row.name||'='||lvalue;
596 end if;
597 end loop;
598
599 return(paramlist);
600
601 exception
602 when others then
603 Wf_Core.Context('Wf_Queue', 'Get_param_list', itemtype,
604 itemkey, actid);
605 raise;
606 end Get_param_list;
607
608
609
610 --Name: DequeueOutbound (PUBLIC)
611
612 procedure DequeueOutbound(
613 dequeuemode in number,
614 navigation in number default 1,
615 correlation in varchar2 default null,
616 itemtype in varchar2 default null,
617 payload out NOCOPY system.wf_payload_t,
618 message_handle in out NOCOPY raw,
619 timeout out NOCOPY boolean)
620
621 as
622 lcorrelation varchar2(80);
623 begin
624 wf_queue.set_queue_names;
625
626 if correlation is not null then
627 lcorrelation := correlation;
628 else
629 lcorrelation := wf_queue.account_name||nvl(itemtype,'%');
630 end if;
631
632 wf_queue.Dequeue_Event(queuename =>wf_queue.OutboundQueue,
633 dequeuemode =>DequeueOutbound.dequeuemode,
634 navigation =>DequeueOutbound.navigation,
635 correlation =>lcorrelation,
636 payload =>DequeueOutbound.payload,
637 message_handle =>DequeueOutbound.message_handle,
638 timeout =>DequeueOutbound.timeout);
639
640
641
642 exception
643 when others then
644 Wf_Core.Context('Wf_Queue', 'DequeueOutbound', payload.itemtype,
645 payload.itemkey, payload.actid);
646 raise;
647
648 end DequeueOutbound;
649
650 --Name: DequeueEventDetail (PUBLIC)
651 --
652 --Wrapper to Dequeue_Event in which the payload is EXPanded out to avoid
653 --use of type itemtypes.
654
655 procedure DequeueEventDetail(
656 dequeuemode in number,
657 navigation in number default 1,
658 correlation in varchar2 default null,
659 itemtype in out NOCOPY varchar2,
660 itemkey out NOCOPY varchar2,
661 actid out NOCOPY number,
662 function_name out NOCOPY varchar2,
663 param_list out NOCOPY varchar2,
664 message_handle in out NOCOPY raw,
665 timeout out NOCOPY boolean)
666 as
667 event system.wf_payload_t;
668 lcorrelation varchar2(80);
669 begin
670 wf_queue.set_queue_names;
671
672 --use the correlation or default it if null
673 if DequeueEventDetail.correlation is not null then
674 lcorrelation := DequeueEventDetail.correlation;
675 else
676 lcorrelation := wf_queue.account_name||nvl(itemtype,'%');
677 end if;
678
679 -- call dequeue to retrieve the event
680 wf_queue.Dequeue_Event(queuename=>wf_queue.OutboundQueue,
681 dequeuemode=>DequeueEventDetail.dequeuemode,
682 navigation =>DequeueEventDetail.navigation,
683 correlation=>lcorrelation,
684 payload=>event,
685 message_handle=>DequeueEventDetail.message_handle,
686 timeout =>DequeueEventDetail.timeout);
687
688 --expand the payload structure
689 DequeueEventDetail.itemtype:=event.itemtype;
690 DequeueEventDetail.itemkey:=event.itemkey;
691 DequeueEventDetail.actid:=event.actid;
692 DequeueEventDetail.function_name:=event.function_name;
693 DequeueEventDetail.param_list:=event.param_list;
694
695
696 exception
697 when others then
698 Wf_Core.Context('Wf_Queue', 'DequeueEventDetail', itemtype||':'||itemkey,to_char(actid));
699 raise;
700
701 end DequeueEventDetail;
702
703
704 --Dequeue_Event (PRIVATE)
705 --
706 --Dequeues an event (message) from any queue
707 --IN
708 -- QueueName - the queue name, may contain owner or database
709 -- DeQueueMode - either 1 (Browse), 2 (Locked) or 3 (Remove)
710 -- Navigation - either First or Next
711 -- Correlation - helps restrict the queue
712 -- Payload - the event actually dequeued
713 -- message_handle - id for the event
714 -- timeout - determines if anything was found or if the q timedout.
715
716 procedure Dequeue_Event(queuename in varchar2,
717 dequeuemode in number,
718 navigation in number default 1,
719 correlation in varchar2 default null,
720 payload out NOCOPY system.wf_payload_t,
721 message_handle in out NOCOPY raw,
722 timeout out NOCOPY boolean,
723 multiconsumer in boolean default FALSE)
724 as
725
726 dequeue_options dbms_aq.dequeue_options_t;
727 message_properties dbms_aq.message_properties_t;
728 snap_too_old exception;
729 pragma exception_init(snap_too_old, -1555);
730 begin
731
732 -- find out the schema name
733 wf_queue.set_queue_names;
734
735 dequeue_options.dequeue_mode := dequeuemode;
736 dequeue_options.wait := dbms_aq.NO_WAIT;
737 dequeue_options.navigation := navigation;
738
739 -- if message_handle is set then use it instead of correlation
740 -- NOTE: if message_handle is set FIRST/NEXT_MESSAGE dont have effect
741
742 if message_handle is not null then
743 dequeue_options.msgid := message_handle;
744 dequeue_options.correlation := null;
745 message_handle := null;
746 else
747 -- set correlation to item_type or % if its null
748 if correlation is null then
749 dequeue_options.correlation := '%';
750 else
751 dequeue_options.correlation := correlation;
752 end if;
753
754 end if;
755
756 -- check if we need to have a consumer
757 if (multiconsumer) then
758 dequeue_options.consumer_name := wf_queue.account_name;
759 end if;
760
761 begin
762 dbms_aq.dequeue( queue_name => Dequeue_Event.queuename,
763 dequeue_options => dequeue_options,
764 message_properties => message_properties,
765 payload => Dequeue_Event.payload,
766 msgid => message_handle );
767
768 exception
769 when snap_too_old then
770 --Workaround for AQ when receiving ORA-01555 using NEXT_MESSAGE as
771 --navigation. We will try to set to FIRST_MESSAGE and dequeue to
772 --silently handle this exception.
773 if (dequeue_options.navigation = dbms_aq.FIRST_MESSAGE) then
774 raise;
775 else
776 dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
777 dbms_aq.dequeue( queue_name => Dequeue_Event.queuename,
778 dequeue_options => dequeue_options,
779 message_properties => message_properties,
780 payload => Dequeue_Event.payload,
781 msgid => message_handle );
782 end if;
783
784 when OTHERS then
785 raise;
786
787 end;
788
789 timeout:= FALSE;
790
791 exception
792 when dequeue_timeout then
793 timeout := TRUE;
794 when others then
795 if correlation is null then
796 Wf_Core.Context('WF_QUEUE', 'Dequeue_Event', queuename, '%');
797 else
798 Wf_Core.Context('WF_QUEUE', 'Dequeue_Event', queuename, correlation);
799 end if;
800 timeout := FALSE;
801 raise;
802
803 end Dequeue_Event;
804
805 -- Activity_Valid (PRIVATE)
806 -- checks the deferred activity is valid for processing
807 --
808 -- IN
809 -- event - the event to check
810 -- message_handle of event in the deferred queue
811 -- maxthreshold - the threshold level
812 -- minthreshold
813 --
814 function activity_valid (event in system.wf_payload_t,
815 message_handle in raw,
816 maxthreshold in number default null,
817 minthreshold in number default null)
818 return BOOLEAN is
819 cost pls_integer;
820 litemtype varchar2(8);
821 l_begdate date; -- <dlam:3070112>
822
823 resource_busy exception;
824 pragma exception_init(resource_busy, -00054);
825
826 begin
827
828
829 -- Activity must be valid if
830 -- 1) in given cost range
831 -- 2) parent is not suspended
832 -- note: suspendprocess/resumeprocess will remove/add deferred jobs
833
834
835 -- <dlam:3070112> check begin date as well
836 -- move the BEGIN_DATE, SYSDATE comparion to a separate clause
837 SELECT CWA.COST, CWIAS.BEGIN_DATE
838 into cost, l_begdate
839 FROM WF_ITEM_ACTIVITY_STATUSES CWIAS,
840 WF_PROCESS_ACTIVITIES CWPA,
841 WF_ITEMS WI,
842 WF_ACTIVITIES CWA
843 where CWIAS.ACTIVITY_STATUS = 'DEFERRED'
844 and CWIAS.PROCESS_ACTIVITY = CWPA.INSTANCE_ID
845 and CWPA.ACTIVITY_ITEM_TYPE = CWA.ITEM_TYPE
846 and CWPA.ACTIVITY_NAME = CWA.NAME
847 and CWIAS.ITEM_TYPE = WI.ITEM_TYPE
848 and CWIAS.ITEM_KEY = WI.ITEM_KEY
849 and WI.BEGIN_DATE >= CWA.BEGIN_DATE
850 and WI.BEGIN_DATE < nvl(CWA.END_DATE, WI.BEGIN_DATE+1)
851 and CWIAS.ITEM_TYPE = event.itemtype
852 and CWIAS.ITEM_KEY = event.itemkey
853 and CWIAS.PROCESS_ACTIVITY = event.actid;
854
855 -- dont bother locking: the original msg has been locked on the queue
856 -- for update of CWIAS.ACTIVITY_STATUS NOWAIT;
857
858 -- dont bother checking if parent is suspended.
859 -- the suspend process should remove any jobs from the queue,
860 -- but if any get through, process_activity will manage it.
861
862 -- <dlam:3070112>
863 -- begin date has not reached yet, leave the message alone.
864 -- this is to work around a problem where the aq delay seems to
865 -- to be shorter than expected
866 if (l_begdate > sysdate) then
867 return(FALSE);
868 end if;
869
870 if cost < nvl(minthreshold,cost) or cost > nvl(maxthreshold,cost) then
871 return(FALSE);
872 else
873 return(TRUE);
874 end if;
875
876 exception
877 when no_data_found then
878 -- this event is no longer valid so remove it from the queue
879 -- happens when a rewind moved activity to history table
880 -- or the activity status is no longer defered
881 wf_queue.PurgeEvent(wf_queue.DeferredQueue,message_handle,TRUE);
882 return(FALSE);
883 when resource_busy then
884 return(FALSE);
885 when others then
886 Wf_Core.Context('Wf_Queue', 'Activity_valid', 'Invalid',
887 event.itemtype||':'||event.itemkey, to_char(event.actid));
888 return(FALSE);
889 end activity_valid;
890
891 --
892 -- ====================================================================
893 --
894 -- Enqueue_Event (PRIVATE)
895 -- Enqueues a message onto any WF queue (because all queues have same payload)
896 --
897
898 procedure Enqueue_Event(queuename in varchar2,
899 itemtype in varchar2,
900 itemkey in varchar2,
901 actid in number,
902 correlation in varchar2 default null,
903 delay in number default 0,
904 funcname in varchar2 default null,
905 paramlist in varchar2 default null,
906 result in varchar2 default null,
907 message_handle in out NOCOPY raw,
908 priority in number default null)
909
910 as
911 event system.wf_payload_t;
912 enqueue_options dbms_aq.enqueue_options_t;
913 message_properties dbms_aq.message_properties_t;
914 l_increment_delay number;
915 l_min_delay number;
916 l_background_occurrence number;
917
918 begin
919
920 l_increment_delay := delay;
921
922 -- Bug 4005674
923 -- Check the occurrence of item_type:item_key:actid. If this is the same
924 -- activity which we just dequeued, calculate the number of occurrence
925 -- from history table since the background engine started.
926 if (wf_queue.g_dequeue_item_type = enqueue_event.itemtype and
927 wf_queue.g_dequeue_item_key = enqueue_event.itemkey and
928 wf_queue.g_dequeue_actid = enqueue_event.actid) then
929
930 g_Key := WF_CACHE.HashKey(enqueue_event.itemtype||':'||
931 enqueue_event.itemkey||':'||enqueue_event.actid);
932
933 -- If hashkey does not exist or the itemtype:itemkey:actid do not match,
934 -- get the history count from base table, else increment the l_background_occurrence
935 if (not g_ActivityHistoryCount.EXISTS(g_Key) or
936 (g_ActivityHistoryCount(g_Key).ITEM_TYPE <> enqueue_event.itemtype) or
937 (g_ActivityHistoryCount(g_Key).ITEM_KEY <> enqueue_event.itemkey) or
938 (g_ActivityHistoryCount(g_Key).ACTID <> enqueue_event.actid)) then
939
940 select count(process_activity)
941 into l_background_occurrence
942 from wf_item_activity_statuses_h
943 where item_type = enqueue_event.itemtype
944 and item_key = enqueue_event.itemkey
945 and process_activity = enqueue_event.actid
946 and begin_date >= g_background_begin_date;
947 else
948 l_background_occurrence := g_ActivityHistoryCount(g_Key).HISTORY_COUNT + 1;
949 end if;
950
951 -- Record the itemtype:itemkey:actid:history_count in hash table
952 g_ActivityHistoryCount(g_Key).ITEM_TYPE := enqueue_event.itemtype;
953 g_ActivityHistoryCount(g_Key).ITEM_KEY := enqueue_event.itemkey;
954 g_ActivityHistoryCount(g_Key).ACTID := enqueue_event.actid;
955 g_ActivityHistoryCount(g_Key).HISTORY_COUNT := l_background_occurrence;
956
957 -- Bug 4005674
958 -- For every 100 occurrences, add 5 mins to the delay up to a max of 60 mins
959 l_min_delay := floor(l_background_occurrence/wf_queue.g_defer_occurrence)
960 * wf_queue.g_add_delay_seconds;
961
962 if (l_min_delay < wf_queue.g_max_delay_seconds) then
963 l_increment_delay := l_increment_delay + l_min_delay;
964 elsif (l_min_delay >= wf_queue.g_max_delay_seconds) then
965 l_increment_delay := l_increment_delay + wf_queue.g_max_delay_seconds;
966 end if;
967
968 begin
969 -- Add a run-time item attribute of #DELAY_ACTID_<actid> to track the
970 -- continuous loop
971 Wf_Engine.SetItemAttrNumber(itemtype=>enqueue_event.itemtype,
972 itemkey=>enqueue_event.itemkey,
973 aname=>'#DELAY_ACTID_'||enqueue_event.actid,
974 avalue=>l_increment_delay);
975 exception
976 when others then
977 if (wf_core.error_name = 'WFENG_ITEM_ATTR') then
978 Wf_Core.Clear;
979 Wf_Engine.AddItemAttr(itemtype=>enqueue_event.itemtype,
980 itemkey=>enqueue_event.itemkey,
981 aname=>'#DELAY_ACTID_'||enqueue_event.actid,
982 number_value=>l_increment_delay);
983 else
984 raise;
985 end if;
986 end;
987 end if;
988
989 wf_queue.set_queue_names;
990 -- construct the event object
991 event:=system.wf_payload_t(itemtype,itemkey,actid,funcname,paramlist,result);
992
993 -- dont make the data visible on the queue until a commit is issued
994 -- this way queue data and normal table data (wf statuses) are in synch.
995 enqueue_options.visibility := DBMS_AQ.ON_COMMIT;
996
997 -- Set the delay if any
998 if l_increment_delay < 0 then
999 message_properties.delay := 0;
1000 else
1001 -- message_properties.delay is BINARY_INTEGER, so check if delay is
1002 -- too big, and set the max delay to be (2**31)-1.
1003 if (l_increment_delay >= power(2,31)) then
1004 message_properties.delay := power(2,31)-1;
1005 else
1006 message_properties.delay := l_increment_delay;
1007 end if;
1008
1009 end if;
1010
1011 if correlation is not null then
1012 message_properties.correlation := enqueue_event.correlation;
1013 else
1014 message_properties.correlation := wf_queue.account_name||itemtype;
1015 end if;
1016
1017 -- check the correlation is always set to something
1018 -- else it wil never be dequeued because we always default the dequeue
1019 -- corellation to '%'
1020 if message_properties.correlation is null then
1021 -- this shouldnt happen.
1022 message_properties.correlation := '%';
1023 end if;
1024
1025 -- Set the priority so that we can dequeue by priority
1026 if priority is not null then
1027 message_properties.priority := priority;
1028 end if;
1029
1030 dbms_aq.enqueue
1031 (
1032 queue_name => Enqueue_Event.queuename,
1033 enqueue_options => enqueue_options,
1034 message_properties => message_properties,
1035 payload => event,
1036 msgid => message_handle
1037 );
1038
1039
1040 exception
1041 when others then
1042 Wf_Core.Context('Wf_Queue', 'Enqueue_event', itemtype,
1043 itemkey, to_char(actid), to_char(delay));
1044 raise;
1045
1046 end;
1047
1048
1049 -- ProcessInboundQueue (PUBLIC)
1050 -- reads everythig off the Inbound queue and records it as complete
1051 -- with the given result and updates item attributes as specified in
1052 -- the paramlist
1053
1054
1055 procedure ProcessInboundQueue (itemtype in varchar2 default null,
1056 correlation in varchar2 default null)
1057 as
1058
1059 payload system.wf_payload_t;
1060 navigation varchar2(10);
1061 timeout boolean:= FALSE;
1062 cursor_name number;
1063 row_processed integer;
1064 message_handle raw(16);
1065 -- first_time boolean := TRUE;
1066 plist varchar2(4000);
1067 lcorrelation varchar2(80);
1068 nothing_processed boolean := TRUE;
1069
1070 begin
1071 commit;
1072
1073 Fnd_Concurrent.Set_Preferred_RBS;
1074
1075 wf_queue.set_queue_names;
1076
1077 if correlation is not null then
1078 lcorrelation := correlation;
1079 else
1080 lcorrelation := wf_queue.account_name||nvl(itemtype,'%');
1081 end if;
1082
1083 -- loop through the inbound queue.
1084 loop --Process until nothing left on the queue
1085
1086 navigation := dbms_aq.FIRST_MESSAGE;
1087 nothing_processed :=TRUE;
1088
1089 loop -- Process till timeout
1090
1091 message_handle:=null;
1092 --Lets set a savepoint here
1093 --We would use this savepoint to rollback if we found that a
1094 --lock is not possible in this session for the reterived itemytype key
1095
1096 wf_queue.Dequeue_Event(wf_queue.InboundQueue,
1097 dbms_aq.LOCKED,
1098 navigation,
1099 lcorrelation,
1100 payload,
1101 message_handle,
1102 timeout);
1103
1104 -- if no message is found, the message may be enqueued with the
1105 -- old correlation format, so reset the correlation id and retry.
1106 if (navigation = dbms_aq.FIRST_MESSAGE and message_handle is null
1107 and correlation is null and lcorrelation <> nvl(itemtype,'%')) then
1108
1109 lcorrelation := nvl(itemtype,'%');
1110 goto nextmesg;
1111 end if;
1112
1113 --else check timeout
1114 if (timeout) then
1115 EXIT;
1116 end if;
1117
1118 --Bug 2607770
1119 --Ensure that we have got a message
1120 --Now try to acquire the lock
1121 --Check the parameterlist null/not within Process_Inbound_Event
1122
1123 if wf_item.acquire_lock(payload.itemtype, payload.itemkey) then
1124 --Process the payload
1125 wf_queue.Process_Inbound_Event(itemtype=>payload.itemtype,
1126 itemkey=>payload.itemkey,
1127 actid=>payload.actid,
1128 message_handle=>ProcessInboundQueue.message_handle,
1129 p_payload => payload);
1130
1131 nothing_processed:=FALSE;
1132
1133 end if;
1134
1135 -- commit any processing or any clean up
1136 commit;
1137 Fnd_Concurrent.Set_Preferred_RBS;
1138
1139 navigation := dbms_aq.NEXT_MESSAGE;
1140
1141 <<nextmesg>> -- This is for the case when we reset the corrid and verify
1142 null;
1143 end loop; -- process till timeout
1144
1145 exit when nothing_processed;
1146 end loop;
1147 exception
1148 when others then
1149 Wf_Core.Context('Wf_Queue', 'ProcessInboundQueue');
1150 raise;
1151 end ProcessInboundQueue;
1152
1153 procedure ProcessDeferredQueue (itemtype in varchar2 default null,
1154 minthreshold in number default null,
1155 maxthreshold in number default null,
1156 correlation in varchar2 default null)
1157
1158 as
1159 payload system.wf_payload_t;
1160 timeout boolean:= FALSE;
1161 navigation varchar2(10);
1162 row_processed integer;
1163 message_handle raw(16);
1164 -- first_time boolean := TRUE;
1165 nothing_processed boolean:=TRUE;
1166 lcorrelation varchar2(80);
1167
1168 begin
1169
1170 -- Bug 4005674
1171 -- Record the sysdate when background engine started.
1172 g_background_begin_date := sysdate;
1173
1174 wf_queue.set_queue_names;
1175
1176 if correlation is not null then
1177 lcorrelation := correlation;
1178
1179 -- for standalone, we first try the old correlation id format.
1180 elsif (wf_core.translate('WF_INSTALL') = 'STANDALONE'
1181 and itemtype is not null) then
1182 lcorrelation := itemtype;
1183
1184 -- for embedded, there was never the old format, so we are fine.
1185 -- or it is standalone with null item type, we cannot support the
1186 -- old correlation id format; otherwise, it will pick up everything.
1187 else
1188 lcorrelation := wf_queue.account_name||nvl(itemtype,'%');
1189 end if;
1190
1191 loop -- keep processing the queue until there is nothing left
1192
1193 navigation := dbms_aq.FIRST_MESSAGE;
1194 nothing_processed :=TRUE;
1195
1196 loop -- keep processing until a timeout.
1197
1198 message_handle:=null;
1199 wf_queue.Dequeue_Event(
1200 wf_queue.DeferredQueue,
1201 dbms_aq.LOCKED,
1202 navigation,
1203 lcorrelation,
1204 payload,
1205 message_handle,
1206 timeout,
1207 TRUE);
1208
1209 -- Bug 4005674
1210 -- Record the item_type:item_key:actid at dequeue time
1211 wf_queue.g_dequeue_item_type := payload.itemtype;
1212 wf_queue.g_dequeue_item_key := payload.itemkey;
1213 wf_queue.g_dequeue_actid := payload.actid;
1214
1215 -- if no message is found, the message may be enqueued with the
1216 -- new correlation format, so reset the correlation id and retry.
1217 if (navigation = dbms_aq.FIRST_MESSAGE and message_handle is null
1218 and correlation is null and lcorrelation = itemtype) then
1219
1220 lcorrelation := wf_queue.account_name||nvl(itemtype,'%');
1221
1222 -- otherwise, process the message
1223 else
1224 if (timeout) then
1225 EXIT;
1226 end if;
1227
1228 --
1229 -- Execute the PL/SQL call stored in the payload if this is valid
1230 --
1231 if activity_valid (payload,
1232 message_handle,
1233 maxthreshold,
1234 minthreshold )
1235 AND
1236 wf_item.acquire_lock(payload.itemtype,payload.itemkey) then
1237
1238 wf_queue.ProcessDeferredEvent(itemtype=>payload.itemtype,
1239 itemkey=>payload.itemkey,
1240 actid=>payload.actid,
1241 message_handle=>ProcessDeferredQueue.message_handle,
1242 minthreshold=>ProcessDeferredQueue.minthreshold,
1243 maxthreshold=>ProcessDeferredQueue.maxthreshold);
1244
1245 nothing_processed:=FALSE;
1246
1247 end if;
1248
1249 -- commit any processing or any clean up from activity_valid
1250 commit;
1251 Fnd_Concurrent.Set_Preferred_RBS;
1252
1253 --
1254 -- Test for Instance Shutdown
1255 --
1256 if wf_queue.check_instance then
1257 raise shutdown_pending;
1258 end if;
1259
1260 navigation := dbms_aq.NEXT_MESSAGE;
1261
1262 end if;
1263 end loop; -- process till time out
1264
1265 exit when nothing_processed;
1266
1267 end loop;
1268
1269 exception
1270 when dequeue_disabled then
1271 Wf_Core.Context('Wf_Queue', 'ProcessDeferredQueue', 'Queue shutdown');
1272 raise;
1273 when shutdown_pending then
1274 Wf_Core.Context('Wf_Queue', 'ProcessDeferredQueue', 'DB shutting down');
1275 raise;
1276 when others then
1277 Wf_Core.Context('Wf_Queue', 'ProcessDeferredQueue');
1278 raise;
1279 end ProcessDeferredQueue;
1280
1281
1282 --============================================================
1283 -- Support utilities. not sure if we want to release these
1284 --============================================================
1285 -- GetMessageHandle
1286 -- does a sequential search through the queue for the message handle
1287
1288 function GetMessageHandle(queuename in varchar2,
1289 itemtype in varchar2,
1290 itemkey in varchar2,
1291 actid in number,
1292 correlation in varchar2 default null,
1293 multiconsumer in boolean default FALSE) return raw
1294 is
1295 event system.wf_payload_t;
1296 dequeue_options dbms_aq.dequeue_options_t;
1297 message_properties dbms_aq.message_properties_t;
1298 msg_id raw(16);
1299 begin
1300 dequeue_options.dequeue_mode := dbms_aq.BROWSE;
1301 dequeue_options.wait := dbms_aq.NO_WAIT;
1302 wf_queue.set_queue_names;
1303 if correlation is not null then
1304 dequeue_options.correlation := correlation;
1305 else
1306 dequeue_options.correlation := wf_queue.account_name||nvl(itemtype,'%');
1307 end if;
1308
1309 if (multiconsumer) then
1310 dequeue_options.consumer_name := wf_queue.account_name;
1311 end if;
1312
1313 --execute first read
1314 dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
1315 dbms_aq.dequeue
1316 (
1317 queue_name => queuename,
1318 dequeue_options => dequeue_options,
1319 message_properties => message_properties,
1320 payload => event,
1321 msgid => msg_id
1322 );
1323
1324 if event.itemtype = itemtype
1325 and event.itemkey = itemkey
1326 and event.actid = nvl(actid,event.actid) then
1327 return (msg_id);
1328 end if;
1329
1330 -- loop with next message
1331 LOOP
1332 dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
1333 dbms_aq.dequeue
1334 (
1335 queue_name => queuename,
1336 dequeue_options => dequeue_options,
1337 message_properties => message_properties,
1338 payload => event,
1339 msgid => msg_id
1340 );
1341
1342 if event.itemtype = itemtype
1343 and event.itemkey = itemkey
1344 and event.actid = actid then
1345 return (msg_id);
1346 end if;
1347
1348 END LOOP;
1349
1350 return(null);
1351
1352 exception -- timeout will fall to here
1353 when others then
1354 return(null);
1355 end GetMessageHandle;
1356 --=============================================================
1357 -- PUBLIC API to dequeue from exception queue to wf_error
1358 -- queue
1359 --=============================================================
1360 procedure DequeueException (queuename in varchar2)
1361 is
1362
1363 l_event wf_event_t;
1364 x_dequeue_options dbms_aq.dequeue_options_t;
1365 x_message_properties dbms_aq.message_properties_t;
1366 x_msgid RAW(16);
1367 erragt wf_agent_t;
1368 lsysname varchar2(30);
1369 cmd varchar2(1000);
1370 no_messages exception;
1371 pragma exception_init (no_messages, -25228);
1372
1373 begin
1374
1375 -- To Dequeue from Exception Queue, consumer name must be null
1376 x_dequeue_options.consumer_name := null;
1377 x_dequeue_options.wait := 1;
1378
1379 loop
1380 begin
1381 dbms_aq.dequeue(queue_name => queuename,
1382 dequeue_options => x_dequeue_options,
1383 message_properties => x_message_properties, /* OUT */
1384 payload => l_event, /* OUT */
1385 msgid => x_msgid); /* OUT */
1386
1387 /*
1388 ** Update the event to let everyone know it expired
1389 */
1390 l_event.SetErrorMessage(wf_core.translate('WFE_MESSAGE_EXPIRED'));
1391 l_event.addParameterToList('ERROR_NAME',
1392 wf_core.translate('WFE_MESSAGE_EXPIRED') );
1393 l_event.addParameterToList('ERROR_TYPE', 'ERROR');
1394
1395 /*
1396 ** As we can't use the private API SaveErrorToQueue
1397 ** we copy a little bit of code to do it
1398 */
1399 select name into lsysname
1400 from wf_systems
1401 where guid = hextoraw(wf_core.translate('WF_SYSTEM_GUID'));
1402
1403 erragt := wf_agent_t('WF_ERROR', lsysname);
1404 cmd := 'begin WF_ERROR_QH.enqueue(:v1, :v2); end;';
1405 execute immediate cmd using in l_event,
1406 in erragt;
1407
1408 commit;
1409
1410 exception
1411 when no_messages then
1412 if (wf_log_pkg.level_event >= fnd_log.g_current_runtime_level) then
1413 wf_log_pkg.string(wf_log_pkg.level_event,
1414 'wf.plsql.WF_QUEUE.DequeueException.queue_empty',
1415 'No more messages in ExceptionDequeue.');
1416 end if;
1417 exit;
1418 end;
1419 end loop;
1420 exception
1421 when others then
1422 Wf_Core.Context('Wf_Queue', 'DequeueException',queuename);
1423 raise;
1424 end DequeueException;
1425 --=============================================================
1426 -- Declare all developer APIs for Inbound queue manipulation
1427 --
1428 --=============================================================
1429
1430 --
1431 -- ClearMsgStack
1432 -- Clears runtime cache
1433 procedure ClearMsgStack
1434 is
1435 begin
1436 wf_queue.stck_itemtype(1) := '';
1437 wf_queue.stck_itemkey(1) := '';
1438 wf_queue.stck_actid(1) := 0;
1439 wf_queue.stck_ctr := 0;
1440 exception
1441 when others then
1442 Wf_Core.Context('Wf_Queue', 'ClearMsgStack');
1443 raise;
1444 end ClearMsgStack;
1445
1446
1447 --Name: WriteMsg
1448 --writes a message from stack to the queue
1449 procedure WriteMsg (
1450 itemtype in varchar2,
1451 itemkey in varchar2,
1452 actid in number)
1453 is
1454 i pls_integer;
1455 begin
1456
1457 i := wf_queue.SearchMsgStack(itemtype,itemkey,actid);
1458
1459 wf_queue.EnqueueInbound(
1460 itemtype=>wf_queue.stck_itemtype(i),
1461 itemkey =>wf_queue.stck_itemkey(i),
1462 actid =>wf_queue.stck_actid(i),
1463 result =>wf_queue.stck_result(i),
1464 attrlist=>wf_queue.stck_attrlist(i));
1465
1466
1467 exception
1468 when others then
1469 Wf_Core.Context('Wf_Queue', 'WriteMsg');
1470 raise;
1471
1472 end WriteMsg;
1473
1474 --Name: CreateMsg
1475 --creates a message on the stack
1476 --
1477 procedure CreateMsg (
1478 itemtype in varchar2,
1479 itemkey in varchar2,
1480 actid in number)
1481 is
1482 i pls_integer;
1483 begin
1484
1485 i := wf_queue.SearchMsgStack(itemtype,itemkey,actid);
1486
1487 exception
1488 when others then
1489 Wf_Core.Context('Wf_Queue', 'CreateMsg');
1490 raise;
1491
1492 end CreateMsg;
1493
1494
1495
1496 --Name: SetMsgAttr (PUBLIC)
1497 --Appends message attributes.
1498 --
1499 procedure SetMsgAttr(
1500 itemtype in varchar2,
1501 itemkey in varchar2,
1502 actid in number,
1503 attrName in varchar2,
1504 attrValue in varchar2)
1505 is
1506 i pls_integer;
1507 begin
1508 i := SearchMsgStack (itemtype, itemkey, actid);
1509
1510 if wf_queue.stck_attrlist(i) is null then
1511 wf_queue.stck_attrlist(i) := upper(attrName)||'='||AttrValue;
1512 else
1513 wf_queue.stck_attrlist(i) :=
1514 wf_queue.stck_attrlist(i) ||'^'||attrName||'='||AttrValue;
1515 end if;
1516
1517 exception
1518 when others then
1519 Wf_Core.Context('Wf_Queue', 'SetMsgAttr',
1520 itemtype, itemkey, actid, to_char(stck_ctr));
1521 raise;
1522 end SetMsgAttr;
1523
1524 --Name: SetMsgResult (PUBLIC)
1525 --Sets the result value for this message.
1526 --
1527 procedure SetMsgResult(
1528 itemtype in varchar2,
1529 itemkey in varchar2,
1530 actid in number,
1531 result in varchar2)
1532 is
1533 i pls_integer;
1534 begin
1535 i := SearchMsgStack (itemtype, itemkey, actid);
1536
1537 wf_queue.stck_result(i) :=result;
1538
1539 exception
1540 when others then
1541 Wf_Core.Context('Wf_Queue', 'AddResult',
1542 itemtype, itemkey, actid, to_char(stck_ctr));
1543 raise;
1544 end SetMsgResult;
1545
1546 --
1547 -- AddNewMsg (PRIVATE)
1548 -- Add a new message to the stack
1549 -- IN
1550 -- itemtype - item itemtype
1551 -- itemkey - item itemkey
1552 -- actid - instance id of process
1553 --
1554 procedure AddNewMsg(
1555 itemtype in varchar2,
1556 itemkey in varchar2,
1557 actid in number)
1558 is
1559 begin
1560
1561 -- Add the process to the stack
1562 wf_queue.stck_ctr := wf_queue.stck_ctr + 1;
1563 wf_queue.stck_itemtype(wf_queue.stck_ctr) := itemtype;
1564 wf_queue.stck_itemkey(wf_queue.stck_ctr) := itemkey;
1565 wf_queue.stck_actid(wf_queue.stck_ctr) := actid;
1566 wf_queue.stck_result(wf_queue.stck_ctr) := null;
1567 wf_queue.stck_AttrList(wf_queue.stck_ctr) := null;
1568
1569 exception
1570 when others then
1571 Wf_Core.Context('Wf_Queue', 'AddNewMsg',
1572 itemtype, itemkey, actid, to_char(stck_ctr));
1573 raise;
1574 end AddNewMsg;
1575
1576 --Name: SearchMsgStack (PRIVATE)
1577 --Desc: sequential search of the message stack
1578 -- starting from the top
1579 --
1580 function SearchMsgStack (
1581 itemtype in varchar2,
1582 itemkey in varchar2,
1583 actid in number) RETURN number
1584 is
1585
1586 i pls_integer;
1587
1588 begin
1589
1590 if ( nvl(wf_queue.stck_ctr, 0) > 0) then
1591 for i in reverse 1 .. wf_queue.stck_ctr loop
1592 if ((itemtype = wf_queue.stck_itemtype(i)) and
1593 (itemkey = wf_queue.stck_itemkey(i)) and
1594 (actid = wf_queue.stck_actid(i))) then
1595 -- Found a match.
1596 return(i);
1597 end if;
1598 end loop;
1599 end if;
1600
1601 -- not in the Stack so add it.
1602 AddNewMsg(itemtype,itemkey,actid);
1603 return (stck_ctr);
1604
1605 end SearchMsgStack;
1606
1607 --
1608 -- Generic_Queue_Display
1609 -- Produce list of generic_queues
1610 --
1611 -- MODIFICATION LOG:
1612 -- 06-JUN-2001 JWSMITH BUG 1819232 - added alt attrib for IMG tag for ADA
1613 -- - Added summary attr for table tags for ADA
1614 -- - Added ID attr for TD tags for ADA
1615 --
1616 procedure Generic_Queue_Display
1617 is
1618 username varchar2(320); -- Username to query
1619 admin_role varchar2(320); -- Role for admin mode
1620 admin_mode varchar2(1) := 'N';
1621 realname varchar2(360); -- Display name of username
1622 s0 varchar2(2000); -- Dummy
1623 l_error_msg varchar2(240);
1624 l_url varchar2(240);
1625 l_media varchar2(240) := wfa_html.image_loc;
1626 l_icon varchar2(40);
1627 l_text varchar2(240);
1628 l_onmouseover varchar2(240);
1629
1630
1631 cursor queues_cursor is
1632 select wfq.protocol,
1633 wfq.inbound_outbound,
1634 wfq.description,
1635 wfq.queue_count
1636 from wf_queues wfq
1637 where NVL(wfq.disable_flag, 'N') = 'N'
1638 order by wfq.protocol, wfq.inbound_outbound;
1639
1640 rowcount number;
1641
1642 begin
1643
1644 -- Check current user has admin authority
1645 wfa_sec.GetSession(username);
1646 username := upper(username);
1647 wf_directory.GetRoleInfo(username, realname, s0, s0, s0, s0);
1648
1649 admin_role := wf_core.translate('WF_ADMIN_ROLE');
1650 if (admin_role = '*' or
1651 Wf_Directory.IsPerformer(username, admin_role)) then
1652 admin_mode := 'Y';
1653 else
1654
1655 l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
1656
1657 end if;
1658
1659 -- Set page title
1660 htp.htmlOpen;
1661 htp.headOpen;
1662 htp.p('<BASE TARGET="_top">');
1663 htp.title(wf_core.translate('WFGENERIC_QUEUE_TITLE'));
1664 wfa_html.create_help_function('wf/links/dmr.htm?DMREP');
1665 htp.headClose;
1666 wfa_sec.Header(FALSE, '',wf_core.translate('WFGENERIC_QUEUE_TITLE'), FALSE);
1667 htp.br;
1668
1669 IF (admin_mode = 'N') THEN
1670
1671 htp.center(htf.bold(l_error_msg));
1672 return;
1673
1674 END IF;
1675
1676 -- Column headers
1677 htp.tableOpen(cattributes=>'border=1 cellpadding=3 bgcolor=white width="100%" summary=""');
1678 htp.tableRowOpen(cattributes=>'bgcolor=#006699');
1679
1680
1681 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1682 wf_core.translate('PROTOCOL')||'</font>',
1683 calign=>'Center',
1684 cattributes=>'id="' || wf_core.translate('PROTOCOL') || '"');
1685 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1686 wf_core.translate('QUEUE_DESCRIPTION')||'</font>',
1687 calign=>'Center',
1688 cattributes=>'id="' || wf_core.translate('QUEUE_DESCRIPTION') || '"');
1689 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1690 wf_core.translate('INBOUND_PROMPT')||'</font>',
1691 calign=>'Center',
1692 cattributes=>'id="' || wf_core.translate('INBOUND_PROMPT') || '"');
1693 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1694 wf_core.translate('QUEUE_COUNT')||'</font>',
1695 calign=>'Center',
1696 cattributes=>'id="' || wf_core.translate('QUEUE_COUNT') || '"');
1697 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1698 wf_core.translate('VIEW_DETAIL')||'</font>',
1699 calign=>'Center',
1700 cattributes=>'id="' || wf_core.translate('VIEW_DETAIL') || '"');
1701 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1702 wf_core.translate('DELETE')||'</font>',
1703 calign=>'Center',
1704 cattributes=>'id="' || wf_core.translate('DELETE') || '"');
1705
1706 htp.tableRowClose;
1707 htp.tableRowOpen;
1708 htp.tableRowClose;
1709
1710 -- Show all nodes
1711 for queues in queues_cursor loop
1712
1713 htp.tableRowOpen(null, 'TOP');
1714
1715
1716 htp.tableData(htf.anchor2(
1717 curl=>wfa_html.base_url||
1718 '/wf_queue.generic_queue_edit?p_protocol='||
1719 queues.protocol||'&p_inbound_outbound='||
1720 queues.inbound_outbound,
1721 ctext=>queues.protocol, ctarget=>'_top'),
1722 'Left',
1723 cattributes=>'headers="' ||
1724 wf_core.translate('PROTOCOL') || '"');
1725
1726 htp.tableData(queues.description, 'left',
1727 cattributes=>'headers="' || wf_core.translate('QUEUE_DESCRIPTION') || '"');
1728
1729 htp.tableData(queues.inbound_outbound, 'left',
1730 cattributes=>'headers="' || wf_core.translate('INBOUND_PROMPT') || '"');
1731
1732 htp.tableData(queues.queue_count, 'left',
1733 cattributes=>'headers="' || wf_core.translate('QUEUE_COUNT') || '"');
1734
1735 htp.tableData(htf.anchor2(curl=>wfa_html.base_url||
1736 '/wf_queue.Generic_Queue_View_Detail?p_protocol='||
1737 queues.protocol||'&p_inbound_outbound='||
1738 queues.inbound_outbound,
1739 ctext=>'<IMG SRC="'||wfa_html.image_loc||'affind.gif" alt="'||wf_core.translate('FIND') || '"BORDER=0>'),
1740 'center', cattributes=>'valign="MIDDLE"
1741 headers="' || wf_core.translate('VIEW_DETAIL') || '"');
1742
1743 htp.tableData(htf.anchor2(curl=>wfa_html.base_url||
1744 '/wf_queue.generic_queue_confirm_delete?p_protocol='||
1745 queues.protocol||'&p_inbound_outbound='||
1746 queues.inbound_outbound,
1747 ctext=>'<IMG SRC="'||wfa_html.image_loc||'FNDIDELR.gif" alt="' || wf_core.translate('WFRTG_DELETE') || '" BORDER=0>'),
1748 'center', cattributes=>'valign="MIDDLE"
1749 headers="' || wf_core.translate('DELETE') || '"');
1750
1751
1752 end loop;
1753
1754 htp.tableclose;
1755
1756 htp.br;
1757
1758 htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');
1759
1760 --Add new node Button
1761 htp.tableRowOpen;
1762
1763 l_url := wfa_html.base_url||'/wf_queue.generic_queue_edit';
1764 l_icon := 'FNDJLFOK.gif';
1765 l_text := wf_core.translate ('WFQUEUE_CREATE');
1766 l_onmouseover := wf_core.translate ('WFQUEUE_CREATE');
1767
1768 htp.p('<TD ID="">');
1769
1770 wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);
1771
1772 htp.p('</TD>');
1773
1774 htp.tableRowClose;
1775
1776 htp.tableclose;
1777
1778 wfa_sec.Footer;
1779 htp.htmlClose;
1780
1781 exception
1782 when others then
1783 wf_core.context('FND_DOCUMENT_MANAGEMENT', 'Generic_Queue_Display');
1784 raise;
1785 end Generic_Queue_Display;
1786
1787 --
1788 -- Generic_Queue_View_Detail
1789 -- Produce list of generic_queues
1790 --
1791 -- MODIFICATION LOG:
1792 -- 06-JUN-2001 JWSMITH BUG 1819232 - added alt attrib for IMG tag for ADA
1793 -- - Added summary attribute for table tags for ADA
1794 --
1795 procedure Generic_Queue_View_Detail (
1796 p_protocol IN VARCHAR2 DEFAULT NULL,
1797 p_inbound_outbound IN VARCHAR2 DEFAULT NULL
1798 ) IS
1799 l_count number := 0;
1800 username varchar2(320); -- Username to query
1801 admin_role varchar2(320); -- Role for admin mode
1802 admin_mode varchar2(1) := 'N';
1803 realname varchar2(360); -- Display name of username
1804 s0 varchar2(2000); -- Dummy
1805 l_error_msg varchar2(240);
1806 l_url varchar2(240);
1807 l_media varchar2(240) := wfa_html.image_loc;
1808 l_icon varchar2(40);
1809 l_text varchar2(240);
1810 l_onmouseover varchar2(240);
1811 l_sql varchar2(1000);
1812
1813 begin
1814
1815 -- Check current user has admin authority
1816 wfa_sec.GetSession(username);
1817 username := upper(username);
1818 wf_directory.GetRoleInfo(username, realname, s0, s0, s0, s0);
1819
1820 admin_role := wf_core.translate('WF_ADMIN_ROLE');
1821 if (admin_role = '*' or
1822 Wf_Directory.IsPerformer(username, admin_role)) then
1823 admin_mode := 'Y';
1824 else
1825
1826 l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
1827
1828 end if;
1829
1830 -- Set page title
1831 htp.htmlOpen;
1832 htp.headOpen;
1833 htp.p('<BASE TARGET="_top">');
1834 htp.title(wf_core.translate('WFGENERIC_QUEUE_TITLE'));
1835 wfa_html.create_help_function('wf/links/dmr.htm?DMREP');
1836 htp.headClose;
1837 wfa_sec.Header(FALSE, '',wf_core.translate('WFGENERIC_QUEUE_TITLE'), FALSE);
1838 htp.br;
1839
1840 IF (admin_mode = 'N') THEN
1841
1842 htp.center(htf.bold(l_error_msg));
1843 return;
1844
1845 END IF;
1846
1847 SELECT queue_count
1848 INTO l_count
1849 FROM wf_queues
1850 WHERE UPPER(p_protocol) = protocol
1851 AND p_inbound_outbound = inbound_outbound;
1852
1853 -- Column headers
1854 htp.tableOpen(cattributes=>'border=1 cellpadding=3 bgcolor=white width="100%" summary=""');
1855 htp.tableRowOpen(cattributes=>'bgcolor=#006699');
1856
1857 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1858 wf_core.translate('PROTOCOL')||'</font>',
1859 calign=>'Center',
1860 cattributes=>'id="' ||
1861 wf_core.translate('PROTOCOL') || '"');
1862 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1863 wf_core.translate('QUEUE_NUMBER')||'</font>',
1864 calign=>'Center',
1865 cattributes=>'id="' ||
1866 wf_core.translate('QUEUE_NUMBER') || '"');
1867 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1868 wf_core.translate('QUEUE_NAME')||'</font>',
1869 calign=>'Center',
1870 cattributes=>'id="' ||
1871 wf_core.translate('QUEUE_NAME') || '"');
1872 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1873 wf_core.translate('QUEUE_COUNT')||'</font>',
1874 calign=>'Center',
1875 cattributes=>'id="' ||
1876 wf_core.translate('QUEUE_COUNT') || '"');
1877 htp.tableHeader(cvalue=>'<font color=#FFFFFF>'||
1878 wf_core.translate('VIEW_DETAIL')||'</font>',
1879 calign=>'Center',
1880 cattributes=>'id="' ||
1881 wf_core.translate('VIEW_DETAIL') || '"');
1882
1883 htp.tableRowClose;
1884 htp.tableRowOpen;
1885 htp.tableRowClose;
1886
1887 -- Show all queues for the given protocol
1888 for ii in 1..l_count loop
1889
1890 htp.tableRowOpen(null, 'TOP');
1891
1892 htp.tableData(p_protocol, 'left', cattributes=>'headers="' ||
1893 wf_core.translate('PROTOCOL') || '"');
1894
1895 htp.tableData(to_char(ii), 'left', cattributes=>'headers="' ||
1896 wf_core.translate('QUEUE_NUMBER') || '"');
1897
1898 -- p_protocol and p_inbound_outbound were verified above
1899 -- ii must be a number
1900 -- BINDVAR_SCAN_IGNORE
1901 htp.tableData(wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_QUEUE', 'left', cattributes=>'headers="' || wf_core.translate('QUEUE_NAME') || '"');
1902
1903 /*
1904 ** Check to see if there are any messages in the specified queue
1905 */
1906 l_sql := 'SELECT COUNT(1) FROM WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE';
1907
1908 execute immediate l_sql INTO l_count;
1909
1910 htp.tableData(to_char(l_count), 'left', cattributes=>'headers="' ||
1911 wf_core.translate('QUEUE_COUNT') || '"');
1912
1913 htp.tableData(htf.anchor2(curl=>wfa_html.base_url||
1914 '/wf_queue.generic_queue_display_contents?p_protocol='||
1915 p_protocol||'&p_inbound_outbound='||
1916 p_inbound_outbound||'&p_queue_number='||
1917 to_char(ii)||'&p_message_number=1',
1918 ctext=>'<IMG SRC="'||wfa_html.image_loc||'affind.gif" alt="' || wf_core.translate('FIND') || '" BORDER=0>'),
1919 'center', cattributes=>'valign="MIDDLE" headers="' || wf_core.translate('VIEW_DETAIL') || '"');
1920
1921
1922 end loop;
1923
1924 htp.tableclose;
1925
1926 htp.br;
1927
1928 wfa_sec.Footer;
1929 htp.htmlClose;
1930
1931 exception
1932 when others then
1933 wf_core.context('FND_DOCUMENT_MANAGEMENT', 'Generic_Queue_View_Detail');
1934 raise;
1935 end Generic_Queue_View_Detail;
1936
1937
1938 -- MODIFICATION LOG:
1939 -- 06-JUN-2001 JWSMITH BUG 1819232 - Added summary attr for table tag for ADA
1940 -- - Added ID attr for TD tags for ADA
1941 procedure generic_queue_display_contents
1942 (p_protocol IN VARCHAR2 DEFAULT NULL,
1943 p_inbound_outbound IN VARCHAR2 DEFAULT NULL,
1944 p_queue_number IN NUMBER DEFAULT NULL,
1945 p_message_number IN NUMBER DEFAULT 1) IS
1946
1947 username varchar2(320); -- Username to query
1948 admin_role varchar2(320); -- Role for admin mode
1949 admin_mode varchar2(1) := 'N';
1950 l_media varchar2(240) := wfa_html.image_loc;
1951 l_icon varchar2(40) := 'FNDILOV.gif';
1952 l_text varchar2(240) := '';
1953 l_onmouseover varchar2(240) := wf_core.translate ('WFPREF_LOV');
1954 l_url varchar2(4000);
1955 l_error_msg varchar2(240);
1956
1957 l_more_data BOOLEAN := TRUE;
1958 l_message system.wf_message_payload_t;
1959 dequeue_options dbms_aq.dequeue_options_t;
1960 message_properties dbms_aq.message_properties_t;
1961 ii number := 0;
1962 l_loc number := 1;
1963 l_message_contents VARCHAR2(32000);
1964 l_message_offset binary_integer := 16000;
1965 l_queue_name varchar2(30);
1966 l_msg_id RAW(16);
1967
1968 begin
1969
1970 -- Check current user has admin authority
1971 wfa_sec.GetSession(username);
1972 username := upper(username);
1973
1974 admin_role := wf_core.translate('WF_ADMIN_ROLE');
1975 if (admin_role = '*' or
1976 Wf_Directory.IsPerformer(username, admin_role)) then
1977 admin_mode := 'Y';
1978 else
1979
1980 l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
1981
1982 end if;
1983
1984 -- Set page title
1985 htp.htmlOpen;
1986 htp.headOpen;
1987 htp.title(wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE'));
1988 wfa_html.create_help_function('wf/links/dmr.htm?DMREP');
1989
1990 wf_lov.OpenLovWinHtml;
1991
1992 htp.headClose;
1993
1994 -- Page header
1995 wfa_sec.Header(FALSE, '', wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE'), TRUE);
1996
1997 IF (admin_mode = 'N') THEN
1998
1999 htp.center(htf.bold(l_error_msg));
2000 return;
2001
2002 END IF;
2003
2004 htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');
2005
2006 htp.p('<FORM NAME="FND_QUEUE_CONTENTS" ACTION="wf_queue.generic_queue_update" METHOD="POST">');
2007
2008 /*
2009 ** Create a page with a form field with the message payload
2010 */
2011 dequeue_options.dequeue_mode := dbms_aq.BROWSE;
2012 dequeue_options.wait := dbms_aq.NO_WAIT;
2013 dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
2014
2015 l_queue_name := wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||
2016 to_char(p_queue_number)||'_QUEUE';
2017
2018 dbms_aq.dequeue
2019 (queue_name => l_queue_name,
2020 dequeue_options => dequeue_options,
2021 message_properties => message_properties,
2022 payload => l_message,
2023 msgid => l_msg_id
2024 );
2025
2026 dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
2027
2028 -- Loop until you reach the requested message
2029 for ii in 2..p_message_number loop
2030
2031 htp.p (to_char(ii));
2032
2033 dbms_aq.dequeue
2034 (queue_name => l_queue_name,
2035 dequeue_options => dequeue_options,
2036 message_properties => message_properties,
2037 payload => l_message,
2038 msgid => l_msg_id
2039 );
2040
2041 end loop;
2042
2043 -- Display the contents
2044 htp.tableRowOpen;
2045
2046 htp.p ('<TD ID="" ALIGN="Left">');
2047
2048 htp.p ('<TEXTAREA NAME="message_content" ROWS=26 COLS=120 WRAP="SOFT">');
2049
2050 while (l_more_data = TRUE) loop
2051
2052 BEGIN
2053
2054 dbms_lob.read(l_message.message, l_message_offset, l_loc, l_message_contents);
2055
2056 htp.p(l_message_contents);
2057
2058 l_loc := l_loc + l_message_offset;
2059
2060 if (l_message_offset < 16000) then
2061
2062 l_more_data := FALSE;
2063
2064 end if;
2065
2066 EXCEPTION
2067 WHEN NO_DATA_FOUND THEN
2068 l_more_data := FALSE;
2069 WHEN OTHERS THEN
2070 RAISE;
2071 END;
2072
2073 END LOOP;
2074
2075 htp.p ('</TEXTAREA>');
2076
2077 htp.p ('</TD>');
2078
2079 htp.tableRowClose;
2080
2081 htp.tableclose;
2082
2083 htp.formClose;
2084
2085 htp.br;
2086
2087 htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');
2088
2089 --Next Button
2090
2091 htp.tableRowOpen;
2092
2093 l_url := wfa_html.base_url||
2094 '/wf_queue.generic_queue_display_contents'||
2095 '?p_protocol='||p_protocol||
2096 '&p_inbound_outbound='||p_inbound_outbound||
2097 '&p_queue_number='||to_char(p_queue_number)||
2098 '&p_message_number='||to_char(p_message_number + 1);
2099
2100 l_icon := 'FNDJLFOK.gif';
2101 l_text := wf_core.translate ('NEXT');
2102 l_onmouseover := wf_core.translate ('NEXT');
2103
2104 htp.p('<TD ID="">');
2105
2106 wfa_html.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);
2107
2108 htp.p('</TD>');
2109
2110 if (p_message_number > 1) then
2111
2112 l_url := wfa_html.base_url||
2113 '/wf_queue.generic_queue_display_contents'||
2114 '?p_protocol='||p_protocol||
2115 '&p_inbound_outbound='||p_inbound_outbound||
2116 '&p_queue_number='||to_char(p_queue_number)||
2117 '&p_message_number='||to_char(p_message_number - 1);
2118
2119 l_icon := 'FNDJLFCN.gif';
2120 l_text := wf_core.translate ('PREVIOUS');
2121 l_onmouseover := wf_core.translate ('PREVIOUS');
2122
2123 htp.p('<TD ID="">');
2124
2125 wfa_html.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);
2126
2127 htp.p('</TD>');
2128
2129 end if;
2130
2131 htp.tableRowClose;
2132
2133 htp.tableclose;
2134
2135 wfa_sec.Footer;
2136
2137 htp.htmlClose;
2138
2139 exception
2140 when others then
2141 Wf_Core.Context('Wf_Queue', 'generic_queue_display_contents',
2142 p_protocol, p_inbound_outbound);
2143 raise;
2144
2145 end generic_queue_display_contents;
2146
2147
2148
2149 -- MODIFICATION LOG:
2150 -- 06-JUN-2001 JWSMITH BUG 1819232 - Added summary attr for table tag for ADA
2151 -- - Added ID attr for TD tags
2152 procedure Generic_Queue_Edit (
2153 p_protocol IN VARCHAR2 DEFAULT NULL,
2154 p_inbound_outbound IN VARCHAR2 DEFAULT NULL
2155 ) IS
2156
2157 username varchar2(320); -- Username to query
2158 admin_role varchar2(320); -- Role for admin mode
2159 admin_mode varchar2(1) := 'N';
2160 l_inbound_selected varchar2(1) := 'N';
2161 l_outbound_selected varchar2(1) := 'N';
2162 l_description VARCHAR2(240);
2163 l_queue_count NUMBER;
2164 l_media varchar2(240) := wfa_html.image_loc;
2165 l_icon varchar2(40) := 'FNDILOV.gif';
2166 l_text varchar2(240) := '';
2167 l_onmouseover varchar2(240) := wf_core.translate ('WFPREF_LOV');
2168 l_url varchar2(4000);
2169 l_error_msg varchar2(240);
2170
2171 BEGIN
2172
2173 -- Check current user has admin authority
2174 wfa_sec.GetSession(username);
2175 username := upper(username);
2176
2177 admin_role := wf_core.translate('WF_ADMIN_ROLE');
2178 if (admin_role = '*' or
2179 Wf_Directory.IsPerformer(username, admin_role)) then
2180 admin_mode := 'Y';
2181 else
2182
2183 l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
2184
2185 end if;
2186
2187 /*
2188 ** If this protocol already exists then go fetch the definition
2189 */
2190 IF (p_protocol IS NOT NULL) THEN
2191
2192 SELECT description,
2193 queue_count
2194 INTO l_description,
2195 l_queue_count
2196 FROM wf_queues
2197 WHERE protocol = p_protocol
2198 AND inbound_outbound = p_inbound_outbound;
2199
2200 END IF;
2201
2202 -- Set page title
2203 htp.htmlOpen;
2204 htp.headOpen;
2205 htp.title(wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE'));
2206 wfa_html.create_help_function('wf/links/dmr.htm?DMREP');
2207
2208 wf_lov.OpenLovWinHtml;
2209
2210 htp.headClose;
2211
2212 -- Page header
2213 wfa_sec.Header(FALSE, '', wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE'), TRUE);
2214
2215 IF (admin_mode = 'N') THEN
2216
2217 htp.center(htf.bold(l_error_msg));
2218 return;
2219
2220 END IF;
2221
2222 htp.tableopen(calign=>'CENTER',cattributes=>'summary="' || wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE') || '"');
2223
2224 htp.p('<FORM NAME="FND_GENERIC_QUEUE" ACTION="wf_queue.generic_queue_update" METHOD="POST">');
2225
2226 -- Protocol Name
2227 htp.tableRowOpen;
2228 htp.tableData(cvalue=>'<LABEL FOR="i_protocol">' ||
2229 wf_core.translate('PROTOCOL') || '</LABEL>',
2230 calign=>'right',
2231 cattributes=>'id=""');
2232
2233 htp.tableData(htf.formText(cname=>'p_protocol', csize=>'30',
2234 cvalue=>p_protocol, cmaxlength=>'30',
2235 cattributes=>'id="i_protocol"'),
2236 cattributes=>'id=""');
2237
2238 htp.tableRowClose;
2239
2240 -- Inbound/outbound
2241 htp.tableRowOpen;
2242 htp.tableData(cvalue=>'<LABEL FOR="i_inbound_outbound">' ||
2243 wf_core.translate('INBOUND_OUTBOUND') || '</LABEL>',
2244 calign=>'right',
2245 cattributes=>'id=""');
2246
2247 if (NVL(p_inbound_outbound, 'OUTBOUND') = 'INBOUND') then
2248
2249 l_inbound_selected := 'Y';
2250 l_outbound_selected := NULL;
2251
2252 else
2253
2254 l_inbound_selected := NULL;
2255 l_outbound_selected := 'Y';
2256
2257 end if;
2258
2259 htp.p('<TD ID="">');
2260
2261 htp.formSelectOpen(cname=>'p_inbound_outbound',cattributes=>'id="i_inbound_outbound"');
2262
2263 htp.formSelectOption(cvalue=>wf_core.translate('INBOUND'),
2264 cattributes=>'value=INBOUND',
2265 cselected=>l_inbound_selected);
2266
2267 htp.formSelectOption(cvalue=>wf_core.translate('OUTBOUND'),
2268 cattributes=>'value=OUTBOUND',
2269 cselected=>l_outbound_selected);
2270
2271 htp.formSelectClose;
2272 htp.p('</TD>');
2273
2274 htp.tableRowClose;
2275
2276 -- Description
2277 htp.tableRowOpen;
2278 htp.tableData(cvalue=>'<LABEL FOR="i_description">' ||
2279 wf_core.translate('DESCRIPTION') || '"',
2280 calign=>'right',
2281 cattributes=>'id=""');
2282
2283 htp.tableData(htf.formText(cname=>'p_description', csize=>'30',
2284 cvalue=>l_description, cmaxlength=>'240',
2285 cattributes=>'id="i_description"'),
2286 cattributes=>'id=""');
2287
2288 htp.tableRowClose;
2289
2290 -- Count
2291 htp.tableRowOpen;
2292 htp.tableData(cvalue=>'<LABEL FOR="i_count">' ||
2293 wf_core.translate('COUNT') || '"',
2294 calign=>'right',
2295 cattributes=>'id=""');
2296
2297 htp.tableData(htf.formText(cname=>'p_queue_count', csize=>'10',
2298 cvalue=>l_queue_count, cmaxlength=>'20',
2299 cattributes=>'id="i_count"'),
2300 cattributes=>'id=""');
2301
2302 htp.tableRowClose;
2303
2304 -- keep track of the original protocol and the inbound/outbound
2305 -- value in case the name changes
2306
2307 htp.formHidden(cname=>'p_original_protocol', cvalue=>p_protocol);
2308 htp.formHidden(cname=>'p_original_inbound', cvalue=>p_inbound_outbound);
2309
2310 htp.tableclose;
2311
2312 htp.br;
2313
2314 htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');
2315
2316 --Submit Button
2317
2318 htp.tableRowOpen;
2319
2320 l_url := 'javascript:document.FND_GENERIC_QUEUE.submit()';
2321 l_icon := 'FNDJLFOK.gif';
2322 l_text := wf_core.translate ('WFMON_OK');
2323 l_onmouseover := wf_core.translate ('WFMON_OK');
2324
2325 htp.p('<TD ID="">');
2326
2327 wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);
2328
2329 htp.p('</TD>');
2330
2331 l_url := wfa_html.base_url||'/fnd_document_management.Generic_Queue_Display';
2332 l_icon := 'FNDJLFCN.gif';
2333 l_text := wf_core.translate ('CANCEL');
2334 l_onmouseover := wf_core.translate ('CANCEL');
2335
2336 htp.p('<TD ID="">');
2337
2338 wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);
2339
2340 htp.p('</TD>');
2341
2342 htp.tableRowClose;
2343
2344 htp.tableclose;
2345
2346 htp.formClose;
2347
2348 wfa_sec.Footer;
2349 htp.htmlClose;
2350
2351
2352 exception
2353 when others then
2354 wf_core.context('FND_DOCUMENT_MANAGEMENT', 'Generic_Queue_edit');
2355 raise;
2356
2357 END Generic_Queue_Edit;
2358
2359 procedure generic_queue_delete_check
2360 (p_protocol in varchar2,
2361 p_inbound_outbound in varchar2,
2362 p_queue_start_range in number,
2363 p_queue_end_range in number) IS
2364
2365 ii NUMBER := 0;
2366 l_count NUMBER := 0;
2367 l_sql varchar2(1000);
2368
2369 BEGIN
2370
2371 /*
2372 ** Check to make sure there are no messages in the queue before
2373 ** you delete it.
2374 */
2375 for ii in p_queue_start_range..p_queue_end_range loop
2376
2377 /*
2378 ** Check to see if there are any messages in the specified queue
2379 */
2380 -- p_protocol and p_inbound was verified before coming here.
2381 -- BINDVAR_SCAN_IGNORE
2382 l_sql := 'SELECT COUNT(1) INTO :a FROM WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE';
2383
2384 execute immediate l_sql using l_count;
2385
2386 /*
2387 ** If you find a row then error this call
2388 */
2389 if (l_count > 0) then
2390
2391 wf_core.token('PROTOCOL', p_protocol);
2392 wf_core.token('INBOUND_OUTBOUD', p_inbound_outbound);
2393 wf_core.token('QUEUE_NUMBER', to_char(ii));
2394 wf_core.raise('WFQUEUE_QUEUE_CONTENT');
2395
2396 end if;
2397
2398 end loop;
2399
2400 exception
2401 when others then
2402 Wf_Core.Context('Wf_Queue', 'generic_queue_delete_check',
2403 p_protocol, p_inbound_outbound);
2404 raise;
2405
2406 end generic_queue_delete_check;
2407
2408 procedure generic_queue_delete_queues
2409 (p_protocol in varchar2,
2410 p_inbound_outbound in varchar2,
2411 p_queue_start_range in number,
2412 p_queue_end_range in number) IS
2413
2414 ii NUMBER := 0;
2415 l_count NUMBER := 0;
2416
2417 BEGIN
2418
2419 /*
2420 ** Delete the queues and queue tables
2421 */
2422 for ii in p_queue_start_range..p_queue_end_range loop
2423
2424 /*
2425 ** Stop the queue
2426 */
2427 dbms_aqadm.stop_queue(queue_name => wf_core.translate('WF_SCHEMA')||'.'||'WF_'||
2428 p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||
2429 to_char(ii)||'_QUEUE');
2430 /*
2431 ** Delete the Queues
2432 */
2433 dbms_aqadm.drop_queue(
2434 queue_name => wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||
2435 substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_QUEUE');
2436
2437 /*
2438 ** Delete the Queue Table
2439 */
2440 dbms_aqadm.drop_queue_table (
2441 queue_table => wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE');
2442
2443 end loop;
2444
2445 exception
2446 when others then
2447 Wf_Core.Context('Wf_Queue', 'generic_queue_delete_queues',
2448 p_protocol, p_inbound_outbound);
2449 raise;
2450
2451 end generic_queue_delete_queues;
2452
2453
2454
2455 -- MODIFICATION LOG:
2456 -- 06-JUN-2001 JWSMITH BUG 1819232 - Added summary attr for table tag for ADA
2457 -- - Added ID attr for TD tags
2458 procedure Generic_Queue_Update (
2459 p_protocol IN VARCHAR2 DEFAULT NULL,
2460 p_inbound_outbound IN VARCHAR2 DEFAULT NULL,
2461 p_description IN VARCHAR2 DEFAULT NULL,
2462 p_queue_count IN VARCHAR2 DEFAULT NULL,
2463 p_original_protocol IN VARCHAR2 DEFAULT NULL,
2464 p_original_inbound IN VARCHAR2 DEFAULT NULL
2465 ) IS
2466
2467 username varchar2(320); -- Username to query
2468 admin_role varchar2(320); -- Role for admin mode
2469 admin_mode varchar2(1) := 'N';
2470 l_count number := 0;
2471 l_media varchar2(240) := wfa_html.image_loc;
2472 l_icon varchar2(30) := 'FNDILOV.gif';
2473 l_text varchar2(240) := '';
2474 l_onmouseover varchar2(240) := wf_core.translate ('WFPREF_LOV');
2475 l_url varchar2(4000);
2476 l_error_msg varchar2(240);
2477
2478 BEGIN
2479
2480 -- Check current user has admin authority
2481 wfa_sec.GetSession(username);
2482 username := upper(username);
2483
2484 admin_role := wf_core.translate('WF_ADMIN_ROLE');
2485
2486 if (admin_role = '*' or
2487 Wf_Directory.IsPerformer(username, admin_role)) then
2488 admin_mode := 'Y';
2489 else
2490
2491 l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
2492
2493 end if;
2494
2495 IF (admin_mode = 'N') THEN
2496
2497 htp.center(htf.bold(l_error_msg));
2498 return;
2499
2500 END IF;
2501
2502 -- Check to make sure the protocol does not already exist
2503 IF (p_original_protocol IS NULL) THEN
2504
2505 SELECT count(1)
2506 INTO l_count
2507 FROM wf_queues
2508 WHERE UPPER(p_protocol) = protocol
2509 AND p_inbound_outbound = inbound_outbound;
2510
2511 if (l_count > 0) then
2512
2513 htp.p('<BODY bgcolor=#cccccc>');
2514 htp.center(htf.bold(wf_core.translate('WFQUEUE_ALREADY_EXISTS')));
2515 htp.br;
2516
2517 htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');
2518
2519 --Submit Button
2520
2521 htp.tableRowOpen;
2522
2523 l_url := wfa_html.base_url||
2524 '/wf_queue.generic_queue_edit';
2525 l_icon := 'FNDJLFOK.gif';
2526 l_text := wf_core.translate ('WFMON_OK');
2527 l_onmouseover := wf_core.translate ('WFMON_OK');
2528
2529 htp.p('<TD ID="">');
2530
2531 wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);
2532
2533 htp.p('</TD>');
2534 htp.tablerowclose;
2535 htp.tableclose;
2536 htp.p('</BODY>');
2537 return;
2538
2539 else
2540
2541 wf_queue.create_generic_queue (p_protocol=>p_protocol,
2542 p_inbound_outbound => p_inbound_outbound,
2543 p_description => p_description,
2544 p_queue_count => to_number(p_queue_count));
2545
2546 end if;
2547
2548 else
2549 null;
2550
2551 /*
2552 wf_queue.update_generic_queue (p_protocol=>p_protocol,
2553 p_inbound_outbound => p_inbound_outbound,
2554 p_description => p_description,
2555 p_queue_count => to_number(p_queue_count),
2556 p_original_protocol=> p_original_protocol,
2557 p_original_inbound=> p_original_inbound);
2558
2559 */
2560 end if;
2561
2562
2563 -- use owa_util.redirect_url to redirect the URL to the home page
2564 owa_util.redirect_url(curl=>wfa_html.base_url ||
2565 '/wf_queue.Generic_Queue_Display',
2566 bclose_header=>TRUE);
2567
2568
2569 exception
2570 when others then
2571 wf_core.context('FND_DOCUMENT_MANAGEMENT', 'Generic_Queue_update');
2572 raise;
2573
2574 END Generic_Queue_Update;
2575
2576
2577 /*
2578 ** Create a generic queue with the object type of WF_MESSAGE_PAYLOAD_T which
2579 ** is basically just a clob
2580 */
2581 procedure create_generic_queue
2582 (p_protocol IN VARCHAR2,
2583 p_inbound_outbound IN VARCHAR2,
2584 p_description IN VARCHAR2,
2585 p_queue_count IN NUMBER) IS
2586
2587 l_count NUMBER := 0;
2588
2589 begin
2590
2591 /*
2592 ** Check to see if the queue name already exists
2593 */
2594 select count(1)
2595 into l_count
2596 from wf_queues wfq
2597 where wfq.protocol = p_protocol
2598 and wfq.inbound_outbound = p_inbound_outbound;
2599
2600 /*
2601 ** If you find a row then error this call
2602 */
2603 if (l_count > 0) then
2604
2605 wf_core.token('PROTOCOL', p_protocol);
2606 wf_core.raise('WFQUEUE_UNIQUE_NAME');
2607
2608 end if;
2609
2610 for ii in 1..p_queue_count loop
2611
2612 /*
2613 ** Create New Queue Table
2614 */
2615 dbms_aqadm.create_queue_table (
2616 queue_table => wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE',
2617 queue_payload_type => 'SYSTEM.WF_MESSAGE_PAYLOAD_T',
2618 storage_clause => 'storage (initial 1m next 1m pctincrease 0 )',
2619 sort_list => 'PRIORITY,ENQ_TIME',
2620 comment => wf_core.translate('WORKFLOW_USER_QUEUE_TABLE')||' - '||
2621 wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE');
2622
2623 /*
2624 ** Create New Queues
2625 */
2626 dbms_aqadm.create_queue(
2627 queue_name => wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||
2628 substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_QUEUE',
2629 queue_table => wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||
2630 substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE',
2631 max_retries => 0,
2632 comment => wf_core.translate('WORKFLOW_USER_QUEUE')||' - '||
2633 wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||
2634 substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_QUEUE');
2635
2636 /*
2637 ** Start the queue
2638 */
2639 dbms_aqadm.start_queue(queue_name => wf_core.translate('WF_SCHEMA')||'.'||
2640 'WF_'||p_protocol||'_'||
2641 substr(p_inbound_outbound, 1, 1)|| '_'||to_char(ii)||'_QUEUE');
2642
2643 end loop;
2644
2645 /*
2646 ** Create an entry in WF_QUEUES table
2647 */
2648 insert into wf_queues
2649 (protocol,
2650 inbound_outbound,
2651 description,
2652 queue_count,
2653 disable_flag)
2654 values
2655 (p_protocol,
2656 p_inbound_outbound,
2657 p_description,
2658 p_queue_count,
2659 'N');
2660
2661 exception
2662 when others then
2663 Wf_Core.Context('Wf_Queue', 'create_generic_queue', p_protocol,
2664 p_inbound_outbound);
2665 raise;
2666
2667 end create_generic_queue;
2668
2669 /*
2670 ** delete a generic queue with the object type of WF_MESSAGE_PAYLOAD_T which
2671 ** is basically just a clob
2672 */
2673 procedure delete_generic_queue
2674 (p_protocol IN VARCHAR2,
2675 p_inbound_outbound IN VARCHAR2) IS
2676
2677 l_queue_count NUMBER := 0;
2678
2679 begin
2680
2681 /*
2682 ** Check to see if the queue name already exists
2683 */
2684 begin
2685
2686 select queue_count
2687 into l_queue_count
2688 from wf_queues wfq
2689 where wfq.protocol = p_protocol
2690 and wfq.inbound_outbound = p_inbound_outbound;
2691
2692 exception
2693 when no_data_found then
2694 wf_core.token('PROTOCOL', p_protocol);
2695 wf_core.raise('WFQUEUE_NOEXIST');
2696 when others then
2697 raise;
2698
2699 end;
2700
2701 /*
2702 ** Make sure the queues are empty
2703 */
2704 wf_queue.generic_queue_delete_check (p_protocol, p_inbound_outbound,
2705 1, l_queue_count);
2706
2707 /*
2708 ** Delete the queues and queue tables
2709 */
2710 wf_queue.generic_queue_delete_queues(p_protocol, p_inbound_outbound,
2711 1, l_queue_count);
2712
2713 /*
2714 ** delete an entry in WF_QUEUES table
2715 */
2716 delete from wf_queues
2717 where protocol = p_protocol
2718 and inbound_outbound = p_inbound_outbound;
2719
2720 exception
2721 when others then
2722 Wf_Core.Context('Wf_Queue', 'delele_generic_queue', p_protocol,
2723 p_inbound_outbound);
2724 raise;
2725
2726 end delete_generic_queue;
2727
2728 /*
2729 ** Procedure: get_hash_queue_name
2730 **
2731 ** Description: Load all queue definitions into memory. The use a hashing algorithm
2732 ** to return a queue name
2733 */
2734 procedure get_hash_queue_name
2735 (p_protocol in varchar2,
2736 p_inbound_outbound in varchar2,
2737 p_queue_name out NOCOPY varchar2) IS
2738
2739 qii number := 1;
2740 ii number := 1;
2741 l_index number := 0;
2742 l_queue_name varchar2(30) := null;
2743
2744 cursor get_queues is
2745 select protocol, inbound_outbound, queue_count
2746 from wf_queues
2747 order by protocol, inbound_outbound;
2748
2749 begin
2750
2751 /*
2752 ** Check to see if queues loaded into memory. If they are not
2753 ** already loaded
2754 */
2755 if (wf_queue.queue_names_index.count < 1) then
2756
2757 -- Show all nodes
2758 for wf_queues_list in get_queues loop
2759
2760 wf_queue.queue_names_index(ii).protocol := wf_queues_list.protocol;
2761 wf_queue.queue_names_index(ii).inbound_outbound := wf_queues_list.inbound_outbound;
2762 wf_queue.queue_names_index(ii).queue_count := wf_queues_list.queue_count;
2763
2764 ii := ii + 1;
2765
2766 end loop;
2767
2768 end if;
2769
2770 -- Go find the locator in the queue list that matches the request
2771 for ii in 1..wf_queue.queue_names_index.count loop
2772
2773 if (wf_queue.queue_names_index(ii).protocol = p_protocol AND
2774 wf_queue.queue_names_index(ii).inbound_outbound = p_inbound_outbound) THEN
2775
2776 -- If there is more than 1 queue then choose the queue based on a random
2777 -- number generator
2778 if (wf_queue.queue_names_index(ii).queue_count > 1) then
2779
2780 l_index := mod(to_number(wf_core.random), wf_queue.queue_names_index(ii).queue_count) + 1;
2781
2782 else
2783
2784 l_index := 1;
2785
2786 end if;
2787
2788 end if;
2789
2790 end loop;
2791
2792 if (l_index > 0) then
2793
2794 p_queue_name := wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'|| SUBSTR(p_inbound_outbound, 1, 1)||
2795 '_'||to_char(l_index)||'_QUEUE';
2796
2797 end if;
2798
2799 exception
2800 when others then
2801 Wf_Core.Context('Wf_Queue', 'get_hash_generic_queue',
2802 p_protocol, p_inbound_outbound);
2803 raise;
2804
2805 end get_hash_queue_name;
2806
2807 --
2808 -- Function: enable_exception_queue
2809 --
2810 -- Enable the exception queue for the queue table for dequing
2811 -- Returns the name of the exception queue for the given queue name
2812 --
2813 function enable_Exception_Queue(p_queue_name in varchar2) return varchar2
2814 is
2815 l_schema_name varchar(320);
2816 l_queue_name varchar2(30);
2817 l_pos integer := 0;
2818 l_queue_table varchar2(30);
2819 l_dequeue_enabled varchar2(7) := '';
2820 l_exception_queue varchar2(100) := '';
2821
2822 begin
2823 -- Check to see if the name has a schema. Rove it for the check.
2824 l_pos := instrb(p_queue_name,'.');
2825 if l_pos > 0 then
2826 l_schema_name := substrb(p_queue_name, 1, l_pos-1);
2827 l_queue_name := substrb(p_queue_name, l_pos+1);
2828 else
2829 l_schema_name := wf_core.translate('WF_SCHEMA');
2830 l_queue_name := p_queue_name;
2831 end if;
2832 begin
2833 select queue_table, dequeue_enabled
2834 into l_queue_table, l_dequeue_enabled
2835 from all_queues
2836 where owner = l_schema_name
2837 and name = l_queue_name;
2838 l_exception_queue := l_schema_name||'.'||'AQ$_'||
2839 l_queue_table||'_E';
2840 exception
2841 when no_data_found then
2842 l_exception_queue := '';
2843 l_dequeue_enabled := '';
2844 when others then
2845 raise;
2846 end;
2847
2848 if l_exception_queue <> '' and l_dequeue_enabled = 'NO' then
2849 dbms_aqadm.start_queue(queue_name => l_exception_queue,
2850 enqueue => FALSE,
2851 dequeue => TRUE);
2852 end if;
2853 return l_exception_queue;
2854
2855 exception
2856 when others then
2857 WF_CORE.Context('WF_QUEUE','Enable_Exception_Queue',p_queue_name);
2858 raise;
2859
2860 end enable_Exception_Queue;
2861
2862 -- ====================================================================
2863 -- Add Subscriber to Queue (PUBLIC)
2864 -- ====================================================================
2865 procedure AddSubscriber(queuename in varchar2,
2866 name in varchar2)
2867 as
2868 lagent sys.aq$_agent;
2869 begin
2870 lagent := sys.aq$_agent(name,'',0);
2871
2872 DBMS_AQADM.Add_Subscriber(
2873 queue_name=>queuename,
2874 subscriber=>lagent,
2875 rule=>'CORRID like '''||name||'%'''
2876 );
2877
2878 exception
2879 when OTHERS then
2880 Wf_Core.Context('WF_QUEUE','AddSubscriber',queuename, name);
2881 raise;
2882 end AddSubscriber;
2883
2884 -- Bug 2307428
2885 -- ====================================================================
2886 -- Enable Inbound and defrerred queues for Background Engine.
2887 -- ====================================================================
2888 procedure EnableBackgroundQueues as
2889 schema varchar2(320);
2890 queue_name varchar2(80);
2891 l_qname varchar2(80);
2892 CURSOR q_disabled (schema varchar2, queue_name varchar2) is
2893 SELECT name
2894 FROM all_queues
2895 WHERE name = queue_name
2896 AND owner = schema
2897 AND ((trim(enqueue_enabled) = 'NO') OR (trim(dequeue_enabled) = 'NO'));
2898
2899 begin
2900 --If the queue names haven't been set,initialise them
2901 if (wf_queue.name_init = FALSE) then
2902 wf_queue.set_queue_names;
2903 end if;
2904
2905 --Obtain the schema
2906 schema := wf_core.translate('WF_SCHEMA');
2907
2908 --Enable deferred queue
2909 queue_name := substr(wf_queue.deferred_queue_name,length(schema)+2);
2910 OPEN q_disabled (schema, queue_name);
2911 LOOP
2912 FETCH q_disabled into l_qname;
2913 EXIT WHEN q_disabled%NOTFOUND;
2914 DBMS_AQADM.START_QUEUE(wf_queue.deferred_queue_name);
2915 END LOOP;
2916 CLOSE q_disabled;
2917
2918 --Enable inbound queue
2919 queue_name := substr(wf_queue.inbound_queue_name,length(schema)+2);
2920 OPEN q_disabled (schema, queue_name);
2921 LOOP
2922 FETCH q_disabled into l_qname;
2923 EXIT WHEN q_disabled%NOTFOUND;
2924 DBMS_AQADM.START_QUEUE(wf_queue.inbound_queue_name);
2925 END LOOP;
2926 CLOSE q_disabled;
2927 exception
2928 when others then
2929 Wf_core.Context('WF_QUEUE','EnableBackgroundQueues');
2930 raise;
2931 end EnableBackgroundQueues;
2932 -- ====================================================================
2933 -- get Count Message States (PUBLIC)
2934 -- ====================================================================
2935 procedure getCntMsgSt
2936 (p_agent IN VARCHAR2 DEFAULT '%',
2937 p_ready OUT NOCOPY NUMBER,
2938 p_wait OUT NOCOPY NUMBER,
2939 p_processed OUT NOCOPY NUMBER,
2940 p_expired OUT NOCOPY NUMBER,
2941 p_undeliverable OUT NOCOPY NUMBER,
2942 p_error OUT NOCOPY NUMBER)
2943 is
2944
2945 TYPE cntmsgst_t IS REF CURSOR;
2946 l_cntmsgst cntmsgst_t;
2947 l_sqlstmt varchar2(4000);
2948 l_count number := 0;
2949 l_msgstate varchar2(50);
2950 l_pos number := 0;
2951 l_qt varchar2(100);
2952 l_owner varchar2(100);
2953
2954 -- <rraheja:2786474> Gather schema and queue name once rather than in every call for perf.
2955 l_schema varchar2(100);
2956 l_qname varchar2(100);
2957
2958
2959 -- <rraheja:2786474> Changed upper(name) to name as queue_name should be recorded in upper case.
2960 cursor c_localagents(p_agent varchar2) is
2961 select queue_name
2962 from wf_agents
2963 where system_guid = hextoraw(wf_core.translate('WF_SYSTEM_GUID'))
2964 and name like upper(p_agent);
2965
2966 /*
2967 cursor c_qt is
2968 select owner
2969 from all_queue_tables
2970 where queue_table = l_qt;
2971 */
2972
2973 -- <rraheja:2786474> Changed non-cursor single row query to cursor based for improved perf.
2974 cursor c_qtable is
2975 select queue_table
2976 from all_queues
2977 where owner = l_schema
2978 and name = l_qname;
2979 --and queue_type = 'NORMAL_QUEUE';
2980
2981 TABLE_NOTFOUND exception;
2982 pragma EXCEPTION_INIT(TABLE_NOTFOUND,-942);
2983
2984 INVALID_TABLE exception;
2985 pragma EXCEPTION_INIT(INVALID_TABLE,-903);
2986
2987 begin
2988
2989 -- Initialize Out Parameters
2990 p_ready := 0;
2991 p_wait := 0;
2992 p_processed := 0;
2993 p_expired := 0;
2994 p_undeliverable := 0;
2995 p_error := 0;
2996
2997 for i in c_localagents(p_agent) loop
2998
2999 -- Get the Queue Table plus owner
3000 l_pos := nvl(instr(i.queue_name,'.',1,1),0);
3001
3002 -- <rraheja:2786474> Changed non-cursor single row query to cursor and used vars for freq used data
3003 l_schema := substr(i.queue_name,1,l_pos-1);
3004 l_qname := substr(i.queue_name,l_pos+1);
3005 open c_qtable;
3006 fetch c_qtable into l_qt;
3007 close c_qtable;
3008
3009
3010 -- Get the Owner of the Queue Table
3011 -- <rraheja:2786474> queue owner should be = queue table owner, so commenting out the code
3012 /*
3013 open c_qt;
3014 fetch c_qt into l_owner;
3015 exit when c_qt%notfound;
3016 close c_qt;
3017 */
3018 l_owner := l_schema;
3019
3020 -- l_owner and l_qt are selected/derived from our own cursor
3021 -- BINDVAR_SCAN_IGNORE[2]
3022 l_sqlstmt := 'select msg_state, count(*) from '||l_owner||'.'||'aq$'||l_qt
3023 ||' where (queue = :q or queue = :r) group by msg_state';
3024 begin
3025 --If the queue tables are not found then the
3026 --select should throw ORA 942.
3027 --Put the begin catch block of exception at the end
3028 --so that u don't have to use goto's to get out of loop
3029 open l_cntmsgst for l_sqlstmt using l_qname,'AQ$_'|| l_qname ||'_E';
3030 loop
3031 fetch l_cntmsgst into l_msgstate, l_count;
3032 if l_msgstate = 'READY'then
3033 --Bug 2382594
3034 --If the agent is WF_ERROR do not count p_error.
3035 if l_qname = 'WF_ERROR' and p_agent = '%' then
3036 p_error := p_error + l_count;
3037 else
3038 p_ready := p_ready + l_count;
3039 end if;
3040 elsif l_msgstate = 'WAIT' then
3041 p_wait := p_wait + l_count;
3042 elsif l_msgstate = 'PROCESSED' then
3043 p_processed := p_processed + l_count;
3044 elsif l_msgstate = 'EXPIRED' then
3045 p_expired := p_expired + l_count;
3046 elsif l_msgstate = 'UNDELIVERABLE' then
3047 p_undeliverable := p_undeliverable + l_count;
3048 end if;
3049 l_count := 0;
3050
3051 exit when l_cntmsgst%notfound;
3052 end loop;
3053
3054 close l_cntmsgst;
3055 exception
3056 when table_notfound then
3057 --return 0 count instead of throwing error to UI
3058 --all the returns are at their initialized value of 0
3059 --just ensure that the cursor is closed
3060 if (l_cntmsgst%ISOPEN) then
3061 close l_cntmsgst;
3062 end if;
3063 when invalid_table then
3064 --return 0 count instead of throwing error to UI
3065 --all the returns are at their initialized value of 0
3066 --just ensure that the cursor is closed
3067 if (l_cntmsgst%ISOPEN) then
3068 close l_cntmsgst;
3069 end if;
3070 end;
3071
3072
3073 end loop; -- end loop for c_localagents
3074
3075 exception
3076 when OTHERS then
3077 if (l_cntmsgst%ISOPEN)
3078 then
3079 close l_cntmsgst;
3080 end if;
3081
3082 Wf_Core.Context('WF_QUEUE','getCntMsgSt',p_agent);
3083 raise;
3084 end getCntMsgSt;
3085
3086 --
3087 -- move_msgs_excep2normal (CONCURRENT PROGRAM API)
3088 -- API to move messages from the exception queue to the normal queue
3089 -- of the given agent. Handles wf_event_t and JMS_TEXT_MESSAGE payloads.
3090 --
3091 -- OUT
3092 -- errbuf - CP error message
3093 -- retcode - CP return code (0 = success, 1 = warning, 2 = error)
3094 -- IN
3095 -- p_agent_name - Agent name
3096 --
3097 procedure move_msgs_excep2normal(errbuf out nocopy varchar2,
3098 retcode out nocopy varchar2,
3099 p_agent_name in varchar2)
3100 as
3101 l_queue_name varchar2(100);
3102 l_queue_handler varchar2(100);
3103 l_schema varchar2(100);
3104 l_qname varchar2(100);
3105 l_excp_qname varchar2(100);
3106 l_object_type varchar2(100);
3107 l_obj_type varchar2(100);
3108 l_pos number := 0;
3109 l_timeout integer;
3110 l_dequeue_options dbms_aq.dequeue_options_t;
3111 l_enqueue_options dbms_aq.enqueue_options_t;
3112 l_message_properties dbms_aq.message_properties_t;
3113 l_payload_evt wf_event_t;
3114 l_payload_jms sys.aq$_JMS_TEXT_MESSAGE;
3115 l_msg_id raw(16);
3116 invalid_agent exception;
3117 invalid_type exception;
3118 pragma EXCEPTION_INIT(invalid_agent, -20201);
3119 pragma EXCEPTION_INIT(invalid_type, -20202);
3120
3121 begin
3122
3123 begin
3124 SELECT TRIM(queue_name), TRIM(queue_handler)
3125 INTO l_queue_name, l_queue_handler
3126 FROM wf_agents
3127 WHERE name = upper(p_agent_name)
3128 AND SYSTEM_GUID = wf_event.local_system_guid;
3129 exception
3130 when no_data_found then
3131 raise_application_error(-20201, 'Agent not found');
3132 when others then
3133 raise;
3134 end;
3135
3136 l_pos := instr(l_queue_name, '.', 1, 1);
3137 l_schema := substr(l_queue_name, 1, l_pos-1);
3138 l_qname := substr(l_queue_name, l_pos+1);
3139 l_excp_qname := 'AQ$_' || l_qname || '_E';
3140
3141 SELECT TRIM(object_type)
3142 INTO l_object_type
3143 FROM all_queue_tables
3144 WHERE queue_table in
3145 (
3146 SELECT queue_table
3147 FROM all_queues
3148 WHERE name = l_qname
3149 AND owner = l_schema
3150 )
3151 AND owner=l_schema;
3152
3153 l_pos := instr(l_object_type, '.', 1, 1);
3154 l_obj_type := substr(l_object_type, l_pos+1);
3155
3156 l_timeout := 0;
3157 l_dequeue_options.dequeue_mode := dbms_aq.REMOVE;
3158 l_dequeue_options.wait := dbms_aq.NO_WAIT;
3159 l_dequeue_options.consumer_name := null;
3160 l_enqueue_options.visibility := dbms_aq.ON_COMMIT;
3161
3162 if l_obj_type = 'WF_EVENT_T' then
3163 wf_event_t.Initialize(l_payload_evt);
3164 while (l_timeout = 0) loop
3165 begin
3166 --Dequeue the message from the exception queue
3167 dbms_aq.Dequeue(queue_name => l_schema || '.' || l_excp_qname,
3168 dequeue_options => l_dequeue_options,
3169 message_properties => l_message_properties,
3170 payload => l_payload_evt,
3171 msgid => l_msg_id);
3172 l_timeout := 0;
3173 --Enqueue the message in the normal queue
3174 l_message_properties.expiration := dbms_aq.never;
3175 if (upper(p_agent_name) = 'WF_ERROR' OR upper(p_agent_name) = 'WF_IN'
3176 OR upper(p_agent_name) = 'WF_OUT') then
3177 l_message_properties.recipient_list(1) := sys.aq$_agent(p_agent_name,
3178 null,
3179 0);
3180 end if;
3181 dbms_aq.enqueue(queue_name => l_queue_name,
3182 enqueue_options => l_enqueue_options,
3183 message_properties => l_message_properties,
3184 payload => l_payload_evt,
3185 msgid => l_msg_id);
3186 commit;
3187
3188 exception
3189 when dequeue_timeout then
3190 l_timeout := 1;
3191 when others then
3192 raise;
3193 end;
3194 end loop; --End of while loop that handles wf_event_t payload
3195
3196 elsif l_obj_type = 'AQ$_JMS_TEXT_MESSAGE' then
3197 l_timeout := 0;
3198 while (l_timeout = 0) loop
3199 begin
3200 --Dequeue the message from the exception queue
3201 dbms_aq.Dequeue(queue_name => l_schema || '.' || l_excp_qname,
3202 dequeue_options => l_dequeue_options,
3203 message_properties => l_message_properties,
3204 payload => l_payload_jms,
3205 msgid => l_msg_id);
3206 l_timeout := 0;
3207 --Enqueue the message in the normal queue of the given agent
3208 l_message_properties.expiration := dbms_aq.never;
3209 dbms_aq.enqueue(queue_name => l_queue_name,
3210 enqueue_options => l_enqueue_options,
3211 message_properties => l_message_properties,
3212 payload => l_payload_jms,
3213 msgid => l_msg_id);
3214 commit;
3215
3216 exception
3217 when dequeue_timeout then
3218 l_timeout := 1;
3219 when others then
3220 raise;
3221 end;
3222 end loop; --End of while loop that handles AQ$_JMS_TEXT_MESSAGE payload
3223
3224 else
3225 -- Payload not supported by this API, raise application error
3226 raise_application_error(-20202, 'Invalid payload type');
3227 end if;
3228
3229 errbuf := '';
3230 retcode := '0';
3231
3232 exception
3233 when invalid_agent then
3234 errbuf := 'The agent ' || p_agent_name || ' is not found ';
3235 retcode := '2';
3236 when invalid_type then
3237 errbuf := 'This API does not support payload of type '
3238 || l_obj_type || ' for agent ' || p_agent_name;
3239 retcode := '2';
3240 when others then
3241 errbuf := sqlerrm;
3242 retcode := '2';
3243 end move_msgs_excep2normal;
3244
3245 --
3246 -- Overloaded Procedure 1 : Definition without the AGE parameter
3247 --
3248 -- clean_evt
3249 -- Procedure to purge the messages in the READY state of a Queue
3250 -- of WF_EVENT_T or AQ$_JMS_TEXT_MESSAGE payload type. Supports correlation id based purge.
3251 --
3252 -- IN
3253 -- p_agent_name - Agent Name
3254 -- p_correlation - Correlation ID (Default Value : NULL)
3255 -- p_commit_frequency - Commit Level (Default Value : 500)
3256 --
3257 -- OUT
3258 -- p_msg_count - Count of the number of purged messages
3259 --
3260 procedure clean_evt(p_agent_name in varchar2,
3261 p_correlation in varchar2 default NULL,
3262 p_commit_frequency in number default 500,
3263 p_msg_count out nocopy number)
3264 as
3265 l_xcount integer;
3266 l_timeout integer;
3267 l_pos number := 0;
3268 l_schema varchar2(80);
3269 l_qname varchar2(80);
3270 l_queue_name varchar2(80);
3271 l_account_name varchar2(30);
3272 l_payload wf_event_t;
3273 l_msgid raw(16);
3274 l_message_handle raw(16) := NULL;
3275 l_dequeue_options dbms_aq.dequeue_options_t;
3276 l_message_properties dbms_aq.message_properties_t;
3277
3278 -- Bug 6112028
3279 l_data_type VARCHAR2(106);
3280 l_payload_jms SYS.AQ$_JMS_TEXT_MESSAGE;
3281
3282 --Define the snapshot too old error
3283 snap_too_old exception;
3284 pragma exception_init(snap_too_old, -1555);
3285
3286 begin
3287 p_msg_count := 0;
3288 l_timeout := 0;
3289 l_xcount := 0;
3290
3291 SELECT queue_name
3292 INTO l_queue_name
3293 FROM wf_agents
3294 WHERE name = upper(p_agent_name)
3295 AND SYSTEM_GUID = wf_event.local_system_guid;
3296
3297 l_pos := instr(l_queue_name, '.', 1, 1);
3298 l_schema := substr(l_queue_name, 1, l_pos-1);
3299 l_qname := substr(l_queue_name, l_pos+1);
3300
3301 SELECT TRIM(object_type)
3302 INTO l_data_type
3303 FROM all_queue_tables
3304 WHERE queue_table in
3305 (
3306 SELECT queue_table
3307 FROM all_queues
3308 WHERE name = l_qname
3309 AND owner = l_schema
3310 )
3311 AND owner=l_schema;
3312
3313 l_pos := instr(l_data_type, '.', 1, 1);
3314 l_data_type := substr(l_data_type, l_pos+1);
3315
3316 --No processing is done on the payload data
3317 --So dequeue is done in the REMOVE_NODATA mode
3318 l_dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
3319 l_dequeue_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
3320 l_dequeue_options.wait := dbms_aq.NO_WAIT;
3321 l_dequeue_options.consumer_name := upper(p_agent_name);
3322
3323 --Set the Correlation ID for dequeue only if available
3324 --If the given agent is a Workflow Agent then append the
3325 --Account Name before the Correlation ID
3326 if ((p_correlation is not null) or (p_correlation <> '')) then
3327 -- Seeded WF agents
3328 if (upper(p_agent_name) like 'WF_%') then
3329 if (wf_event.account_name is null) then
3330 wf_event.SetAccountName;
3331 end if;
3332 l_dequeue_options.correlation := wf_event.account_name
3333 || ':'
3334 || p_correlation;
3335 else
3336 l_dequeue_options.correlation := p_correlation;
3337 end if;
3338 end if;
3339
3340 -- All the messages with the given correlation id are to be purged
3341 -- In this case, the $fnd/sql/wfevqcln.sql script logic is followed
3342 -- The dequeue is based on the given correlation id
3343 while (l_timeout = 0) loop
3344 begin
3345
3346 if (l_data_type = 'WF_EVENT_T') then
3347 dbms_aq.Dequeue(queue_name => l_queue_name,
3348 dequeue_options => l_dequeue_options,
3349 message_properties => l_message_properties, /* OUT */
3350 payload => l_payload, /* OUT */
3351 msgid => l_message_handle); /* OUT */
3352 elsif l_data_type = 'AQ$_JMS_TEXT_MESSAGE' then
3353 dbms_aq.Dequeue(queue_name => l_queue_name,
3354 dequeue_options => l_dequeue_options,
3355 message_properties => l_message_properties, /* OUT */
3356 payload => l_payload_jms, /* OUT */
3357 msgid => l_message_handle); /* OUT */
3358 else
3359 -- Payload not supported by this API, raise application error
3360 Wf_core.token('PAYLOAD', l_data_type);
3361 Wf_core.raise('WFE_PAYLOAD_UNSUPP');
3362 end if;
3363
3364 l_xcount := l_xcount + 1;
3365 l_timeout := 0;
3366
3367 exception
3368 when dequeue_disabled then
3369 raise;
3370 when dequeue_timeout then
3371 l_timeout := 1;
3372 --Capture the snapshot too old error
3373 when snap_too_old then
3374 --Workaround for AQ when receiving ORA-01555 using NEXT_MESSAGE as
3375 --navigation. We will try to set to FIRST_MESSAGE and dequeue to
3376 --silently handle this exception.
3377 if (l_dequeue_options.navigation = dbms_aq.FIRST_MESSAGE) then
3378 raise;
3379 else
3380 l_dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
3381
3382 if (l_data_type = 'WF_EVENT_T') then
3383 dbms_aq.Dequeue(queue_name => l_queue_name,
3384 dequeue_options => l_dequeue_options,
3385 message_properties => l_message_properties, /* OUT */
3386 payload => l_payload, /* OUT */
3387 msgid => l_message_handle); /* OUT */
3388
3389 elsif l_data_type = 'AQ$_JMS_TEXT_MESSAGE' then
3390 dbms_aq.Dequeue(queue_name => l_queue_name,
3391 dequeue_options => l_dequeue_options,
3392 message_properties => l_message_properties, /* OUT */
3393 payload => l_payload_jms, /* OUT */
3394 msgid => l_message_handle); /* OUT */
3395 else
3396 -- Payload not supported by this API, raise application error
3397 Wf_core.token('PAYLOAD', l_data_type);
3398 Wf_core.raise('WFE_PAYLOAD_UNSUPP');
3399 end if;
3400
3401 l_xcount := l_xcount + 1;
3402 l_timeout := 0;
3403 end if;
3404 when others then
3405 raise;
3406 end;
3407
3408 l_dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
3409 --Commit if commit frequency
3410 if l_xcount >= p_commit_frequency then
3411 commit;
3412 p_msg_count := p_msg_count + l_xcount;
3413 l_xcount := 0;
3414 end if;
3415 end loop;
3416
3417 commit;
3418 p_msg_count := p_msg_count + l_xcount;
3419
3420 exception
3421 when others then
3422 Wf_core.Context('WF_QUEUE', 'Clean_evt', p_agent_name,
3423 p_correlation, to_char(p_commit_frequency));
3424 raise;
3425 end clean_evt;
3426
3427 --
3428 -- Overloaded Procedure 2 : Definition with the AGE parameter
3429 --
3430 -- clean_evt
3431 -- Procedure to purge the messages in the READY state of a Queue
3432 -- of WF_EVENT_T or AQ$_JMS_TEXT_MESSAGE payload type. Supports time-based selective
3433 -- purge with correlation id.
3434 --
3435 -- IN
3436 -- p_agent_name - Agent Name
3437 -- p_correlation - Correlation ID (Default Value : NULL)
3438 -- p_commit_frequency - Commit Level (Default Value : 500)
3439 -- p_age - Age of the Messages (No default value
3440 -- as this is a overloaded procedure)
3441 --
3442 -- OUT
3443 -- p_msg_count - Count of the number of purged messages
3444 --
3445 procedure clean_evt(p_agent_name in varchar2,
3446 p_correlation in varchar2 default NULL,
3447 p_commit_frequency in number default 500,
3448 p_msg_count out nocopy number,
3449 p_age in number)
3450 as
3451 l_xcount integer;
3452 l_pos number := 0;
3453 l_schema varchar2(80);
3454 l_qname varchar2(80);
3455 l_corrid varchar2(128);
3456 l_queue_name varchar2(80);
3457 l_account_name varchar2(30);
3458 l_payload wf_event_t;
3459 l_msgid raw(16);
3460 l_message_handle raw(16) := NULL;
3461 l_dequeue_options dbms_aq.dequeue_options_t;
3462 l_message_properties dbms_aq.message_properties_t;
3463
3464 -- Bug 6112028
3465 l_data_type VARCHAR2(106);
3466 l_payload_jms SYS.AQ$_JMS_TEXT_MESSAGE;
3467
3468 -- Cursor to get all messages from the queue that were enqueued before
3469 -- a given date.
3470 TYPE c_msgs_typ IS REF CURSOR;
3471 c_msgs c_msgs_typ;
3472 --Define the snapshot too old error
3473 snap_too_old exception;
3474 pragma exception_init(snap_too_old, -1555);
3475 begin
3476 p_msg_count := 0;
3477 l_xcount := 0;
3478
3479 SELECT queue_name
3480 INTO l_queue_name
3481 FROM wf_agents
3482 WHERE name = upper(p_agent_name)
3483 AND SYSTEM_GUID = wf_event.local_system_guid;
3484
3485 l_pos := instr(l_queue_name, '.', 1, 1);
3486 l_schema := substr(l_queue_name, 1, l_pos-1);
3487 l_qname := substr(l_queue_name, l_pos+1);
3488
3489 SELECT TRIM(object_type)
3490 INTO l_data_type
3491 FROM all_queue_tables
3492 WHERE queue_table in
3493 (
3494 SELECT queue_table
3495 FROM all_queues
3496 WHERE name = l_qname
3497 AND owner = l_schema
3498 )
3499 AND owner=l_schema;
3500 -- Query from the AQ view table
3501 l_qname := l_schema || '.AQ$' || l_qname;
3502
3503 l_pos := instr(l_data_type, '.', 1, 1);
3504 l_data_type := substr(l_data_type, l_pos+1);
3505
3506 --No processing is done on the payload data
3507 --So dequeue is done in the REMOVE_NODATA mode
3508 l_dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
3509 l_dequeue_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
3510 l_dequeue_options.wait := dbms_aq.NO_WAIT;
3511 l_dequeue_options.consumer_name := upper(p_agent_name);
3512 --
3513 --Set the Correlation ID for dequeue only if available
3514 --If the given agent is a Workflow Agent then append the
3515 --Account Name before the Correlation ID
3516 --
3517 -- All the message ids older than the specified age are queried
3518 -- and the dequeue is done on the retrieved message ids
3519 --
3520 if ((p_correlation is not null) or (p_correlation <> '')) then
3521 -- Seeded WF agents
3522 if (upper(p_agent_name) like 'WF_%') then
3523 if (wf_event.account_name is null) then
3524 wf_event.SetAccountName;
3525 end if;
3526 l_corrid := wf_event.account_name
3527 || ':'
3528 || p_correlation;
3529 else
3530 l_corrid := p_correlation;
3531 end if;
3532 -- The dequeue should be based on the msg ids retrieved in
3533 -- the following query, not on any correlation id.
3534 -- So the l_dequeue_options.correlation is not set.
3535 OPEN c_msgs FOR
3536 'SELECT msg_id FROM '
3537 || l_qname
3538 || ' WHERE msg_state = ''' || 'READY'' '
3539 || ' AND enq_time < (sysdate - :1) '
3540 || ' AND corr_id like :2 ' using p_age,l_corrid;
3541 else
3542 -- If the given correlation is null then the query do not
3543 -- need it, as we consider a null correlation to be %
3544 -- The dequeue_options.correlation will be null by default
3545 OPEN c_msgs FOR
3546 'SELECT msg_id FROM '
3547 || l_qname
3548 || ' WHERE msg_state = ''' || 'READY'' '
3549 || ' AND enq_time < (sysdate - :1) ' using p_age;
3550 end if;
3551
3552 -- Dequeue messages based on the msg id
3553 loop
3554 fetch c_msgs into l_msgid;
3555 exit when c_msgs%notfound;
3556 l_dequeue_options.msgid := l_msgid;
3557 begin
3558
3559 if (l_data_type = 'WF_EVENT_T') then
3560 dbms_aq.Dequeue(queue_name => l_queue_name,
3561 dequeue_options => l_dequeue_options,
3562 message_properties => l_message_properties,
3563 payload => l_payload,
3564 msgid => l_message_handle);
3565 elsif l_data_type = 'AQ$_JMS_TEXT_MESSAGE' then
3566
3567 dbms_aq.Dequeue(queue_name => l_queue_name,
3568 dequeue_options => l_dequeue_options,
3569 message_properties => l_message_properties,
3570 payload => l_payload_jms,
3571 msgid => l_message_handle);
3572 else
3573 -- Payload not supported by this API, raise application error
3574 Wf_core.token('PAYLOAD', l_data_type);
3575 Wf_core.raise('WFE_PAYLOAD_UNSUPP');
3576 end if;
3577 l_xcount := l_xcount + 1;
3578
3579 exception
3580 when dequeue_disabled then
3581 raise;
3582 when snap_too_old then
3583 --Workaround for AQ when receiving ORA-01555 using NEXT_MESSAGE as
3584 --navigation. We will try to set to FIRST_MESSAGE and dequeue to
3585 --silently handle this exception.
3586 if (l_dequeue_options.navigation = dbms_aq.FIRST_MESSAGE) then
3587 raise;
3588 else
3589 l_dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
3590
3591 if (l_data_type = 'WF_EVENT_T') then
3592 dbms_aq.Dequeue(queue_name => l_queue_name,
3593 dequeue_options => l_dequeue_options,
3594 message_properties => l_message_properties, /* OUT */
3595 payload => l_payload, /* OUT */
3596 msgid => l_message_handle); /* OUT */
3597
3598 elsif l_data_type = 'AQ$_JMS_TEXT_MESSAGE' then
3599 dbms_aq.Dequeue(queue_name => l_queue_name,
3600 dequeue_options => l_dequeue_options,
3601 message_properties => l_message_properties, /* OUT */
3602 payload => l_payload_jms, /* OUT */
3603 msgid => l_message_handle); /* OUT */
3604 else
3605 -- Payload not supported by this API, raise application error
3606 Wf_core.token('PAYLOAD', l_data_type);
3607 Wf_core.raise('WFE_PAYLOAD_UNSUPP');
3608 end if;
3609
3610 l_xcount := l_xcount + 1;
3611 end if;
3612 when others then
3613 raise;
3614 end; -- cursor begin
3615
3616 -- Commit if commit frequency
3617 if l_xcount >= p_commit_frequency then
3618 commit;
3619 p_msg_count := p_msg_count + l_xcount;
3620 l_xcount := 0;
3621 end if;
3622 end loop;
3623
3624 commit;
3625 p_msg_count := p_msg_count + l_xcount;
3626
3627 exception
3628 when others then
3629 Wf_core.Context('WF_QUEUE', 'Clean_evt', p_agent_name, p_correlation,
3630 to_char(p_commit_frequency), to_char(p_age));
3631 raise;
3632 end clean_evt;
3633 --------------------------------------------------------------------------------
3634 /*
3635 ** Bug 4005674 - Populate Continuous Loop Global Variables
3636 */
3637 begin
3638 wf_queue.g_defer_occurrence := 100;
3639 wf_queue.g_add_delay_seconds := 300;
3640 wf_queue.g_max_delay_seconds := 3600;
3641 --------------------------------------------------------------------------------
3642 end WF_QUEUE;