1 PACKAGE BODY WF_QUEUE AS
2 /* $Header: wfqueb.pls 120.9 2011/08/10 18:04:40 alsosa ship $ */
3
4 --
5 -- Exceptions
6 --
-- Named exceptions bound to the Oracle error numbers this package reacts to.

-- ORA-25228: handlers in this package treat it as "nothing (left) to dequeue".
dequeue_timeout   EXCEPTION;
PRAGMA EXCEPTION_INIT(dequeue_timeout, -25228);

-- ORA-25226
dequeue_disabled  EXCEPTION;
PRAGMA EXCEPTION_INIT(dequeue_disabled, -25226);

-- ORA-25237
dequeue_outofseq  EXCEPTION;
PRAGMA EXCEPTION_INIT(dequeue_outofseq, -25237);

-- ORA-24010
no_queue          EXCEPTION;
PRAGMA EXCEPTION_INIT(no_queue, -24010);

-- User-defined exception; no Oracle error number is bound to it.
shutdown_pending  EXCEPTION;

-- ORA-01086: caught in ProcessDeferredEvent when a rollback to the savepoint
-- fails, which that routine attributes to a commit inside the activity.
no_savepoint      EXCEPTION;
PRAGMA EXCEPTION_INIT(no_savepoint, -1086);

-- ORA-25263: PurgeEvent treats it as "message already purged".
msgid_notfound    EXCEPTION;
PRAGMA EXCEPTION_INIT(msgid_notfound, -25263);

-- Bug 4005674
-- Private globals holding the item_type:item_key:actid recorded at dequeue
-- time. Enqueue_Event compares them with the activity being enqueued to
-- recognise an activity that is being re-deferred.
g_dequeue_item_type      VARCHAR2(8);
g_dequeue_item_key       VARCHAR2(240);
g_dequeue_actid          NUMBER;
-- Lower bound used by Enqueue_Event when counting history rows.
g_background_begin_date  DATE;
-- Scratch variable for the hash key into g_ActivityHistoryCount.
g_Key                    NUMBER;

-- One cache entry: the activity identity plus the number of times it has been
-- seen. The identity is stored so that hash collisions can be detected.
TYPE ActivityHistoryREC IS RECORD (
  item_type      VARCHAR2(8),
  item_key       VARCHAR2(240),
  actid          NUMBER,
  history_count  NUMBER
);

-- Session-level cache indexed by WF_CACHE.HashKey(itemtype:itemkey:actid).
TYPE ActivityHistoryCountTAB IS TABLE OF ActivityHistoryREC
  INDEX BY BINARY_INTEGER;

g_ActivityHistoryCount   ActivityHistoryCountTAB;
43
44 -- ====================================================================
45 --
46 -- Private Routine to check for shutdown
47 --
48 -- ====================================================================
49
function check_instance return boolean
as
  -- Value of v$instance.shutdown_pending for the current instance.
  l_pending varchar2(3);
begin
  select shutdown_pending
    into l_pending
    from v$instance;

  -- Only the literal 'YES' means a shutdown is pending; any other value,
  -- including NULL, is reported as FALSE.
  if l_pending = 'YES' then
    return TRUE;
  end if;

  return FALSE;
end check_instance;
64
65 -- ====================================================================
66 --
67 -- Private Procedure which processes the payload dequeued off the
68 -- Inbound queue .
69 --
70 -- ====================================================================
71
-- Process_Inbound_Event (PRIVATE)
-- Executes the payload dequeued off the inbound queue
-- IN
--   itemtype       - itemtype, itemkey and actid together uniquely
--   itemkey        - identify the activity
--   actid          -
--   message_handle - pointer to queue message
-- IN OUT
--   p_payload      - the message payload. It is declared in/out so that if
--                    a callback (for which it is in/out) changes something,
--                    the change is available here.
82
procedure Process_Inbound_Event(itemtype       in varchar2,
                                itemkey        in varchar2,
                                actid          in number,
                                message_handle in raw,
                                p_payload      in out nocopy system.wf_payload_t)
as
  -- Applies one inbound-queue payload to the workflow item and then removes
  -- the message from the inbound queue.
  --
  --   1. Each NAME=VALUE pair of p_payload.param_list ('^' separated) is
  --      written as a text item attribute, creating the attribute when it
  --      does not exist yet.
  --   2. p_payload.result without a ':' (or NULL) completes the activity via
  --      wf_engine.CB('COMPLETE') unless its status is already COMPLETE.
  --      A result whose text after the first ':' starts with 'ERROR' is
  --      reported via wf_engine.CB('ERROR'). Any other result containing a
  --      ':' triggers no callback.
  --   3. The message identified by message_handle is purged.
  --
  -- Any exception is re-raised after recording the Wf_Core context; in that
  -- case the message is not purged here.

  colon      number;          -- position of the first ':' in the result
  status     varchar2(30);    -- current status of the activity

  plist      varchar2(4000);  -- portion of the parameter list still to parse
  attrpair   varchar2(4000);  -- current NAME=VALUE pair
  delimiter  number;          -- position of the next '^' in plist (0 = last pair)
  aname      varchar2(40);
  avalue     varchar2(4000);

  nvalue     number;  -- required but not used by wf_engine.CB
  dvalue     date;    -- required but not used by wf_engine.CB

begin

  -- Process the parameter list.
  plist := p_payload.param_list;

  loop
    -- Stop when nothing is left to parse. This covers an empty list and a
    -- list ending in '^'. Without this guard a trailing '^' leaves plist
    -- NULL, instr() then returns NULL and "exit when delimiter = 0" can
    -- never become true.
    exit when plist is null;

    delimiter := instr(plist, '^');

    if delimiter = 0 then
      attrpair := plist;
    else
      attrpair := substr(plist, 1, delimiter-1);
    end if;

    -- Attribute names are upper-cased; the value is everything after the
    -- first '='.
    aname  := upper(substr(attrpair, 1, instr(attrpair, '=')-1));
    avalue := substr(attrpair, instr(attrpair, '=')+1);

    begin
      -- Set the value for the attribute
      wf_engine.SetItemAttrText(itemtype, itemkey, aname, avalue);
    exception
      when others then
        if (wf_core.error_name = 'WFENG_ITEM_ATTR') then
          -- The attribute does not exist: create it, then set the value.
          -- Clear the recorded error first (as Enqueue_Event does) so a
          -- stale 'WFENG_ITEM_ATTR' cannot be mistaken for the cause of a
          -- later, unrelated failure.
          Wf_Core.Clear;
          Wf_Engine.AddItemAttr(itemtype, itemkey, aname);
          Wf_Engine.SetItemAttrText(itemtype, itemkey, aname, avalue);
        else
          raise;
        end if;
    end;

    exit when delimiter = 0;

    plist := substr(plist, delimiter+1);

  end loop;

  -- If the result contains a colon it is an ERROR, otherwise a COMPLETE.
  colon := instr(p_payload.result, ':');
  if colon = 0 or p_payload.result is null then
    -- check if activity is already complete
    wf_item_activity_status.status(itemtype, itemkey, actid, status);
    if (status is not null)
    and (status <> 'COMPLETE') then
      -- mark activity as Complete:<result>
      wf_engine.CB(command      => 'COMPLETE',
                   context      => itemtype||':'||
                                   itemkey ||':'||
                                   actid,
                   text_value   => p_payload.result,
                   number_value => nvalue,
                   date_value   => dvalue);
    end if;
  else
    -- at the moment we only accept :ERROR:<error text> (may add other statuses later)
    if substr(p_payload.result, colon+1, 5) = 'ERROR' then

      -- Raise and immediately swallow WF_EXT_FUNCTION. The raised exception
      -- is deliberately discarded; the block is run only for what
      -- wf_core.raise records with the function name token.
      begin
        wf_core.clear;
        -- set the function name for courtesy.
        wf_core.token('FUNCTION_NAME',
                      Wf_Activity.activity_function(itemtype,
                                                    itemkey, actid));
        wf_core.raise('WF_EXT_FUNCTION');
      exception
        when others then null;
      end;

      -- function name on payload is up to 200 chars, so it is used to carry
      -- the error text (see EnqueueInbound)
      wf_core.error_stack := p_payload.function_name;

      wf_engine.CB(command      => 'ERROR',
                   context      => itemtype||':'||
                                   itemkey ||':'||
                                   actid,
                   text_value   => p_payload.result,
                   number_value => nvalue,
                   date_value   => dvalue);
    end if;
  end if;

  -- Everything succeeded up to here, so purge the message from the queue.
  wf_queue.PurgeEvent(wf_queue.InboundQueue, message_handle, FALSE);

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'Process_Inbound_Event', itemtype, itemkey);

    raise;

end Process_Inbound_Event;
197
198 -- ====================================================================
199 -- Queue Setup Functions (PUBLIC)
200 -- ====================================================================
function DeferredQueue return varchar2
as
begin
  -- Resolve the queue names lazily on first use in this session.
  if not wf_queue.name_init then
    wf_queue.set_queue_names;
  end if;

  -- Deferred queue name as assigned by set_queue_names.
  return wf_queue.deferred_queue_name;
exception
  when others then
    Wf_Core.Context('Wf_Queue', 'DeferredQueue');
    raise;
end DeferredQueue;
213
function OutboundQueue return varchar2
as
begin
  -- Resolve the queue names lazily on first use in this session.
  if not wf_queue.name_init then
    wf_queue.set_queue_names;
  end if;

  -- Outbound queue name as assigned by set_queue_names.
  return wf_queue.outbound_queue_name;
exception
  when others then
    Wf_Core.Context('Wf_Queue', 'OutboundQueue');
    raise;
end OutboundQueue;
226
function InboundQueue return varchar2
as
begin
  -- Resolve the queue names lazily on first use in this session.
  if not wf_queue.name_init then
    wf_queue.set_queue_names;
  end if;

  -- Inbound queue name as assigned by set_queue_names.
  return wf_queue.inbound_queue_name;
exception
  when others then
    Wf_Core.Context('Wf_Queue', 'InboundQueue');
    raise;
end InboundQueue;
239
240 -- NAME: Set_queue_names (PRIVATE)
241 -- called once at the beginning of a session to set up queue names
242 -- when AQ supports db synonyms, remove this and use synonyms instead
procedure set_queue_names as
  -- Initialises, once per session, the package state used to address the
  -- workflow queues: wf_queue.account_name and the three queue names.
  -- Calls after the first successful one return immediately.

  l_schema varchar2(320);  -- value of the translated 'WF_SCHEMA' token

begin

  -- Nothing to do when the names are already initialised.
  if wf_queue.name_init then
    return;
  end if;

  l_schema := wf_core.translate('WF_SCHEMA');

  -- The account name is no longer derived from STANDALONE vs. EMBEDDED.
  -- It is the schema currently in effect, which tags on the intended schema
  -- whether the install uses invoker's rights or definer's rights (default).
  -- A failure here is tolerated: the account name is simply left NULL.
  begin
    wf_queue.account_name := sys_context('USERENV', 'CURRENT_SCHEMA');
  exception
    when others then
      wf_queue.account_name := NULL;
  end;

  -- Queue names are always qualified with the workflow schema.
  wf_queue.deferred_queue_name := l_schema||'.WF_DEFERRED_QUEUE_M';
  wf_queue.outbound_queue_name := l_schema||'.WF_OUTBOUND_QUEUE';
  wf_queue.inbound_queue_name  := l_schema||'.WF_INBOUND_QUEUE';

  -- Mark as initialised only after every name has been assigned.
  wf_queue.name_init := TRUE;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'Set_queue_names');
    raise;
end set_queue_names;
280
281
282 -- ====================================================================
283 -- Public routines
284 -- ====================================================================
285
286 -- NAME: PurgeEvent
287 -- removes the event from the specified queue WITHOUT PROCESSING
288 -- queuename - the queue to purge
289 -- message_handle - the specific event to purge
290 --
procedure PurgeEvent(queuename      in varchar2,
                     message_handle in raw,
                     multiconsumer  in boolean default FALSE) as
  -- Removes one message, identified by its message id, from the given queue
  -- without processing it.
  --
  --   queuename      - queue to remove the message from
  --   message_handle - id of the message; a NULL handle makes this a no-op
  --   multiconsumer  - when TRUE the dequeue is issued with
  --                    wf_queue.account_name as the consumer name
  --
  -- A message that is no longer on the queue is not an error: both
  -- dequeue_timeout and msgid_notfound are silently ignored. Any other
  -- exception is re-raised after recording the Wf_Core context.

  event               system.wf_payload_t;           -- dequeued payload, discarded
  dequeue_options     dbms_aq.dequeue_options_t;
  message_properties  dbms_aq.message_properties_t;
  msg_id              raw(16);
begin
  if message_handle is not null then

    dequeue_options.dequeue_mode := dbms_aq.REMOVE;
    dequeue_options.msgid        := message_handle;
    dequeue_options.wait         := dbms_aq.NO_WAIT;
    dequeue_options.navigation   := dbms_aq.FIRST_MESSAGE;

    -- check if we need to have a consumer
    if (multiconsumer) then
      -- account_name is only populated by set_queue_names. Make sure it has
      -- run, as PurgeItemtype and Dequeue_Event do, so that a caller passing
      -- a literal queue name in a fresh session does not dequeue with a NULL
      -- consumer name. The call is a no-op once the names are initialised.
      wf_queue.set_queue_names;
      dequeue_options.consumer_name := wf_queue.account_name;
    end if;

    dbms_aq.dequeue
    (
      queue_name         => queuename,
      dequeue_options    => dequeue_options,
      message_properties => message_properties,
      payload            => event,
      msgid              => msg_id
    );

  end if;

exception
  when dequeue_timeout then
    null;   -- not found on queue so must already be removed.

  when msgid_notfound then
    null;   -- Already purged from the queue.

  when others then
    Wf_Core.Context('Wf_Queue', 'PurgeEvent', queuename,
                    rawtohex(message_handle));
    raise;

end PurgeEvent;
336
337 -- NAME: PurgeItemtype
338 -- removes all events belonging to an itemtype from the specified queue
339 -- ** WARNING ** IT DOES NOT PROCESS THE EVENT
340 -- queuename - the queue to purge
341 -- itemtype - the itemtype to purge
342 --
procedure PurgeItemtype(queuename   in varchar2,
                        itemtype    in varchar2 default null,
                        correlation in varchar2 default null )
as
  -- Removes, without processing them, all messages on the given queue that
  -- match a correlation.
  --
  --   queuename   - queue to purge
  --   itemtype    - used to build the default correlation
  --                 (account name || itemtype, or account name || '%')
  --   correlation - explicit correlation; overrides the default when given
  --
  -- Messages are dequeued in REMOVE mode until the dequeue raises
  -- dequeue_timeout, which is how the loop terminates. This routine does not
  -- issue a commit itself.

  l_discarded  system.wf_payload_t;   -- dequeued payload, not used
  l_deq_opts   dbms_aq.dequeue_options_t;
  l_msg_props  dbms_aq.message_properties_t;
  l_msgid      raw(16);

begin
  -- The account name is needed for the default correlation.
  wf_queue.set_queue_names;

  l_deq_opts.dequeue_mode := dbms_aq.REMOVE;
  l_deq_opts.wait         := dbms_aq.NO_WAIT;
  l_deq_opts.navigation   := dbms_aq.FIRST_MESSAGE;
  l_deq_opts.correlation  := nvl(correlation,
                                 wf_queue.account_name||nvl(itemtype,'%'));

  -- Intentionally endless: the dequeue_timeout handler below is the exit.
  loop
    dbms_aq.dequeue
    (
      queue_name         => queuename,
      dequeue_options    => l_deq_opts,
      message_properties => l_msg_props,
      payload            => l_discarded,
      msgid              => l_msgid
    );
  end loop;

exception
  when dequeue_timeout then
    null;   -- nothing left on queue to remove
  when others then
    Wf_Core.Context('Wf_Queue', 'PurgeItemtype', queuename, itemtype);
    raise;
end PurgeItemtype;
383
384 -- ProcessDeferredEvent (PRIVATE)
385 -- Executes the event payload dequeued off the deferred queue
386 -- IN
387 -- itemtype - itemtype,itemkey,actid to uniquely identify the
388 -- itemkey - activity
389 -- actid -
390 -- message_handle - pointer to queue message
391 -- minthreshold - threshold levels of the background engine
392 -- maxthreshold
393 --
procedure ProcessDeferredEvent(itemtype       in varchar2,
                               itemkey        in varchar2,
                               actid          in number,
                               message_handle in raw,
                               minthreshold   in number,
                               maxthreshold   in number)
as
  -- Runs one deferred activity and removes its message from the deferred
  -- queue, then commits.
  --
  -- The work is wrapped in three nested blocks:
  --   activity_thread  - does the work; on any failure rolls back to the
  --                      savepoint and re-raises
  --   commit_detection - turns a failed rollback-to-savepoint (no_savepoint)
  --                      into the WFENG_COMMIT_IN_PROCESS error
  --   error_recovery   - puts the activity into error status, runs the error
  --                      process and clears the error, so the procedure goes
  --                      on to the commit
  --
  -- minthreshold is only used when recording the error context.
begin
  -- Mark the activity as active before running it.
  Wf_Item_Activity_Status.Create_Status(itemtype, itemkey, actid,
                                        wf_engine.eng_active, null, null, null);

  -- Continue processing on activity
  <<error_recovery>>
  begin

    <<commit_detection>>
    begin

      <<activity_thread>>
      begin
        savepoint wf_savepoint;

        Wf_Engine_Util.Process_Activity(itemtype, itemkey, actid,
                                        maxthreshold, TRUE);

        -- The activity was processed successfully, so dequeue its message.
        wf_queue.PurgeEvent(wf_queue.DeferredQueue, message_handle, TRUE);

      exception
        when others then
          -- In the unlikely event this process thread raises an exception:
          -- 1. roll back any work done in this process thread, then
          --    re-raise so the enclosing handlers complete the remaining
          --    steps. If the rollback itself fails, that new exception
          --    propagates instead.
          rollback to wf_savepoint;
          raise;
      end activity_thread;

    exception
      when NO_SAVEPOINT then
        -- The savepoint no longer exists, i.e. a commit happened while the
        -- activity was running.
        Wf_Core.Token('ACTIVITY', Wf_Engine.GetActivityLabel(actid));
        Wf_Core.Raise('WFENG_COMMIT_IN_PROCESS');
    end commit_detection;

  exception
    when OTHERS then
      -- Remaining steps when the process thread raised an exception:
      -- 2. set this activity to error status
      -- 3. execute the error process (if any)
      -- 4. clear the error to continue with the next activity
      -- **note the error stack will refer to the actid that has been
      --   rolled back!
      Wf_Core.Context('Wf_Queue', 'ProcessDeferredEvent', itemtype,
                      to_char(minthreshold), to_char(maxthreshold));
      Wf_Item_Activity_Status.Set_Error(itemtype, itemkey, actid,
                                        wf_engine.eng_exception, FALSE);
      Wf_Engine_Util.Execute_Error_Process(itemtype, itemkey,
                                           actid, wf_engine.eng_exception);
      Wf_Core.Clear;
  end error_recovery;

  -- Commit work to ensure this activity thread doesn't interfere
  -- with others.
  commit;

  Fnd_Concurrent.Set_Preferred_RBS;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'ProcessDeferredEvent', itemtype,
                    to_char(minthreshold), to_char(maxthreshold));
    raise;
end ProcessDeferredEvent;
465
466
467 --Name: EnqueueInbound (PUBLIC)
468 --Enqueues the result from an outbound event onto
469 --the inbound queue. Wf will mark this as complete with the
470 --given result when it processes the queue.
471
procedure EnqueueInbound(
                itemtype    in varchar2,
                itemkey     in varchar2,
                actid       in number,
                result      in varchar2 default null,
                attrlist    in varchar2 default null,
                correlation in varchar2 default null,
                error_stack in varchar2 default null)

as
  -- Puts the outcome of an externally executed activity onto the inbound
  -- queue.
  --
  --   itemtype, itemkey, actid - identify the activity
  --   result      - result to record; ignored when error_stack is given
  --   attrlist    - passed on as the payload's parameter list
  --   correlation - explicit correlation; defaults to
  --                 account name || itemtype
  --   error_stack - when not null the result is forced to ':ERROR' and the
  --                 text travels in the payload's function-name slot

  l_msgid   raw(16);        -- message id returned by the enqueue, not used
  l_corr    varchar2(80);
  l_result  varchar2(30);
begin

  if correlation is null then
    -- The default correlation needs the account name.
    wf_queue.set_queue_names;
    l_corr := wf_queue.account_name||itemtype;
  else
    l_corr := correlation;
  end if;

  -- A supplied error stack overrides whatever result was passed.
  if error_stack is not null then
    l_result := ':ERROR';
  else
    l_result := result;
  end if;

  wf_queue.Enqueue_Event(queuename      => wf_queue.InboundQueue,
                         itemtype       => enqueueInbound.itemtype,
                         itemkey        => enqueueInbound.itemkey,
                         actid          => enqueueInbound.actid,
                         funcname       => enqueueInbound.error_stack,
                         correlation    => l_corr,
                         paramlist      => enqueueInbound.attrlist,
                         result         => l_result,
                         message_handle => l_msgid);
exception
  when others then
    Wf_Core.Context('Wf_Queue', 'EnqueueInbound', itemtype,
                    itemkey, actid);
    raise;
end EnqueueInbound;
518
519
function Get_param_list (itemtype in varchar2,
                         itemkey  in varchar2,
                         actid    in number) return varchar2

as
  -- Builds the '^'-separated NAME=VALUE list of the activity attributes of
  -- one process activity, in attribute sequence order.
  --
  --   itemtype, itemkey - item whose active date selects the activity
  --                       version and whose item attributes are
  --                       dereferenced
  --   actid             - process activity instance id
  --
  -- Returns at most 4000 bytes. When the list would overflow, building stops
  -- and the last pair may be cut short. Returns NULL when the activity has
  -- no attributes.

  startdate date;            -- active date of the item
  paramlist varchar2(4000);
  lvalue    varchar2(4000);  -- textual value of the current attribute

  -- The activity version is the one in effect on the item's active date.
  -- The window is half-open (begin_date inclusive, end_date exclusive), the
  -- same convention activity_valid uses in this package. This keeps the
  -- choice unambiguous when the active date falls exactly on the boundary
  -- between two versions, and matches a version whose begin_date equals the
  -- active date.
  cursor attr_list is
    select aa.name,
           aa.value_type,   -- CONSTANT or ITEMATTR
           aa.type,         -- NUMBER/TEXT/DATE etc
           aa.format,
           av.text_value,
           av.number_value,
           av.date_value
      from wf_activity_attr_values av,
           wf_activity_attributes aa,
           wf_activities a,
           wf_process_activities pa
     where pa.activity_item_type = a.item_type
       and pa.activity_name = a.name
       and pa.instance_id = Get_param_list.actid
       and a.begin_date <= Get_param_list.startdate
       and Get_param_list.startdate
             < nvl(a.end_date, Get_param_list.startdate+1)
       and a.item_type = aa.activity_item_type
       and a.name = aa.activity_name
       and a.version = aa.activity_version
       and av.process_activity_id = Get_param_list.actid
       and av.name = aa.name
     order by aa.sequence;

begin
  paramlist := null;
  startdate := wf_item.active_date(itemtype, itemkey);

  for attr_row in attr_list loop
    if (attr_row.value_type = 'ITEMATTR' and
        attr_row.text_value is not null) then
      -- text_value holds the name of the item attribute to read.
      -- A null itemattr text_value means a null value, not an error.
      lvalue := wf_engine.GetItemAttrText(itemtype, itemkey,
                                          attr_row.text_value);
    else  -- must be CONSTANT
      if (attr_row.type = 'NUMBER') then
        if (attr_row.format is null) then
          lvalue := to_char(attr_row.number_value);
        else
          lvalue := to_char(attr_row.number_value, attr_row.format);
        end if;
      elsif (attr_row.type = 'DATE') then
        if (attr_row.format is null) then
          lvalue := to_char(attr_row.date_value);
        else
          lvalue := to_char(attr_row.date_value, attr_row.format);
        end if;
      else
        lvalue := attr_row.text_value;
      end if;
    end if;

    -- Separate from the previous pair, unless the separator no longer fits.
    if paramlist is not null then
      -- Overflow, cannot hold any more attributes.
      if (lengthb(paramlist||'^') > 4000) then
        exit;
      else
        paramlist := paramlist||'^';
      end if;
    end if;

    if (lengthb(paramlist||attr_row.name||'='||lvalue) > 4000) then
      -- Overflow, cannot hold any more attributes: keep what fits and stop.
      paramlist := substrb(paramlist||attr_row.name||'='||lvalue, 1, 4000);
      exit;
    else
      paramlist := paramlist||attr_row.name||'='||lvalue;
    end if;
  end loop;

  return(paramlist);

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'Get_param_list', itemtype,
                    itemkey, actid);
    raise;
end Get_param_list;
607
608
609
610 --Name: DequeueOutbound (PUBLIC)
611
procedure DequeueOutbound(
                dequeuemode    in  number,
                navigation     in  number default 1,
                correlation    in  varchar2 default null,
                itemtype       in  varchar2 default null,
                payload        out NOCOPY system.wf_payload_t,
                message_handle in out NOCOPY raw,
                timeout        out NOCOPY boolean)

as
  -- Dequeues one message from the outbound queue by delegating to
  -- Dequeue_Event.
  --
  --   dequeuemode, navigation - passed through to Dequeue_Event
  --   correlation    - explicit correlation; when null the default is
  --                    account name || itemtype (or || '%' without itemtype)
  --   itemtype       - only used to build the default correlation
  --   payload        - the message dequeued
  --   message_handle - passed through to Dequeue_Event in both directions
  --   timeout        - as set by Dequeue_Event

  l_corr varchar2(80);
begin
  -- The default correlation needs the account name.
  wf_queue.set_queue_names;

  l_corr := nvl(correlation, wf_queue.account_name||nvl(itemtype,'%'));

  wf_queue.Dequeue_Event(queuename      => wf_queue.OutboundQueue,
                         dequeuemode    => DequeueOutbound.dequeuemode,
                         navigation     => DequeueOutbound.navigation,
                         correlation    => l_corr,
                         payload        => DequeueOutbound.payload,
                         message_handle => DequeueOutbound.message_handle,
                         timeout        => DequeueOutbound.timeout);

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'DequeueOutbound', payload.itemtype,
                    payload.itemkey, payload.actid);
    raise;

end DequeueOutbound;
649
--Name: DequeueEventDetail (PUBLIC)
--
--Wrapper to Dequeue_Event in which the payload is expanded out into
--individual parameters, so that callers can avoid using the payload type.
654
procedure DequeueEventDetail(
                dequeuemode    in  number,
                navigation     in  number default 1,
                correlation    in  varchar2 default null,
                itemtype       in  out NOCOPY varchar2,
                itemkey        out NOCOPY varchar2,
                actid          out NOCOPY number,
                function_name  out NOCOPY varchar2,
                param_list     out NOCOPY varchar2,
                message_handle in out NOCOPY raw,
                timeout        out NOCOPY boolean)
as
  -- Dequeues one message from the outbound queue and returns the payload
  -- fields as separate parameters.
  --
  --   correlation - explicit correlation; when null the default is built
  --                 from the incoming itemtype (account name || itemtype,
  --                 or || '%' when itemtype is null)
  --   itemtype    - in: used for the default correlation;
  --                 out: item type of the dequeued message
  --   itemkey, actid, function_name, param_list
  --               - copied from the dequeued payload
  --   message_handle, timeout - passed through to Dequeue_Event
  --
  -- The payload's result field is not returned.

  l_payload system.wf_payload_t;
  l_corr    varchar2(80);
begin
  -- The default correlation needs the account name.
  wf_queue.set_queue_names;

  -- Use the given correlation, or default it when null.
  l_corr := nvl(DequeueEventDetail.correlation,
                wf_queue.account_name||nvl(itemtype,'%'));

  -- Retrieve the event.
  wf_queue.Dequeue_Event(queuename      => wf_queue.OutboundQueue,
                         dequeuemode    => DequeueEventDetail.dequeuemode,
                         navigation     => DequeueEventDetail.navigation,
                         correlation    => l_corr,
                         payload        => l_payload,
                         message_handle => DequeueEventDetail.message_handle,
                         timeout        => DequeueEventDetail.timeout);

  -- Expand the payload structure into the out parameters.
  DequeueEventDetail.itemtype      := l_payload.itemtype;
  DequeueEventDetail.itemkey       := l_payload.itemkey;
  DequeueEventDetail.actid         := l_payload.actid;
  DequeueEventDetail.function_name := l_payload.function_name;
  DequeueEventDetail.param_list    := l_payload.param_list;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'DequeueEventDetail', itemtype||':'||itemkey,to_char(actid));
    raise;

end DequeueEventDetail;
702
703
704 --Dequeue_Event (PRIVATE)
705 --
706 --Dequeues an event (message) from any queue
707 --IN
708 -- QueueName - the queue name, may contain owner or database
709 -- DeQueueMode - either 1 (Browse), 2 (Locked) or 3 (Remove)
710 -- Navigation - either First or Next
711 -- Correlation - helps restrict the queue
712 -- Payload - the event actually dequeued
713 -- message_handle - id for the event
714 -- timeout - determines if anything was found or if the q timedout.
715
procedure Dequeue_Event(queuename      in  varchar2,
                        dequeuemode    in  number,
                        navigation     in  number default 1,
                        correlation    in  varchar2 default null,
                        payload        out NOCOPY system.wf_payload_t,
                        message_handle in out NOCOPY raw,
                        timeout        out NOCOPY boolean,
                        multiconsumer  in  boolean default FALSE)
as
  -- Dequeues one message from the named queue without waiting.
  --
  --   queuename      - queue to read
  --   dequeuemode    - dequeue mode handed to dbms_aq
  --   navigation     - navigation handed to dbms_aq
  --   correlation    - restricts the dequeue; '%' is used when null
  --   payload        - the message dequeued
  --   message_handle - in: when not null, dequeue exactly this message id
  --                    and ignore the correlation;
  --                    out: id of the message dequeued
  --   timeout        - TRUE when the dequeue raised dequeue_timeout,
  --                    FALSE otherwise
  --   multiconsumer  - when TRUE, wf_queue.account_name is used as the
  --                    consumer name
  --
  -- Any exception other than dequeue_timeout is re-raised after recording
  -- the Wf_Core context.

  l_deq_opts   dbms_aq.dequeue_options_t;
  l_msg_props  dbms_aq.message_properties_t;

  snap_too_old exception;
  pragma exception_init(snap_too_old, -1555);
begin
  -- find out the schema name
  wf_queue.set_queue_names;

  l_deq_opts.dequeue_mode := dequeuemode;
  l_deq_opts.wait         := dbms_aq.NO_WAIT;
  l_deq_opts.navigation   := navigation;

  -- A message handle, when given, takes precedence over the correlation.
  -- NOTE: with a message handle set, FIRST/NEXT_MESSAGE have no effect.
  if message_handle is null then
    -- Restrict by the given correlation, or match everything.
    l_deq_opts.correlation := nvl(correlation, '%');
  else
    l_deq_opts.msgid       := message_handle;
    l_deq_opts.correlation := null;
    -- Reset the in/out handle; the dequeue below fills it in again.
    message_handle         := null;
  end if;

  -- check if we need to have a consumer
  if (multiconsumer) then
    l_deq_opts.consumer_name := wf_queue.account_name;
  end if;

  begin
    dbms_aq.dequeue( queue_name         => Dequeue_Event.queuename,
                     dequeue_options    => l_deq_opts,
                     message_properties => l_msg_props,
                     payload            => Dequeue_Event.payload,
                     msgid              => message_handle );
  exception
    when snap_too_old then
      -- Workaround for AQ raising ORA-01555 when navigating with
      -- NEXT_MESSAGE: retry once from FIRST_MESSAGE so the error is handled
      -- silently. If we were already at FIRST_MESSAGE there is nothing to
      -- fall back to, so the error propagates.
      if (l_deq_opts.navigation = dbms_aq.FIRST_MESSAGE) then
        raise;
      end if;

      l_deq_opts.navigation := dbms_aq.FIRST_MESSAGE;
      dbms_aq.dequeue( queue_name         => Dequeue_Event.queuename,
                       dequeue_options    => l_deq_opts,
                       message_properties => l_msg_props,
                       payload            => Dequeue_Event.payload,
                       msgid              => message_handle );
  end;

  -- A message was dequeued.
  timeout := FALSE;

exception
  when dequeue_timeout then
    -- Nothing matched; report it through the flag instead of raising.
    timeout := TRUE;
  when others then
    Wf_Core.Context('WF_QUEUE', 'Dequeue_Event', queuename,
                    nvl(correlation, '%'));
    timeout := FALSE;
    raise;

end Dequeue_Event;
802
803 -- Activity_Valid (PRIVATE)
804 -- checks the deferred activity is valid for processing
805 --
806 -- IN
807 -- event - the event to check
808 -- message_handle of event in the deferred queue
809 -- maxthreshold - the threshold level
810 -- minthreshold
811 --
function activity_valid (event          in system.wf_payload_t,
                         message_handle in raw,
                         maxthreshold   in number default null,
                         minthreshold   in number default null)
return BOOLEAN is
  -- Decides whether a message read from the deferred queue should be
  -- processed now.
  --
  --   event          - payload identifying the activity
  --   message_handle - id of the message on the deferred queue; used to
  --                    purge it when the activity is no longer deferred
  --   maxthreshold,
  --   minthreshold   - cost range; a null bound is not checked
  --
  -- Returns TRUE when the activity is still DEFERRED, its begin date is not
  -- in the future and its cost is not outside the thresholds. Returns FALSE
  -- otherwise, including on any error; this function never raises from its
  -- main body.

  l_cost     pls_integer;   -- cost of the activity definition
  l_begdate  date;          -- begin date of the activity status row  <dlam:3070112>

  resource_busy exception;
  pragma exception_init(resource_busy, -00054);

begin

  -- Activity must be valid if
  -- 1) in given cost range
  -- 2) parent is not suspended
  -- note: suspendprocess/resumeprocess will remove/add deferred jobs

  -- <dlam:3070112> fetch the begin date as well; the comparison with
  -- SYSDATE is done separately below.
  select cwa.cost,
         cwias.begin_date
    into l_cost,
         l_begdate
    from wf_item_activity_statuses cwias,
         wf_process_activities     cwpa,
         wf_items                  wi,
         wf_activities             cwa
   where cwias.item_type          = event.itemtype
     and cwias.item_key           = event.itemkey
     and cwias.process_activity   = event.actid
     and cwias.activity_status    = 'DEFERRED'
     and cwias.process_activity   = cwpa.instance_id
     and cwpa.activity_item_type  = cwa.item_type
     and cwpa.activity_name       = cwa.name
     and cwias.item_type          = wi.item_type
     and cwias.item_key           = wi.item_key
     and wi.begin_date           >= cwa.begin_date
     and wi.begin_date            < nvl(cwa.end_date, wi.begin_date+1);

  -- No row lock is taken: the original message is already locked on the
  -- queue (previously: for update of CWIAS.ACTIVITY_STATUS NOWAIT).

  -- No check for a suspended parent either: the suspend process should
  -- remove any jobs from the queue, and if any get through,
  -- process_activity will manage it.

  -- <dlam:3070112>
  -- The begin date has not been reached yet, so leave the message alone.
  -- This works around the AQ delay appearing to be shorter than expected.
  if (l_begdate > sysdate) then
    return(FALSE);
  end if;

  -- Outside the requested cost range.
  if l_cost < nvl(minthreshold, l_cost) or l_cost > nvl(maxthreshold, l_cost) then
    return(FALSE);
  end if;

  return(TRUE);

exception
  when no_data_found then
    -- this event is no longer valid so remove it from the queue
    -- happens when a rewind moved the activity to the history table
    -- or the activity status is no longer deferred
    wf_queue.PurgeEvent(wf_queue.DeferredQueue, message_handle, TRUE);
    return(FALSE);
  when resource_busy then
    return(FALSE);
  when others then
    Wf_Core.Context('Wf_Queue', 'Activity_valid', 'Invalid',
                    event.itemtype||':'||event.itemkey, to_char(event.actid));
    return(FALSE);
end activity_valid;
888
889 --
890 -- ====================================================================
891 --
892 -- Enqueue_Event (PRIVATE)
893 -- Enqueues a message onto any WF queue (because all queues have same payload)
894 --
895
procedure Enqueue_Event(queuename      in varchar2,
                        itemtype       in varchar2,
                        itemkey        in varchar2,
                        actid          in number,
                        correlation    in varchar2 default null,
                        delay          in number   default 0,
                        funcname       in varchar2 default null,
                        paramlist      in varchar2 default null,
                        result         in varchar2 default null,
                        message_handle in out NOCOPY raw,
                        priority       in number   default null)

as
  -- Enqueues one workflow payload onto the named queue. The message only
  -- becomes visible when the enqueuing transaction commits.
  --
  --   queuename      - target queue
  --   itemtype, itemkey, actid, funcname, paramlist, result
  --                  - fields of the payload
  --   correlation    - explicit correlation; the default is
  --                    account name || itemtype, with ':' || instance id
  --                    appended when a RAC instance id is set in the
  --                    WF_RAC_CTX context and the item's root process is
  --                    RAC enabled
  --   delay          - requested delay; negative values are treated as 0 and
  --                    values of 2**31 or more are capped at (2**31)-1
  --   message_handle - id of the enqueued message
  --   priority       - message priority; left at the AQ default when null
  --
  -- Bug 4005674: when the activity being enqueued is the one most recently
  -- dequeued in this session (i.e. it is being re-deferred), an additional
  -- delay is added that grows with the number of times the activity has
  -- recurred, and the resulting delay is stored in the item attribute
  -- #DELAY_ACTID_<actid>.

  event                    system.wf_payload_t;
  enqueue_options          dbms_aq.enqueue_options_t;
  message_properties       dbms_aq.message_properties_t;
  l_increment_delay        number;   -- delay actually applied to the message
  l_min_delay              number;   -- additional delay for a looping activity
  l_background_occurrence  number;   -- times this activity has recurred
  l_inst_id                WF_ITEMS.INST_ID%TYPE;
  l_process                WF_ITEMS.ROOT_ACTIVITY%TYPE;

begin

  l_increment_delay := delay;

  -- Bug 4005674
  -- Check the occurrence of item_type:item_key:actid. If this is the same
  -- activity which we just dequeued, calculate the number of occurrences
  -- from the history table since the background engine started.
  if (wf_queue.g_dequeue_item_type = enqueue_event.itemtype and
      wf_queue.g_dequeue_item_key  = enqueue_event.itemkey and
      wf_queue.g_dequeue_actid     = enqueue_event.actid) then

    g_Key := WF_CACHE.HashKey(enqueue_event.itemtype||':'||
                              enqueue_event.itemkey||':'||enqueue_event.actid);

    -- If the hash key does not exist, or the cached entry belongs to a
    -- different itemtype:itemkey:actid (hash collision), get the history
    -- count from the base table; otherwise increment the cached count.
    if (not g_ActivityHistoryCount.EXISTS(g_Key) or
        (g_ActivityHistoryCount(g_Key).ITEM_TYPE <> enqueue_event.itemtype) or
        (g_ActivityHistoryCount(g_Key).ITEM_KEY  <> enqueue_event.itemkey) or
        (g_ActivityHistoryCount(g_Key).ACTID     <> enqueue_event.actid)) then

      select count(process_activity)
        into l_background_occurrence
        from wf_item_activity_statuses_h
       where item_type        = enqueue_event.itemtype
         and item_key         = enqueue_event.itemkey
         and process_activity = enqueue_event.actid
         and begin_date      >= g_background_begin_date;
    else
      l_background_occurrence := g_ActivityHistoryCount(g_Key).HISTORY_COUNT + 1;
    end if;

    -- Record the itemtype:itemkey:actid:history_count in the hash table
    g_ActivityHistoryCount(g_Key).ITEM_TYPE     := enqueue_event.itemtype;
    g_ActivityHistoryCount(g_Key).ITEM_KEY      := enqueue_event.itemkey;
    g_ActivityHistoryCount(g_Key).ACTID         := enqueue_event.actid;
    g_ActivityHistoryCount(g_Key).HISTORY_COUNT := l_background_occurrence;

    -- Bug 4005674
    -- For every g_defer_occurrence occurrences add g_add_delay_seconds to
    -- the delay, up to a maximum of g_max_delay_seconds.
    l_min_delay := floor(l_background_occurrence/wf_queue.g_defer_occurrence)
                   * wf_queue.g_add_delay_seconds;

    if (l_min_delay < wf_queue.g_max_delay_seconds) then
      l_increment_delay := l_increment_delay + l_min_delay;
    elsif (l_min_delay >= wf_queue.g_max_delay_seconds) then
      l_increment_delay := l_increment_delay + wf_queue.g_max_delay_seconds;
    end if;

    begin
      -- Add a run-time item attribute of #DELAY_ACTID_<actid> to track the
      -- continuous loop
      Wf_Engine.SetItemAttrNumber(itemtype => enqueue_event.itemtype,
                                  itemkey  => enqueue_event.itemkey,
                                  aname    => '#DELAY_ACTID_'||enqueue_event.actid,
                                  avalue   => l_increment_delay);
    exception
      when others then
        if (wf_core.error_name = 'WFENG_ITEM_ATTR') then
          -- The attribute does not exist yet: create it with the value.
          Wf_Core.Clear;
          Wf_Engine.AddItemAttr(itemtype     => enqueue_event.itemtype,
                                itemkey      => enqueue_event.itemkey,
                                aname        => '#DELAY_ACTID_'||enqueue_event.actid,
                                number_value => l_increment_delay);
        else
          raise;
        end if;
    end;
  end if;

  wf_queue.set_queue_names;

  -- construct the event object
  event := system.wf_payload_t(itemtype, itemkey, actid, funcname, paramlist, result);

  -- Do not make the data visible on the queue until a commit is issued, so
  -- that queue data and normal table data (wf statuses) stay in sync.
  enqueue_options.visibility := DBMS_AQ.ON_COMMIT;

  -- Set the delay if any
  if l_increment_delay < 0 then
    message_properties.delay := 0;
  else
    -- message_properties.delay is BINARY_INTEGER, so check if the delay is
    -- too big, and set the max delay to be (2**31)-1.
    if (l_increment_delay >= power(2,31)) then
      message_properties.delay := power(2,31)-1;
    else
      message_properties.delay := l_increment_delay;
    end if;
  end if;

  if correlation is not null then
    message_properties.correlation := enqueue_event.correlation;
  else
    message_properties.correlation := wf_queue.account_name||itemtype;
    l_inst_id := sys_context('WF_RAC_CTX','INST_ID');

    -- The root process is only needed to decide whether to append the RAC
    -- instance id, so the WF_ITEMS lookup is skipped when no instance id is
    -- set. The parameters are qualified with the procedure name so they
    -- cannot be captured by a column name.
    if l_inst_id is not null then
      select ROOT_ACTIVITY
        into l_process
        from WF_ITEMS
       where ITEM_TYPE = enqueue_event.itemtype
         and ITEM_KEY  = enqueue_event.itemkey;

      if WF_ENGINE_RAC.Process_Is_RAC_Enabled(itemtype, l_process) then
        message_properties.correlation := message_properties.correlation||':'||l_inst_id;
      end if;
    end if;
  end if;

  -- Make sure the correlation is always set to something, else the message
  -- will never be dequeued because the dequeue correlation always defaults
  -- to '%'.
  if message_properties.correlation is null then
    -- this shouldn't happen.
    message_properties.correlation := '%';
  end if;

  -- Set the priority so that we can dequeue by priority
  if priority is not null then
    message_properties.priority := priority;
  end if;

  dbms_aq.enqueue
  (
    queue_name         => Enqueue_Event.queuename,
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => event,
    msgid              => message_handle
  );

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'Enqueue_event', itemtype,
                    itemkey, to_char(actid), to_char(delay));
    raise;

end Enqueue_Event;
1054
1055
-- ProcessInboundQueue (PUBLIC)
--   Drains the inbound queue.  Each message is dequeued in LOCKED mode and,
--   when the lock on its item can be acquired, handed over to
--   Process_Inbound_Event.  Work is committed after every message.
-- IN
--   itemtype    - item type used to build the default correlation id
--                 (null means all item types, i.e. '%')
--   correlation - explicit correlation id; when given it is used unchanged
--                 and the old-format fallback is never attempted
--
procedure ProcessInboundQueue (itemtype    in varchar2 default null,
                               correlation in varchar2 default null)
as

  l_payload     system.wf_payload_t;
  l_navigation  varchar2(10);
  l_timed_out   boolean := FALSE;
  l_msg_handle  raw(16);
  l_corrid      varchar2(80);
  l_idle_pass   boolean := TRUE;   -- TRUE while a pass has processed nothing

begin
  -- Start from a clean transaction.
  commit;

  Fnd_Concurrent.Set_Preferred_RBS;

  wf_queue.set_queue_names;

  -- A caller-supplied correlation wins; otherwise use <account><itemtype|%>.
  if correlation is null then
    l_corrid := wf_queue.account_name||nvl(itemtype,'%');
  else
    l_corrid := correlation;
  end if;

  <<queue_passes>>
  loop -- repeat full passes until one of them processes nothing

    l_navigation := dbms_aq.FIRST_MESSAGE;
    l_idle_pass  := TRUE;

    <<one_pass>>
    loop -- walk the queue until the dequeue times out

      l_msg_handle := null;

      wf_queue.Dequeue_Event(wf_queue.InboundQueue,
                             dbms_aq.LOCKED,
                             l_navigation,
                             l_corrid,
                             l_payload,
                             l_msg_handle,
                             l_timed_out);

      if (l_navigation = dbms_aq.FIRST_MESSAGE and l_msg_handle is null
          and correlation is null and l_corrid <> nvl(itemtype,'%')) then

        -- Nothing found on the very first read: the message may have been
        -- enqueued with the old correlation format (no account prefix), so
        -- switch to that format and read again without doing anything else.
        l_corrid := nvl(itemtype,'%');

      else

        -- A timeout ends the current pass.
        exit one_pass when l_timed_out;

        -- Bug 2607770
        -- We hold a message; only process it when the item lock is ours.
        -- Process_Inbound_Event deals with a null/not-null parameter list.
        if wf_item.acquire_lock(l_payload.itemtype, l_payload.itemkey) then

          wf_queue.Process_Inbound_Event(itemtype       => l_payload.itemtype,
                                         itemkey        => l_payload.itemkey,
                                         actid          => l_payload.actid,
                                         message_handle => l_msg_handle,
                                         p_payload      => l_payload);

          -- bug 7828862 - Resynch apps context from cached values if it changed
          wfa_sec.Restore_Ctx();

          l_idle_pass := FALSE;

        end if;

        -- Commit the processing (or whatever clean up happened).
        commit;
        Fnd_Concurrent.Set_Preferred_RBS;

        l_navigation := dbms_aq.NEXT_MESSAGE;

      end if;

    end loop one_pass;

    exit queue_passes when l_idle_pass;

  end loop queue_passes;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'ProcessInboundQueue');
    raise;
end ProcessInboundQueue;
1162
-- ProcessDeferredQueue (PUBLIC)
--   Walks the deferred queue.  Every message that passes activity_valid and
--   whose item lock can be acquired is executed through ProcessDeferredEvent.
--   Work is committed after every message, and the procedure raises
--   shutdown_pending as soon as check_instance reports a pending shutdown.
-- IN
--   itemtype     - item type used to build the default correlation id
--   minthreshold - passed through to activity_valid / ProcessDeferredEvent
--   maxthreshold - passed through to activity_valid / ProcessDeferredEvent
--   correlation  - explicit correlation id; when given it is used unchanged
--
procedure ProcessDeferredQueue (itemtype     in varchar2 default null,
                                minthreshold in number   default null,
                                maxthreshold in number   default null,
                                correlation  in varchar2 default null)

as
  l_payload     system.wf_payload_t;
  l_timed_out   boolean := FALSE;
  l_navigation  varchar2(10);
  l_msg_handle  raw(16);
  l_idle_pass   boolean := TRUE;   -- TRUE while a pass has processed nothing
  l_corrid      varchar2(80);
  l_inst_id     WF_ITEMS.INST_ID%TYPE;

begin

  -- Bug 4005674
  -- Record the sysdate when background engine started.
  g_background_begin_date := sysdate;

  wf_queue.set_queue_names;

  if correlation is null then

    if (wf_core.translate('WF_INSTALL') = 'STANDALONE'
        and itemtype is not null) then
      -- Standalone with an item type: try the old correlation format first.
      l_corrid := itemtype;

    else
      -- Embedded never had the old format.  Standalone with a null item type
      -- cannot use the old format either: it would pick up everything.
      l_inst_id := sys_context('WF_RAC_CTX','INST_ID');

      if l_inst_id is not null and WF_ENGINE_RAC.Item_Is_RAC_Enabled(itemtype) then
        -- RAC-enabled item type: correlation looks like APPSOEOL:4
        l_corrid := wf_queue.account_name||itemtype||':'||l_inst_id;
      else
        l_corrid := wf_queue.account_name||nvl(itemtype,'%');
      end if;
    end if;

  else
    l_corrid := correlation;
  end if;

  <<queue_passes>>
  loop -- repeat full passes until one of them processes nothing

    l_navigation := dbms_aq.FIRST_MESSAGE;
    l_idle_pass  := TRUE;

    <<one_pass>>
    loop -- walk the queue until the dequeue times out

      l_msg_handle := null;

      wf_queue.Dequeue_Event(wf_queue.DeferredQueue,
                             dbms_aq.LOCKED,
                             l_navigation,
                             l_corrid,
                             l_payload,
                             l_msg_handle,
                             l_timed_out,
                             TRUE);

      -- Bug 4005674
      -- Record the item_type:item_key:actid at dequeue time
      wf_queue.g_dequeue_item_type := l_payload.itemtype;
      wf_queue.g_dequeue_item_key  := l_payload.itemkey;
      wf_queue.g_dequeue_actid     := l_payload.actid;

      if (l_navigation = dbms_aq.FIRST_MESSAGE and l_msg_handle is null
          and correlation is null and l_corrid = itemtype) then

        -- Nothing found with the old format on the very first read: the
        -- message may carry the new format, so switch to it and read again.
        l_corrid := wf_queue.account_name||nvl(itemtype,'%');

      else

        -- A timeout ends the current pass.
        exit one_pass when l_timed_out;

        --
        -- Execute the PL/SQL call stored in the payload if this is valid
        --
        if activity_valid (l_payload,
                           l_msg_handle,
                           maxthreshold,
                           minthreshold)
           AND
           wf_item.acquire_lock(l_payload.itemtype, l_payload.itemkey) then

          wf_queue.ProcessDeferredEvent(
              itemtype       => l_payload.itemtype,
              itemkey        => l_payload.itemkey,
              actid          => l_payload.actid,
              message_handle => l_msg_handle,
              minthreshold   => ProcessDeferredQueue.minthreshold,
              maxthreshold   => ProcessDeferredQueue.maxthreshold);

          -- bug 7828862 - Resynch apps context from cached values if it changed
          wfa_sec.Restore_Ctx();

          l_idle_pass := FALSE;

        end if;

        -- Commit the processing, or any clean up done by activity_valid.
        commit;
        Fnd_Concurrent.Set_Preferred_RBS;

        --
        -- Test for Instance Shutdown
        --
        if wf_queue.check_instance then
          raise shutdown_pending;
        end if;

        l_navigation := dbms_aq.NEXT_MESSAGE;

      end if;

    end loop one_pass;

    exit queue_passes when l_idle_pass;

  end loop queue_passes;

exception
  when dequeue_disabled then
    Wf_Core.Context('Wf_Queue', 'ProcessDeferredQueue', 'Queue shutdown');
    raise;
  when shutdown_pending then
    Wf_Core.Context('Wf_Queue', 'ProcessDeferredQueue', 'DB shutting down');
    raise;
  when others then
    Wf_Core.Context('Wf_Queue', 'ProcessDeferredQueue');
    raise;
end ProcessDeferredQueue;
1300
1301
1302 --============================================================
1303 -- Support utilities. not sure if we want to release these
1304 --============================================================
-- GetMessageHandle
--   Browses the queue sequentially (BROWSE mode, NO_WAIT) and returns the
--   message id of the first message whose payload matches the given
--   item type, item key and activity id.
-- IN
--   queuename     - queue to browse
--   itemtype      - item type to match; also used to build the default
--                   correlation id
--   itemkey       - item key to match
--   actid         - activity id to match; null matches any activity id
--   correlation   - explicit correlation id (default: <account><itemtype|%>)
--   multiconsumer - when TRUE the account name is used as consumer name
-- RETURNS
--   the message id, or null when no message matches or when any error
--   (including the end-of-queue timeout) is raised while browsing

function GetMessageHandle(queuename     in varchar2,
                          itemtype      in varchar2,
                          itemkey       in varchar2,
                          actid         in number,
                          correlation   in varchar2 default null,
                          multiconsumer in boolean  default FALSE) return raw
is
  event              system.wf_payload_t;
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  msg_id             raw(16);
begin
  dequeue_options.dequeue_mode := dbms_aq.BROWSE;
  dequeue_options.wait         := dbms_aq.NO_WAIT;
  wf_queue.set_queue_names;

  if correlation is not null then
    dequeue_options.correlation := correlation;
  else
    dequeue_options.correlation := wf_queue.account_name||nvl(itemtype,'%');
  end if;

  if (multiconsumer) then
    dequeue_options.consumer_name := wf_queue.account_name;
  end if;

  -- The first read starts at the head of the queue; every later read moves
  -- on to the next message.  The loop is left either by returning a match
  -- or through the exception raised when the queue is exhausted.
  dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;

  loop
    dbms_aq.dequeue
    (
      queue_name         => queuename,
      dequeue_options    => dequeue_options,
      message_properties => message_properties,
      payload            => event,
      msgid              => msg_id
    );

    -- The same predicate is applied to every message, so a null actid acts
    -- as a wildcard for the whole queue and not just for the first message.
    if event.itemtype = itemtype
       and event.itemkey = itemkey
       and event.actid = nvl(actid, event.actid) then
      return (msg_id);
    end if;

    dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
  end loop;

exception -- timeout will fall to here
  when others then
    -- Deliberate best effort: any failure is reported as "not found".
    return(null);
end GetMessageHandle;
1376 --=============================================================
1377 -- PUBLIC API to dequeue from exception queue to wf_error
1378 -- queue
1379 --=============================================================
-- DequeueException
--   Removes every message from the given exception queue, marks each event
--   as expired and enqueues it on the WF_ERROR agent through
--   WF_ERROR_QH.enqueue.  Each moved message is committed individually.
--   The loop ends when the dequeue raises ORA-25228 (no message available
--   within the 1 second wait).
-- IN
--   queuename - name of the exception queue to drain
--
procedure DequeueException (queuename in varchar2)
is

  l_event              wf_event_t;
  x_dequeue_options    dbms_aq.dequeue_options_t;
  x_message_properties dbms_aq.message_properties_t;
  x_msgid              RAW(16);
  erragt               wf_agent_t;
  lsysname             varchar2(30);
  l_expired_text       varchar2(4000);
  cmd                  varchar2(1000) :=
                         'begin WF_ERROR_QH.enqueue(:v1, :v2); end;';

begin

  -- To Dequeue from Exception Queue, consumer name must be null
  x_dequeue_options.consumer_name := null;
  x_dequeue_options.wait          := 1;

  loop
    begin
      dbms_aq.dequeue(queue_name         => queuename,
                      dequeue_options    => x_dequeue_options,
                      message_properties => x_message_properties, /* OUT */
                      payload            => l_event,              /* OUT */
                      msgid              => x_msgid);             /* OUT */

      -- The error agent and the translated text are the same for every
      -- message, so they are resolved once, on the first dequeued message.
      -- Doing it lazily keeps the empty-queue path free of the lookup.
      if erragt is null then
        /*
        ** As we can't use the private API SaveErrorToQueue
        ** we copy a little bit of code to do it
        */
        select name into lsysname
        from   wf_systems
        where  guid = hextoraw(wf_core.translate('WF_SYSTEM_GUID'));

        erragt         := wf_agent_t('WF_ERROR', lsysname);
        l_expired_text := wf_core.translate('WFE_MESSAGE_EXPIRED');
      end if;

      /*
      ** Update the event to let everyone know it expired
      */
      l_event.SetErrorMessage(l_expired_text);
      l_event.addParameterToList('ERROR_NAME', l_expired_text);
      l_event.addParameterToList('ERROR_TYPE', 'ERROR');

      -- Dynamic call: WF_ERROR_QH is invoked without a static reference.
      execute immediate cmd using in l_event,
                                  in erragt;

      commit;

    exception
      -- dequeue_timeout is the package-level name for ORA-25228
      when dequeue_timeout then
        if (wf_log_pkg.level_event >= fnd_log.g_current_runtime_level) then
          wf_log_pkg.string(wf_log_pkg.level_event,
                            'wf.plsql.WF_QUEUE.DequeueException.queue_empty',
                            'No more messages in ExceptionDequeue.');
        end if;
        exit;
    end;
  end loop;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'DequeueException',queuename);
    raise;
end DequeueException;
1445 --=============================================================
1446 -- Declare all developer APIs for Inbound queue manipulation
1447 --
1448 --=============================================================
1449
--
-- ClearMsgStack
--   Resets the runtime message stack: the stack counter goes back to 0 and
--   the first slot's item type, item key and activity id are blanked.
--   Slots above the first one are not touched; they are simply no longer
--   reachable through the counter.
--
procedure ClearMsgStack
is
begin
  -- Blank the first slot ...
  wf_queue.stck_actid(1)    := 0;
  wf_queue.stck_itemkey(1)  := '';
  wf_queue.stck_itemtype(1) := '';

  -- ... and mark the stack as empty.
  wf_queue.stck_ctr := 0;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'ClearMsgStack');
    raise;
end ClearMsgStack;
1465
1466
--Name: WriteMsg
--  Enqueues the stacked message for the given item/activity on the inbound
--  queue via EnqueueInbound.  The stack entry is located with
--  SearchMsgStack, which creates an empty entry when none exists yet.
-- IN
--  itemtype, itemkey, actid - identify the stacked message
--
procedure WriteMsg (
  itemtype in varchar2,
  itemkey  in varchar2,
  actid    in number)
is
  l_slot pls_integer;   -- position of the message on the stack
begin

  l_slot := wf_queue.SearchMsgStack(itemtype, itemkey, actid);

  wf_queue.EnqueueInbound(
      itemtype => wf_queue.stck_itemtype(l_slot),
      itemkey  => wf_queue.stck_itemkey(l_slot),
      actid    => wf_queue.stck_actid(l_slot),
      result   => wf_queue.stck_result(l_slot),
      attrlist => wf_queue.stck_attrlist(l_slot));

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'WriteMsg');
    raise;

end WriteMsg;
1493
--Name: CreateMsg
--  Makes sure a message for the given item/activity exists on the stack.
--  All the work is done by SearchMsgStack, which adds the entry when it is
--  not found; the returned position is not needed here.
-- IN
--  itemtype, itemkey, actid - identify the message
--
procedure CreateMsg (
  itemtype in varchar2,
  itemkey  in varchar2,
  actid    in number)
is
  l_slot pls_integer;   -- receives the stack position (unused)
begin

  l_slot := wf_queue.SearchMsgStack(itemtype, itemkey, actid);

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'CreateMsg');
    raise;

end CreateMsg;
1513
1514
1515
--Name: SetMsgAttr (PUBLIC)
--  Appends one NAME=value pair to the attribute list of the stacked message
--  for the given item/activity.  Pairs are separated by '^' and the
--  attribute name is always stored in upper case.  The stack entry is
--  created by SearchMsgStack when it does not exist yet.
-- IN
--  itemtype, itemkey, actid - identify the stacked message
--  attrName  - attribute name (stored upper-cased)
--  attrValue - attribute value (stored as given)
--
procedure SetMsgAttr(
  itemtype  in varchar2,
  itemkey   in varchar2,
  actid     in number,
  attrName  in varchar2,
  attrValue in varchar2)
is
  i      pls_integer;
  l_pair varchar2(32767);
begin
  i := SearchMsgStack (itemtype, itemkey, actid);

  -- Build the pair once so the first and the appended attributes are
  -- formatted identically (the name is upper-cased in both cases).
  l_pair := upper(attrName)||'='||attrValue;

  if wf_queue.stck_attrlist(i) is null then
    wf_queue.stck_attrlist(i) := l_pair;
  else
    wf_queue.stck_attrlist(i) := wf_queue.stck_attrlist(i)||'^'||l_pair;
  end if;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'SetMsgAttr',
                    itemtype, itemkey, to_char(actid), to_char(stck_ctr));
    raise;
end SetMsgAttr;
1543
--Name: SetMsgResult (PUBLIC)
--  Sets the result value of the stacked message for the given
--  item/activity.  The stack entry is created by SearchMsgStack when it
--  does not exist yet.
-- IN
--  itemtype, itemkey, actid - identify the stacked message
--  result - result value to store
--
procedure SetMsgResult(
  itemtype in varchar2,
  itemkey  in varchar2,
  actid    in number,
  result   in varchar2)
is
  i pls_integer;
begin
  i := SearchMsgStack (itemtype, itemkey, actid);

  wf_queue.stck_result(i) := result;

exception
  when others then
    -- Report this procedure's real name in the error stack.
    Wf_Core.Context('Wf_Queue', 'SetMsgResult',
                    itemtype, itemkey, to_char(actid), to_char(stck_ctr));
    raise;
end SetMsgResult;
1565
--
-- AddNewMsg (PRIVATE)
--   Pushes a new message on top of the stack, with a null result and a
--   null attribute list.
-- IN
--   itemtype - item itemtype
--   itemkey  - item itemkey
--   actid    - instance id of process
--
procedure AddNewMsg(
  itemtype in varchar2,
  itemkey  in varchar2,
  actid    in number)
is
begin

  -- Advance the stack pointer.  A counter that was never set is treated as
  -- an empty stack (same convention as SearchMsgStack); without the nvl a
  -- null counter would stay null and be used as a collection index below.
  wf_queue.stck_ctr := nvl(wf_queue.stck_ctr, 0) + 1;

  -- Fill the new slot.
  wf_queue.stck_itemtype(wf_queue.stck_ctr) := itemtype;
  wf_queue.stck_itemkey(wf_queue.stck_ctr)  := itemkey;
  wf_queue.stck_actid(wf_queue.stck_ctr)    := actid;
  wf_queue.stck_result(wf_queue.stck_ctr)   := null;
  wf_queue.stck_AttrList(wf_queue.stck_ctr) := null;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'AddNewMsg',
                    itemtype, itemkey, to_char(actid), to_char(stck_ctr));
    raise;
end AddNewMsg;
1595
--Name: SearchMsgStack (PRIVATE)
--Desc: Looks for the message of the given item/activity on the stack,
--      scanning from the top (most recent entry) downwards.
--      When no entry matches, a new one is pushed with AddNewMsg.
-- IN
--      itemtype, itemkey, actid - identify the message
-- RETURNS
--      the stack position of the matching (or newly added) entry
--
function SearchMsgStack (
  itemtype in varchar2,
  itemkey  in varchar2,
  actid    in number) RETURN number
is
begin

  -- An unset or zero counter yields an empty range, so nothing is scanned.
  for l_pos in reverse 1 .. nvl(wf_queue.stck_ctr, 0) loop
    if ((itemtype = wf_queue.stck_itemtype(l_pos)) and
        (itemkey  = wf_queue.stck_itemkey(l_pos)) and
        (actid    = wf_queue.stck_actid(l_pos))) then
      -- Found a match.
      return(l_pos);
    end if;
  end loop;

  -- Not on the stack yet: push it; it becomes the new top entry.
  AddNewMsg(itemtype, itemkey, actid);
  return (stck_ctr);

end SearchMsgStack;
1626
--
-- Generic_Queue_Display
--   Produce list of generic_queues: an HTML page with one row per enabled
--   WF_QUEUES entry (protocol, description, direction, queue count) plus
--   links to edit, view the detail of and delete the entry, followed by a
--   "create" button.  Users without the workflow admin role only get the
--   WFPREF_INVALID_ADMIN message.
--
-- MODIFICATION LOG:
-- 06-JUN-2001 JWSMITH BUG 1819232 - added alt attrib for IMG tag for ADA
--             - Added summary attr for table tags for ADA
--             - Added ID attr for TD tags for ADA
--
procedure Generic_Queue_Display
is
  username      varchar2(320);   -- Username to query
  admin_role    varchar2(320);   -- Role for admin mode
  admin_mode    varchar2(1) := 'N';
  realname      varchar2(360);   -- Display name of username
  s0            varchar2(2000);  -- Dummy
  l_error_msg   varchar2(240);
  l_url         varchar2(240);
  l_media       varchar2(240) := wfa_html.image_loc;
  l_icon        varchar2(40);
  l_text        varchar2(240);
  l_onmouseover varchar2(240);

  cursor queues_cursor is
    select wfq.protocol,
           wfq.inbound_outbound,
           wfq.description,
           wfq.queue_count
    from   wf_queues wfq
    where  NVL(wfq.disable_flag, 'N') = 'N'
    order by wfq.protocol, wfq.inbound_outbound;

  -- Emits one white-on-blue column header whose id is the translated label.
  procedure header_cell (p_code in varchar2)
  is
    l_label varchar2(32767) := wf_core.translate(p_code);
  begin
    htp.tableHeader(cvalue      => '<font color=#FFFFFF>'||l_label||'</font>',
                    calign      => 'Center',
                    cattributes => 'id="'||l_label||'"');
  end header_cell;

  -- Builds the headers="..." attribute that ties a data cell to its column.
  function headers_attr (p_code in varchar2) return varchar2
  is
  begin
    return 'headers="'||wf_core.translate(p_code)||'"';
  end headers_attr;

begin

  -- Check current user has admin authority
  wfa_sec.GetSession(username);
  username := upper(username);
  wf_directory.GetRoleInfo(username, realname, s0, s0, s0, s0);

  admin_role := wf_core.translate('WF_ADMIN_ROLE');
  if (admin_role = '*' or
      Wf_Directory.IsPerformer(username, admin_role)) then
    admin_mode := 'Y';
  else
    l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
  end if;

  -- Set page title
  htp.htmlOpen;
  htp.headOpen;
  htp.p('<BASE TARGET="_top">');
  htp.title(wf_core.translate('WFGENERIC_QUEUE_TITLE'));
  wfa_html.create_help_function('wf/links/dmr.htm?DMREP');
  htp.headClose;
  wfa_sec.Header(FALSE, '',wf_core.translate('WFGENERIC_QUEUE_TITLE'), FALSE);
  htp.br;

  -- Non-admin users only get the error message.
  IF (admin_mode = 'N') THEN
    htp.center(htf.bold(l_error_msg));
    return;
  END IF;

  -- Column headers
  htp.tableOpen(cattributes=>'border=1 cellpadding=3 bgcolor=white width="100%" summary=""');
  htp.tableRowOpen(cattributes=>'bgcolor=#006699');

  header_cell('PROTOCOL');
  header_cell('QUEUE_DESCRIPTION');
  header_cell('INBOUND_PROMPT');
  header_cell('QUEUE_COUNT');
  header_cell('VIEW_DETAIL');
  header_cell('DELETE');

  htp.tableRowClose;
  htp.tableRowOpen;
  htp.tableRowClose;

  -- Show all nodes
  for queues in queues_cursor loop

    htp.tableRowOpen(null, 'TOP');

    -- Protocol, linked to the edit page
    htp.tableData(htf.anchor2(
                    curl=>wfa_html.base_url||
                          '/wf_queue.generic_queue_edit?p_protocol='||
                          queues.protocol||'&p_inbound_outbound='||
                          queues.inbound_outbound,
                    ctext=>queues.protocol, ctarget=>'_top'),
                  'Left',
                  cattributes=>headers_attr('PROTOCOL'));

    htp.tableData(queues.description, 'left',
                  cattributes=>headers_attr('QUEUE_DESCRIPTION'));

    htp.tableData(queues.inbound_outbound, 'left',
                  cattributes=>headers_attr('INBOUND_PROMPT'));

    htp.tableData(queues.queue_count, 'left',
                  cattributes=>headers_attr('QUEUE_COUNT'));

    -- View-detail icon (a space now separates the alt and BORDER attributes)
    htp.tableData(htf.anchor2(
                    curl=>wfa_html.base_url||
                          '/wf_queue.Generic_Queue_View_Detail?p_protocol='||
                          queues.protocol||'&p_inbound_outbound='||
                          queues.inbound_outbound,
                    ctext=>'<IMG SRC="'||wfa_html.image_loc||'affind.gif" alt="'||wf_core.translate('FIND') || '" BORDER=0>'),
                  'center',
                  cattributes=>'valign="MIDDLE" '||headers_attr('VIEW_DETAIL'));

    -- Delete icon
    htp.tableData(htf.anchor2(
                    curl=>wfa_html.base_url||
                          '/wf_queue.generic_queue_confirm_delete?p_protocol='||
                          queues.protocol||'&p_inbound_outbound='||
                          queues.inbound_outbound,
                    ctext=>'<IMG SRC="'||wfa_html.image_loc||'FNDIDELR.gif" alt="' || wf_core.translate('WFRTG_DELETE') || '" BORDER=0>'),
                  'center',
                  cattributes=>'valign="MIDDLE" '||headers_attr('DELETE'));

    -- Close the row opened at the top of the iteration.
    htp.tableRowClose;

  end loop;

  htp.tableclose;

  htp.br;

  htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');

  --Add new node Button
  htp.tableRowOpen;

  l_url         := wfa_html.base_url||'/wf_queue.generic_queue_edit';
  l_icon        := 'FNDJLFOK.gif';
  l_text        := wf_core.translate ('WFQUEUE_CREATE');
  l_onmouseover := wf_core.translate ('WFQUEUE_CREATE');

  htp.p('<TD ID="">');

  wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);

  htp.p('</TD>');

  htp.tableRowClose;

  htp.tableclose;

  wfa_sec.Footer;
  htp.htmlClose;

exception
  when others then
    -- Report this package and procedure in the error stack.
    wf_core.context('Wf_Queue', 'Generic_Queue_Display');
    raise;
end Generic_Queue_Display;
1806
--
-- Generic_Queue_View_Detail
--   Produce list of generic_queues for one WF_QUEUES entry: an HTML page
--   with one row per queue number (1 .. QUEUE_COUNT) showing the queue
--   name, the number of rows in its queue table and a link to browse its
--   contents.  Users without the workflow admin role only get the
--   WFPREF_INVALID_ADMIN message.
-- IN
--   p_protocol         - protocol of the WF_QUEUES entry
--   p_inbound_outbound - direction of the WF_QUEUES entry
--
-- MODIFICATION LOG:
-- 06-JUN-2001 JWSMITH BUG 1819232 - added alt attrib for IMG tag for ADA
--             - Added summary attribute for table tags for ADA
--
procedure Generic_Queue_View_Detail (
  p_protocol         IN VARCHAR2 DEFAULT NULL,
  p_inbound_outbound IN VARCHAR2 DEFAULT NULL
) IS
  l_count      number := 0;      -- QUEUE_COUNT of the WF_QUEUES entry
  l_msg_count  number;           -- rows in the current queue table
  username     varchar2(320);    -- Username to query
  admin_role   varchar2(320);    -- Role for admin mode
  admin_mode   varchar2(1) := 'N';
  realname     varchar2(360);    -- Display name of username
  s0           varchar2(2000);   -- Dummy
  l_error_msg  varchar2(240);
  l_sql        varchar2(1000);

  -- Emits one white-on-blue column header whose id is the translated label.
  procedure header_cell (p_code in varchar2)
  is
    l_label varchar2(32767) := wf_core.translate(p_code);
  begin
    htp.tableHeader(cvalue      => '<font color=#FFFFFF>'||l_label||'</font>',
                    calign      => 'Center',
                    cattributes => 'id="'||l_label||'"');
  end header_cell;

  -- Builds the headers="..." attribute that ties a data cell to its column.
  function headers_attr (p_code in varchar2) return varchar2
  is
  begin
    return 'headers="'||wf_core.translate(p_code)||'"';
  end headers_attr;

begin

  -- Check current user has admin authority
  wfa_sec.GetSession(username);
  username := upper(username);
  wf_directory.GetRoleInfo(username, realname, s0, s0, s0, s0);

  admin_role := wf_core.translate('WF_ADMIN_ROLE');
  if (admin_role = '*' or
      Wf_Directory.IsPerformer(username, admin_role)) then
    admin_mode := 'Y';
  else
    l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
  end if;

  -- Set page title
  htp.htmlOpen;
  htp.headOpen;
  htp.p('<BASE TARGET="_top">');
  htp.title(wf_core.translate('WFGENERIC_QUEUE_TITLE'));
  wfa_html.create_help_function('wf/links/dmr.htm?DMREP');
  htp.headClose;
  wfa_sec.Header(FALSE, '',wf_core.translate('WFGENERIC_QUEUE_TITLE'), FALSE);
  htp.br;

  -- Non-admin users only get the error message.
  IF (admin_mode = 'N') THEN
    htp.center(htf.bold(l_error_msg));
    return;
  END IF;

  -- Look up the entry.  This also validates the two parameters: the query
  -- raises NO_DATA_FOUND unless they identify an existing WF_QUEUES row.
  SELECT queue_count
  INTO   l_count
  FROM   wf_queues
  WHERE  UPPER(p_protocol) = protocol
  AND    p_inbound_outbound = inbound_outbound;

  -- Column headers
  htp.tableOpen(cattributes=>'border=1 cellpadding=3 bgcolor=white width="100%" summary=""');
  htp.tableRowOpen(cattributes=>'bgcolor=#006699');

  header_cell('PROTOCOL');
  header_cell('QUEUE_NUMBER');
  header_cell('QUEUE_NAME');
  header_cell('QUEUE_COUNT');
  header_cell('VIEW_DETAIL');

  htp.tableRowClose;
  htp.tableRowOpen;
  htp.tableRowClose;

  -- Show all queues for the given protocol
  for ii in 1..l_count loop

    htp.tableRowOpen(null, 'TOP');

    htp.tableData(p_protocol, 'left',
                  cattributes=>headers_attr('PROTOCOL'));

    htp.tableData(to_char(ii), 'left',
                  cattributes=>headers_attr('QUEUE_NUMBER'));

    -- p_protocol and p_inbound_outbound were verified above
    -- ii must be a number
    -- BINDVAR_SCAN_IGNORE
    htp.tableData(wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_QUEUE',
                  'left',
                  cattributes=>headers_attr('QUEUE_NAME'));

    /*
    ** Check to see if there are any messages in the specified queue
    */
    -- The table name cannot be bound; its variable parts were validated by
    -- the WF_QUEUES lookup above.
    -- BINDVAR_SCAN_IGNORE
    l_sql := 'SELECT COUNT(1) FROM WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE';

    -- The count goes into its own variable so that l_count keeps holding
    -- the number of queues for the whole loop.
    execute immediate l_sql INTO l_msg_count;

    htp.tableData(to_char(l_msg_count), 'left',
                  cattributes=>headers_attr('QUEUE_COUNT'));

    htp.tableData(htf.anchor2(
                    curl=>wfa_html.base_url||
                          '/wf_queue.generic_queue_display_contents?p_protocol='||
                          p_protocol||'&p_inbound_outbound='||
                          p_inbound_outbound||'&p_queue_number='||
                          to_char(ii)||'&p_message_number=1',
                    ctext=>'<IMG SRC="'||wfa_html.image_loc||'affind.gif" alt="' || wf_core.translate('FIND') || '" BORDER=0>'),
                  'center',
                  cattributes=>'valign="MIDDLE" '||headers_attr('VIEW_DETAIL'));

    -- Close the row opened at the top of the iteration.
    htp.tableRowClose;

  end loop;

  htp.tableclose;

  htp.br;

  wfa_sec.Footer;
  htp.htmlClose;

exception
  when others then
    -- Report this package and procedure in the error stack.
    wf_core.context('Wf_Queue', 'Generic_Queue_View_Detail');
    raise;
end Generic_Queue_View_Detail;
1956
1957
-- generic_queue_display_contents
--   Renders an HTML page showing the payload of one message of a generic
--   queue inside a text area, with Next / Previous buttons.  The message is
--   reached by browsing (BROWSE mode, NO_WAIT) from the first message up to
--   the requested position.  Users without the workflow admin role only
--   get the WFPREF_INVALID_ADMIN message.
-- IN
--   p_protocol         - protocol part of the queue name
--   p_inbound_outbound - direction; its first character goes into the name
--   p_queue_number     - queue number part of the queue name
--   p_message_number   - 1-based position of the message to display
--
-- MODIFICATION LOG:
-- 06-JUN-2001 JWSMITH BUG 1819232 - Added summary attr for table tag for ADA
--             - Added ID attr for TD tags for ADA
procedure generic_queue_display_contents
 (p_protocol         IN VARCHAR2 DEFAULT NULL,
  p_inbound_outbound IN VARCHAR2 DEFAULT NULL,
  p_queue_number     IN NUMBER   DEFAULT NULL,
  p_message_number   IN NUMBER   DEFAULT 1) IS

  username           varchar2(320);   -- Username to query
  admin_role         varchar2(320);   -- Role for admin mode
  admin_mode         varchar2(1) := 'N';
  l_media            varchar2(240) := wfa_html.image_loc;
  l_icon             varchar2(40)  := 'FNDILOV.gif';
  l_text             varchar2(240) := '';
  l_onmouseover      varchar2(240) := wf_core.translate ('WFPREF_LOV');
  l_url              varchar2(4000);
  l_error_msg        varchar2(240);

  l_more_data        BOOLEAN := TRUE;
  l_message          system.wf_message_payload_t;
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  l_loc              number := 1;                -- next LOB offset to read
  l_message_contents VARCHAR2(32000);
  l_message_offset   binary_integer := 16000;    -- chunk size asked / read
  -- schema (30) + '.' + queue name (30): a 30 character buffer overflows
  -- as soon as the schema-qualified name is longer than that.
  l_queue_name       varchar2(61);
  l_msg_id           RAW(16);

begin

  -- Check current user has admin authority
  wfa_sec.GetSession(username);
  username := upper(username);

  admin_role := wf_core.translate('WF_ADMIN_ROLE');
  if (admin_role = '*' or
      Wf_Directory.IsPerformer(username, admin_role)) then
    admin_mode := 'Y';
  else
    l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
  end if;

  -- Set page title
  htp.htmlOpen;
  htp.headOpen;
  htp.title(wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE'));
  wfa_html.create_help_function('wf/links/dmr.htm?DMREP');

  wf_lov.OpenLovWinHtml;

  htp.headClose;

  -- Page header
  wfa_sec.Header(FALSE, '', wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE'), TRUE);

  -- Non-admin users only get the error message.
  IF (admin_mode = 'N') THEN
    htp.center(htf.bold(l_error_msg));
    return;
  END IF;

  htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');

  htp.p('<FORM NAME="FND_QUEUE_CONTENTS" ACTION="wf_queue.generic_queue_update" METHOD="POST">');

  /*
  ** Create a page with a form field with the message payload
  */
  dequeue_options.dequeue_mode := dbms_aq.BROWSE;
  dequeue_options.wait         := dbms_aq.NO_WAIT;
  dequeue_options.navigation   := dbms_aq.FIRST_MESSAGE;

  l_queue_name := wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||
                  to_char(p_queue_number)||'_QUEUE';

  -- Read the first message ...
  dbms_aq.dequeue
   (queue_name         => l_queue_name,
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => l_message,
    msgid              => l_msg_id
   );

  dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;

  -- ... then step forward until the requested message is reached.
  -- (Nothing is written to the page while skipping.)
  for l_skip in 2..p_message_number loop

    dbms_aq.dequeue
     (queue_name         => l_queue_name,
      dequeue_options    => dequeue_options,
      message_properties => message_properties,
      payload            => l_message,
      msgid              => l_msg_id
     );

  end loop;

  -- Display the contents
  htp.tableRowOpen;

  htp.p ('<TD ID="" ALIGN="Left">');

  htp.p ('<TEXTAREA NAME="message_content" ROWS=26 COLS=120 WRAP="SOFT">');

  -- NOTE(review): the payload is written unescaped; a message containing
  -- '</TEXTAREA>' would break out of the text area - consider escaping.
  while (l_more_data = TRUE) loop

    BEGIN

      -- dbms_lob.read returns the amount actually read in its 2nd argument.
      dbms_lob.read(l_message.message, l_message_offset, l_loc, l_message_contents);

      htp.p(l_message_contents);

      l_loc := l_loc + l_message_offset;

      -- A short chunk means the end of the LOB was reached.
      if (l_message_offset < 16000) then
        l_more_data := FALSE;
      end if;

    EXCEPTION
      -- Reading past the end of the LOB also ends the loop.
      WHEN NO_DATA_FOUND THEN
        l_more_data := FALSE;
    END;

  END LOOP;

  htp.p ('</TEXTAREA>');

  htp.p ('</TD>');

  htp.tableRowClose;

  htp.tableclose;

  htp.formClose;

  htp.br;

  htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');

  --Next Button

  htp.tableRowOpen;

  l_url := wfa_html.base_url||
           '/wf_queue.generic_queue_display_contents'||
           '?p_protocol='||p_protocol||
           '&p_inbound_outbound='||p_inbound_outbound||
           '&p_queue_number='||to_char(p_queue_number)||
           '&p_message_number='||to_char(p_message_number + 1);

  l_icon        := 'FNDJLFOK.gif';
  l_text        := wf_core.translate ('NEXT');
  l_onmouseover := wf_core.translate ('NEXT');

  htp.p('<TD ID="">');

  wfa_html.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);

  htp.p('</TD>');

  -- Previous button, only when there is a previous message
  if (p_message_number > 1) then

    l_url := wfa_html.base_url||
             '/wf_queue.generic_queue_display_contents'||
             '?p_protocol='||p_protocol||
             '&p_inbound_outbound='||p_inbound_outbound||
             '&p_queue_number='||to_char(p_queue_number)||
             '&p_message_number='||to_char(p_message_number - 1);

    l_icon        := 'FNDJLFCN.gif';
    l_text        := wf_core.translate ('PREVIOUS');
    l_onmouseover := wf_core.translate ('PREVIOUS');

    htp.p('<TD ID="">');

    wfa_html.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);

    htp.p('</TD>');

  end if;

  htp.tableRowClose;

  htp.tableclose;

  wfa_sec.Footer;

  htp.htmlClose;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'generic_queue_display_contents',
                    p_protocol, p_inbound_outbound);
    raise;

end generic_queue_display_contents;
2166
2167
2168
-- MODIFICATION LOG:
-- 06-JUN-2001 JWSMITH BUG 1819232 - Added summary attr for table tag for ADA
--                                 - Added ID attr for TD tags
--
-- Generic_Queue_Edit
--   Emits (through htp) the HTML form used to create or edit a generic
--   queue definition.  The form posts to wf_queue.generic_queue_update.
-- IN
--   p_protocol         - protocol of the definition to edit; when NULL the
--                        form is rendered empty (new definition)
--   p_inbound_outbound - 'INBOUND' or 'OUTBOUND'; anything else (including
--                        NULL) pre-selects OUTBOUND in the drop-down
-- NOTES
--   * When p_protocol is given but no matching WF_QUEUES row exists the
--     SELECT raises NO_DATA_FOUND, which is re-raised with context.
--   * Callers without the WF_ADMIN_ROLE get the page header plus an error
--     message and nothing else.
procedure Generic_Queue_Edit (
  p_protocol         IN VARCHAR2 DEFAULT NULL,
  p_inbound_outbound IN VARCHAR2 DEFAULT NULL
) IS

  username            varchar2(320);   -- Username to query
  admin_role          varchar2(320);   -- Role for admin mode
  admin_mode          varchar2(1) := 'N';
  l_inbound_selected  varchar2(1) := 'N';
  l_outbound_selected varchar2(1) := 'N';
  l_description       VARCHAR2(240);
  l_queue_count       NUMBER;
  l_media             varchar2(240) := wfa_html.image_loc;
  l_icon              varchar2(40) := 'FNDILOV.gif';
  l_text              varchar2(240) := '';
  l_onmouseover       varchar2(240) := wf_core.translate ('WFPREF_LOV');
  l_url               varchar2(4000);
  l_error_msg         varchar2(240);

BEGIN

  -- Check current user has admin authority
  wfa_sec.GetSession(username);
  username := upper(username);

  admin_role := wf_core.translate('WF_ADMIN_ROLE');
  if (admin_role = '*' or
      Wf_Directory.IsPerformer(username, admin_role)) then
    admin_mode := 'Y';
  else
    l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
  end if;

  /*
  ** If this protocol already exists then go fetch the definition
  */
  IF (p_protocol IS NOT NULL) THEN

    SELECT description,
           queue_count
    INTO   l_description,
           l_queue_count
    FROM   wf_queues
    WHERE  protocol = p_protocol
    AND    inbound_outbound = p_inbound_outbound;

  END IF;

  -- Set page title
  htp.htmlOpen;
  htp.headOpen;
  htp.title(wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE'));
  wfa_html.create_help_function('wf/links/dmr.htm?DMREP');

  wf_lov.OpenLovWinHtml;

  htp.headClose;

  -- Page header
  wfa_sec.Header(FALSE, '', wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE'), TRUE);

  -- Non-administrators only see the error message
  IF (admin_mode = 'N') THEN

    htp.center(htf.bold(l_error_msg));
    return;

  END IF;

  htp.tableopen(calign=>'CENTER',cattributes=>'summary="' || wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE') || '"');

  htp.p('<FORM NAME="FND_GENERIC_QUEUE" ACTION="wf_queue.generic_queue_update" METHOD="POST">');

  -- Protocol Name
  htp.tableRowOpen;
  htp.tableData(cvalue=>'<LABEL FOR="i_protocol">' ||
                        wf_core.translate('PROTOCOL') || '</LABEL>',
                calign=>'right',
                cattributes=>'id=""');

  htp.tableData(htf.formText(cname=>'p_protocol', csize=>'30',
                             cvalue=>p_protocol, cmaxlength=>'30',
                             cattributes=>'id="i_protocol"'),
                cattributes=>'id=""');

  htp.tableRowClose;

  -- Inbound/outbound
  htp.tableRowOpen;
  htp.tableData(cvalue=>'<LABEL FOR="i_inbound_outbound">' ||
                        wf_core.translate('INBOUND_OUTBOUND') || '</LABEL>',
                calign=>'right',
                cattributes=>'id=""');

  -- Exactly one of the two options is marked selected; OUTBOUND is the
  -- default when no (or an unknown) direction was passed in.
  if (NVL(p_inbound_outbound, 'OUTBOUND') = 'INBOUND') then

    l_inbound_selected  := 'Y';
    l_outbound_selected := NULL;

  else

    l_inbound_selected  := NULL;
    l_outbound_selected := 'Y';

  end if;

  htp.p('<TD ID="">');

  htp.formSelectOpen(cname=>'p_inbound_outbound',cattributes=>'id="i_inbound_outbound"');

  htp.formSelectOption(cvalue=>wf_core.translate('INBOUND'),
                       cattributes=>'value=INBOUND',
                       cselected=>l_inbound_selected);

  htp.formSelectOption(cvalue=>wf_core.translate('OUTBOUND'),
                       cattributes=>'value=OUTBOUND',
                       cselected=>l_outbound_selected);

  htp.formSelectClose;
  htp.p('</TD>');

  htp.tableRowClose;

  -- Description
  -- (the label is now closed with </LABEL>; it used to end in a stray '"')
  htp.tableRowOpen;
  htp.tableData(cvalue=>'<LABEL FOR="i_description">' ||
                        wf_core.translate('DESCRIPTION') || '</LABEL>',
                calign=>'right',
                cattributes=>'id=""');

  htp.tableData(htf.formText(cname=>'p_description', csize=>'30',
                             cvalue=>l_description, cmaxlength=>'240',
                             cattributes=>'id="i_description"'),
                cattributes=>'id=""');

  htp.tableRowClose;

  -- Count
  -- (the label is now closed with </LABEL>; it used to end in a stray '"')
  htp.tableRowOpen;
  htp.tableData(cvalue=>'<LABEL FOR="i_count">' ||
                        wf_core.translate('COUNT') || '</LABEL>',
                calign=>'right',
                cattributes=>'id=""');

  htp.tableData(htf.formText(cname=>'p_queue_count', csize=>'10',
                             cvalue=>l_queue_count, cmaxlength=>'20',
                             cattributes=>'id="i_count"'),
                cattributes=>'id=""');

  htp.tableRowClose;

  -- keep track of the original protocol and the inbound/outbound
  -- value in case the name changes

  htp.formHidden(cname=>'p_original_protocol', cvalue=>p_protocol);
  htp.formHidden(cname=>'p_original_inbound', cvalue=>p_inbound_outbound);

  htp.tableclose;

  htp.br;

  htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');

  --Submit Button

  htp.tableRowOpen;

  l_url         := 'javascript:document.FND_GENERIC_QUEUE.submit()';
  l_icon        := 'FNDJLFOK.gif';
  l_text        := wf_core.translate ('WFMON_OK');
  l_onmouseover := wf_core.translate ('WFMON_OK');

  htp.p('<TD ID="">');

  wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);

  htp.p('</TD>');

  --Cancel Button
  -- Goes back to this package's queue list, the same page that
  -- Generic_Queue_Update redirects to after a successful save.
  l_url         := wfa_html.base_url||'/wf_queue.Generic_Queue_Display';
  l_icon        := 'FNDJLFCN.gif';
  l_text        := wf_core.translate ('CANCEL');
  l_onmouseover := wf_core.translate ('CANCEL');

  htp.p('<TD ID="">');

  wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);

  htp.p('</TD>');

  htp.tableRowClose;

  htp.tableclose;

  htp.formClose;

  wfa_sec.Footer;
  htp.htmlClose;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'Generic_Queue_Edit',
                    p_protocol, p_inbound_outbound);
    raise;

END Generic_Queue_Edit;
2378
--
-- generic_queue_delete_check
--   Verifies that every queue table in the given range of a generic queue
--   definition is empty, so that the queues can be dropped safely.
-- IN
--   p_protocol          - protocol part of the queue table name
--   p_inbound_outbound  - direction; only its first character is used in
--                         the table name
--   p_queue_start_range - first queue number to check
--   p_queue_end_range   - last queue number to check
-- RAISES
--   WFQUEUE_QUEUE_CONTENT (through wf_core.raise) for the first queue table
--   that still holds at least one row.
--
procedure generic_queue_delete_check
  (p_protocol          in varchar2,
   p_inbound_outbound  in varchar2,
   p_queue_start_range in number,
   p_queue_end_range   in number) IS

  l_count NUMBER := 0;
  l_sql   varchar2(1000);

BEGIN

  /*
  ** Check to make sure there are no messages in the queue before
  ** you delete it.
  */
  for ii in p_queue_start_range..p_queue_end_range loop

    /*
    ** Check to see if there are any messages in the specified queue
    */
    -- p_protocol and p_inbound was verified before coming here.
    -- BINDVAR_SCAN_IGNORE
    l_sql := 'SELECT COUNT(1) FROM WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE';

    -- The result of a dynamic single-row query has to be received through
    -- the INTO clause of EXECUTE IMMEDIATE.  Passing l_count via USING only
    -- supplies an IN bind, so the count was never returned to the caller.
    execute immediate l_sql into l_count;

    /*
    ** If you find a row then error this call
    */
    if (l_count > 0) then

      wf_core.token('PROTOCOL', p_protocol);
      -- NOTE(review): the token name is spelled 'INBOUND_OUTBOUD'; left as
      -- is because the message text may reference that spelling - confirm.
      wf_core.token('INBOUND_OUTBOUD', p_inbound_outbound);
      wf_core.token('QUEUE_NUMBER', to_char(ii));
      wf_core.raise('WFQUEUE_QUEUE_CONTENT');

    end if;

  end loop;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'generic_queue_delete_check',
                    p_protocol, p_inbound_outbound);
    raise;

end generic_queue_delete_check;
2427
--
-- generic_queue_delete_queues
--   Stops and drops each queue in the given range of a generic queue
--   definition, then drops the queue table that backed it.
-- IN
--   p_protocol          - protocol part of the queue / queue table name
--   p_inbound_outbound  - direction; only its first character is used
--   p_queue_start_range - first queue number to drop
--   p_queue_end_range   - last queue number to drop
--
procedure generic_queue_delete_queues
  (p_protocol          in varchar2,
   p_inbound_outbound  in varchar2,
   p_queue_start_range in number,
   p_queue_end_range   in number) IS

  l_object_prefix varchar2(4000);  -- <schema>.WF_<protocol>_<I|O>_<n>
  l_queue         varchar2(4000);

BEGIN

  for l_queue_no in p_queue_start_range..p_queue_end_range loop

    -- Queue and queue table share everything but the trailing suffix
    l_object_prefix := wf_core.translate('WF_SCHEMA')||'.'||'WF_'||
                       p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||
                       to_char(l_queue_no);
    l_queue := l_object_prefix||'_QUEUE';

    -- A queue has to be stopped before it can be dropped
    dbms_aqadm.stop_queue(queue_name => l_queue);

    dbms_aqadm.drop_queue(queue_name => l_queue);

    -- Finally remove the queue table
    dbms_aqadm.drop_queue_table(queue_table => l_object_prefix||'_TABLE');

  end loop;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'generic_queue_delete_queues',
                    p_protocol, p_inbound_outbound);
    raise;

end generic_queue_delete_queues;
2472
2473
2474
-- MODIFICATION LOG:
-- 06-JUN-2001 JWSMITH BUG 1819232 - Added summary attr for table tag for ADA
--                                 - Added ID attr for TD tags
--
-- Generic_Queue_Update
--   Form handler for the page produced by Generic_Queue_Edit.  For a new
--   definition (p_original_protocol is NULL) it creates the queues through
--   create_generic_queue, then redirects the browser to the queue list.
-- IN
--   p_protocol          - protocol entered on the form
--   p_inbound_outbound  - direction selected on the form
--   p_description       - description entered on the form
--   p_queue_count       - number of queues, as text; converted with to_number
--   p_original_protocol - protocol the form was opened with (NULL = new)
--   p_original_inbound  - direction the form was opened with
-- NOTES
--   * Callers without the WF_ADMIN_ROLE only get an error message.
--   * Updating an existing definition is not implemented: when
--     p_original_protocol is supplied nothing is changed.
--   * NOTE(review): the duplicate check compares UPPER(p_protocol) while
--     create_generic_queue receives p_protocol unchanged - confirm whether
--     protocols are expected to be upper case.
procedure Generic_Queue_Update (
  p_protocol          IN VARCHAR2 DEFAULT NULL,
  p_inbound_outbound  IN VARCHAR2 DEFAULT NULL,
  p_description       IN VARCHAR2 DEFAULT NULL,
  p_queue_count       IN VARCHAR2 DEFAULT NULL,
  p_original_protocol IN VARCHAR2 DEFAULT NULL,
  p_original_inbound  IN VARCHAR2 DEFAULT NULL
) IS

  username      varchar2(320);   -- Username to query
  admin_role    varchar2(320);   -- Role for admin mode
  admin_mode    varchar2(1) := 'N';
  l_count       number := 0;
  l_media       varchar2(240) := wfa_html.image_loc;
  l_icon        varchar2(30) := 'FNDILOV.gif';
  l_text        varchar2(240) := '';
  l_onmouseover varchar2(240) := wf_core.translate ('WFPREF_LOV');
  l_url         varchar2(4000);
  l_error_msg   varchar2(240);

BEGIN

  -- Check current user has admin authority
  wfa_sec.GetSession(username);
  username := upper(username);

  admin_role := wf_core.translate('WF_ADMIN_ROLE');

  if (admin_role = '*' or
      Wf_Directory.IsPerformer(username, admin_role)) then
    admin_mode := 'Y';
  else
    l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN');
  end if;

  IF (admin_mode = 'N') THEN

    htp.center(htf.bold(l_error_msg));
    return;

  END IF;

  -- Check to make sure the protocol does not already exist
  IF (p_original_protocol IS NULL) THEN

    SELECT count(1)
    INTO   l_count
    FROM   wf_queues
    WHERE  UPPER(p_protocol) = protocol
    AND    p_inbound_outbound = inbound_outbound;

    if (l_count > 0) then

      -- Definition already exists: show the message and an OK button that
      -- leads back to the (empty) edit form.
      htp.p('<BODY bgcolor=#cccccc>');
      htp.center(htf.bold(wf_core.translate('WFQUEUE_ALREADY_EXISTS')));
      htp.br;

      htp.tableopen(calign=>'CENTER',cattributes=>'summary=""');

      --Submit Button

      htp.tableRowOpen;

      l_url         := wfa_html.base_url||
                       '/wf_queue.generic_queue_edit';
      l_icon        := 'FNDJLFOK.gif';
      l_text        := wf_core.translate ('WFMON_OK');
      l_onmouseover := wf_core.translate ('WFMON_OK');

      htp.p('<TD ID="">');

      wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text);

      htp.p('</TD>');
      htp.tablerowclose;
      htp.tableclose;
      htp.p('</BODY>');
      return;

    else

      wf_queue.create_generic_queue (p_protocol         => p_protocol,
                                     p_inbound_outbound => p_inbound_outbound,
                                     p_description      => p_description,
                                     p_queue_count      => to_number(p_queue_count));

    end if;

  else
    -- Editing an existing definition is intentionally a no-op: no
    -- update routine for generic queues is called from here.
    null;

  end if;

  -- use owa_util.redirect_url to redirect the URL to the home page
  owa_util.redirect_url(curl=>wfa_html.base_url ||
                              '/wf_queue.Generic_Queue_Display',
                        bclose_header=>TRUE);

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'Generic_Queue_Update',
                    p_protocol, p_inbound_outbound);
    raise;

END Generic_Queue_Update;
2595
2596
/*
** create_generic_queue
**
** Creates p_queue_count queue tables and queues for a protocol/direction
** pair, starts each queue and records the definition in WF_QUEUES.  The
** payload type of the queue tables is SYSTEM.WF_MESSAGE_PAYLOAD_T.
**
** Objects are named <WF_SCHEMA>.WF_<protocol>_<first letter of
** direction>_<n>_TABLE and ..._QUEUE, with n running from 1 to
** p_queue_count.
**
** Raises WFQUEUE_UNIQUE_NAME (through wf_core.raise) when a WF_QUEUES row
** for the protocol/direction pair already exists.
*/
procedure create_generic_queue
  (p_protocol         IN VARCHAR2,
   p_inbound_outbound IN VARCHAR2,
   p_description      IN VARCHAR2,
   p_queue_count      IN NUMBER) IS

  l_existing    NUMBER := 0;
  l_name_prefix varchar2(4000);  -- <schema>.WF_<protocol>_<I|O>_<n>
  l_table_name  varchar2(4000);
  l_queue_name  varchar2(4000);

begin

  -- Refuse to create a definition that is already registered
  select count(1)
  into   l_existing
  from   wf_queues wfq
  where  wfq.protocol = p_protocol
  and    wfq.inbound_outbound = p_inbound_outbound;

  if (l_existing > 0) then

    wf_core.token('PROTOCOL', p_protocol);
    wf_core.raise('WFQUEUE_UNIQUE_NAME');

  end if;

  for l_queue_no in 1..p_queue_count loop

    l_name_prefix := wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||
                     substr(p_inbound_outbound, 1, 1)||'_'||to_char(l_queue_no);
    l_table_name  := l_name_prefix||'_TABLE';
    l_queue_name  := l_name_prefix||'_QUEUE';

    -- Queue table first; the queue is created inside it
    dbms_aqadm.create_queue_table (
      queue_table        => l_table_name,
      queue_payload_type => 'SYSTEM.WF_MESSAGE_PAYLOAD_T',
      storage_clause     => 'storage (initial 1m next 1m pctincrease 0 )',
      sort_list          => 'PRIORITY,ENQ_TIME',
      comment            => wf_core.translate('WORKFLOW_USER_QUEUE_TABLE')||' - '||
                            l_table_name);

    dbms_aqadm.create_queue(
      queue_name  => l_queue_name,
      queue_table => l_table_name,
      max_retries => 0,
      comment     => wf_core.translate('WORKFLOW_USER_QUEUE')||' - '||
                     l_queue_name);

    -- A freshly created queue is not usable until it is started
    dbms_aqadm.start_queue(queue_name => l_queue_name);

  end loop;

  -- Register the definition
  insert into wf_queues
    (protocol,
     inbound_outbound,
     description,
     queue_count,
     disable_flag)
  values
    (p_protocol,
     p_inbound_outbound,
     p_description,
     p_queue_count,
     'N');

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'create_generic_queue', p_protocol,
                    p_inbound_outbound);
    raise;

end create_generic_queue;
2688
/*
** delete_generic_queue
**
** Removes a generic queue definition: looks up how many queues the
** protocol/direction pair owns, verifies that all of them are empty, drops
** the queues and queue tables, and finally deletes the WF_QUEUES row.
**
** Raises WFQUEUE_NOEXIST (through wf_core.raise) when no WF_QUEUES row
** exists for the pair; errors from generic_queue_delete_check and
** generic_queue_delete_queues are propagated.
*/
procedure delete_generic_queue
  (p_protocol         IN VARCHAR2,
   p_inbound_outbound IN VARCHAR2) IS

  l_queue_count NUMBER := 0;

begin

  /*
  ** The definition must exist; its queue_count drives the range below
  */
  begin

    select queue_count
    into   l_queue_count
    from   wf_queues wfq
    where  wfq.protocol = p_protocol
    and    wfq.inbound_outbound = p_inbound_outbound;

  exception
    when no_data_found then
      wf_core.token('PROTOCOL', p_protocol);
      wf_core.raise('WFQUEUE_NOEXIST');

  end;

  /*
  ** Make sure the queues are empty
  */
  wf_queue.generic_queue_delete_check (p_protocol, p_inbound_outbound,
                                       1, l_queue_count);

  /*
  ** Delete the queues and queue tables
  */
  wf_queue.generic_queue_delete_queues(p_protocol, p_inbound_outbound,
                                       1, l_queue_count);

  /*
  ** delete an entry in WF_QUEUES table
  */
  delete from wf_queues
  where  protocol = p_protocol
  and    inbound_outbound = p_inbound_outbound;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'delete_generic_queue', p_protocol,
                    p_inbound_outbound);
    raise;

end delete_generic_queue;
2747
/*
** Procedure: get_hash_queue_name
**
** Description: Load all queue definitions into memory (once per session,
**              into wf_queue.queue_names_index).  Then pick one of the
**              queues of the requested protocol/direction and return its
**              name.
**
** IN
**   p_protocol         - protocol of the queue definition
**   p_inbound_outbound - direction of the queue definition
** OUT
**   p_queue_name       - <WF_SCHEMA>.WF_<protocol>_<I|O>_<n>_QUEUE, where n
**                        is 1 for a single-queue definition and otherwise
**                        mod(wf_core.random, queue_count) + 1.  Not
**                        assigned when no definition matches.
*/
procedure get_hash_queue_name
  (p_protocol         in varchar2,
   p_inbound_outbound in varchar2,
   p_queue_name       out NOCOPY varchar2) IS

  l_slot  number := 1;   -- next free position while loading the cache
  l_index number := 0;   -- chosen queue number; 0 = no matching definition

  cursor get_queues is
    select protocol, inbound_outbound, queue_count
    from   wf_queues
    order by protocol, inbound_outbound;

begin

  /*
  ** Load the queue definitions into memory if that has not been done yet
  */
  if (wf_queue.queue_names_index.count < 1) then

    for wf_queues_list in get_queues loop

      wf_queue.queue_names_index(l_slot).protocol         := wf_queues_list.protocol;
      wf_queue.queue_names_index(l_slot).inbound_outbound := wf_queues_list.inbound_outbound;
      wf_queue.queue_names_index(l_slot).queue_count      := wf_queues_list.queue_count;

      l_slot := l_slot + 1;

    end loop;

  end if;

  -- Go find the locator in the queue list that matches the request
  for ii in 1..wf_queue.queue_names_index.count loop

    if (wf_queue.queue_names_index(ii).protocol = p_protocol AND
        wf_queue.queue_names_index(ii).inbound_outbound = p_inbound_outbound) THEN

      -- If there is more than 1 queue then choose the queue based on a random
      -- number generator
      if (wf_queue.queue_names_index(ii).queue_count > 1) then

        l_index := mod(to_number(wf_core.random), wf_queue.queue_names_index(ii).queue_count) + 1;

      else

        l_index := 1;

      end if;

      -- create_generic_queue rejects a second definition for the same
      -- protocol/direction pair, so there is nothing more to find.
      exit;

    end if;

  end loop;

  if (l_index > 0) then

    p_queue_name := wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'|| SUBSTR(p_inbound_outbound, 1, 1)||
                    '_'||to_char(l_index)||'_QUEUE';

  end if;

exception
  when others then
    Wf_Core.Context('Wf_Queue', 'get_hash_queue_name',
                    p_protocol, p_inbound_outbound);
    raise;

end get_hash_queue_name;
2826
--
-- Function: enable_exception_queue
--
-- Enable the exception queue for the queue table for dequeuing.
-- Returns the name of the exception queue for the given queue name, built
-- as <schema>.AQ$_<queue table>_E, or NULL when the queue is not listed in
-- ALL_QUEUES.
--
-- IN
--   p_queue_name - queue name, optionally prefixed with "<schema>."; when
--                  no schema is given the WF_SCHEMA translation is used
--
function enable_Exception_Queue(p_queue_name in varchar2) return varchar2
is
  l_schema_name     varchar2(320);
  l_queue_name      varchar2(30);
  l_pos             integer := 0;
  l_queue_table     varchar2(30);
  l_dequeue_enabled varchar2(7) := '';
  l_exception_queue varchar2(100) := '';

begin
  -- Check to see if the name has a schema. Remove it for the check.
  l_pos := instrb(p_queue_name,'.');
  if l_pos > 0 then
    l_schema_name := substrb(p_queue_name, 1, l_pos-1);
    l_queue_name  := substrb(p_queue_name, l_pos+1);
  else
    l_schema_name := wf_core.translate('WF_SCHEMA');
    l_queue_name  := p_queue_name;
  end if;

  begin
    select queue_table, dequeue_enabled
    into   l_queue_table, l_dequeue_enabled
    from   all_queues
    where  owner = l_schema_name
    and    name = l_queue_name;

    l_exception_queue := l_schema_name||'.'||'AQ$_'||
                         l_queue_table||'_E';
  exception
    when no_data_found then
      -- Unknown queue: report "no exception queue" to the caller
      l_exception_queue := '';
      l_dequeue_enabled := '';
  end;

  -- An empty string is NULL in Oracle, so the former test
  -- "l_exception_queue <> ''" could never be true and the queue was never
  -- started.  The flag is trimmed before comparing, as
  -- EnableBackgroundQueues does for the same ALL_QUEUES column.
  -- NOTE(review): l_dequeue_enabled is the flag of the queue passed in, not
  -- of the exception queue itself - confirm this is the intended condition.
  if l_exception_queue is not null and trim(l_dequeue_enabled) = 'NO' then
    dbms_aqadm.start_queue(queue_name => l_exception_queue,
                           enqueue    => FALSE,
                           dequeue    => TRUE);
  end if;

  return l_exception_queue;

exception
  when others then
    WF_CORE.Context('WF_QUEUE','Enable_Exception_Queue',p_queue_name);
    raise;

end enable_Exception_Queue;
2881
-- ====================================================================
-- Add Subscriber to Queue (PUBLIC)
--
-- Registers the agent <name> as a subscriber of <queuename>.  The
-- subscription rule restricts delivery to messages whose correlation id
-- starts with the subscriber name.
-- IN
--   queuename - queue to subscribe to
--   name      - subscriber (agent) name; also the correlation id prefix
-- ====================================================================
procedure AddSubscriber(queuename in varchar2,
                        name      in varchar2)
as
  l_subscriber sys.aq$_agent := sys.aq$_agent(name,'',0);
  l_rule       varchar2(4000);
begin
  l_rule := 'CORRID like '''||name||'%''';

  DBMS_AQADM.Add_Subscriber(
    queue_name => queuename,
    subscriber => l_subscriber,
    rule       => l_rule
  );

exception
  when OTHERS then
    Wf_Core.Context('WF_QUEUE','AddSubscriber',queuename, name);
    raise;
end AddSubscriber;
2903
-- Bug 2307428
-- ====================================================================
-- Enable Inbound and deferred queues for Background Engine.
--
-- Each of the two queues is started only when ALL_QUEUES reports it with
-- enqueue or dequeue disabled.  The queue names held in the package are
-- expected to be prefixed with "<WF_SCHEMA>."; the prefix is stripped
-- before the dictionary lookup.
-- ====================================================================
procedure EnableBackgroundQueues as
  l_wf_schema varchar2(320);

  -- Returns a row only when the queue exists and is (partly) disabled
  CURSOR c_disabled_queue (p_owner varchar2, p_name varchar2) is
    SELECT name
    FROM   all_queues
    WHERE  name = p_name
    AND    owner = p_owner
    AND    ((trim(enqueue_enabled) = 'NO') OR (trim(dequeue_enabled) = 'NO'));

  -- Starts the schema-qualified queue if the dictionary shows it disabled
  procedure start_if_disabled (p_qualified_name in varchar2) is
    l_bare_name varchar2(80);
  begin
    l_bare_name := substr(p_qualified_name, length(l_wf_schema)+2);
    for l_queue in c_disabled_queue (l_wf_schema, l_bare_name) loop
      DBMS_AQADM.START_QUEUE(p_qualified_name);
    end loop;
  end start_if_disabled;

begin
  --If the queue names haven't been set,initialise them
  if (wf_queue.name_init = FALSE) then
    wf_queue.set_queue_names;
  end if;

  --Obtain the schema
  l_wf_schema := wf_core.translate('WF_SCHEMA');

  --Enable deferred queue
  start_if_disabled(wf_queue.deferred_queue_name);

  --Enable inbound queue
  start_if_disabled(wf_queue.inbound_queue_name);

exception
  when others then
    Wf_core.Context('WF_QUEUE','EnableBackgroundQueues');
    raise;
end EnableBackgroundQueues;
-- ====================================================================
-- get Count Message States (PUBLIC)
--
-- Counts the messages of all local agents whose name matches p_agent,
-- grouped by message state.  For every agent the AQ$<queue table> view is
-- queried for the agent's queue and for 'AQ$_<queue name>_E'.
-- IN
--   p_agent         - agent name pattern (LIKE syntax, upper-cased)
-- OUT
--   p_ready         - messages in state READY
--   p_wait          - messages in state WAIT
--   p_processed     - messages in state PROCESSED
--   p_expired       - messages in state EXPIRED
--   p_undeliverable - messages in state UNDELIVERABLE
--   p_error         - READY messages of queue WF_ERROR, counted here
--                     instead of in p_ready only when p_agent is '%'
-- NOTES
--   Agents whose queue is not listed in ALL_QUEUES, or whose AQ$ view
--   cannot be queried (ORA-942 / ORA-903), contribute nothing.
-- ====================================================================
procedure getCntMsgSt
  (p_agent         IN VARCHAR2 DEFAULT '%',
   p_ready         OUT NOCOPY NUMBER,
   p_wait          OUT NOCOPY NUMBER,
   p_processed     OUT NOCOPY NUMBER,
   p_expired       OUT NOCOPY NUMBER,
   p_undeliverable OUT NOCOPY NUMBER,
   p_error         OUT NOCOPY NUMBER)
is

  TYPE cntmsgst_t IS REF CURSOR;
  l_cntmsgst cntmsgst_t;
  l_sqlstmt  varchar2(4000);
  l_count    number := 0;
  l_msgstate varchar2(50);
  l_pos      number := 0;
  l_qt       varchar2(100);
  l_owner    varchar2(100);

  -- <rraheja:2786474> Gather schema and queue name once rather than in every call for perf.
  l_schema   varchar2(100);
  l_qname    varchar2(100);

  -- <rraheja:2786474> Changed upper(name) to name as queue_name should be recorded in upper case.
  cursor c_localagents(p_agent varchar2) is
    select queue_name
    from   wf_agents
    where  system_guid = hextoraw(wf_core.translate('WF_SYSTEM_GUID'))
    and    name like upper(p_agent);

  -- <rraheja:2786474> Changed non-cursor single row query to cursor based for improved perf.
  cursor c_qtable is
    select queue_table
    from   all_queues
    where  owner = l_schema
    and    name = l_qname;

  TABLE_NOTFOUND exception;
  pragma EXCEPTION_INIT(TABLE_NOTFOUND,-942);

  INVALID_TABLE exception;
  pragma EXCEPTION_INIT(INVALID_TABLE,-903);

begin

  -- Initialize Out Parameters
  p_ready         := 0;
  p_wait          := 0;
  p_processed     := 0;
  p_expired       := 0;
  p_undeliverable := 0;
  p_error         := 0;

  for i in c_localagents(p_agent) loop

    -- Split "<schema>.<queue>" into its two parts
    l_pos    := nvl(instr(i.queue_name,'.',1,1),0);
    l_schema := substr(i.queue_name,1,l_pos-1);
    l_qname  := substr(i.queue_name,l_pos+1);

    -- Reset before fetching: a fetch that finds no row leaves the target
    -- untouched, which used to carry the previous agent's queue table
    -- over into this iteration.
    l_qt := null;
    open c_qtable;
    fetch c_qtable into l_qt;
    close c_qtable;

    -- Without a queue table there is nothing to count for this agent
    if l_qt is not null then

      -- <rraheja:2786474> queue owner should be = queue table owner
      l_owner := l_schema;

      -- l_owner and l_qt are selected/derived from our own cursor
      -- BINDVAR_SCAN_IGNORE[2]
      l_sqlstmt := 'select msg_state, count(*) from '||l_owner||'.'||'aq$'||l_qt
                   ||' where (queue = :q or queue = :r) group by msg_state';
      begin
        --If the queue tables are not found then the
        --select should throw ORA 942, which is handled below so that
        --the remaining agents are still counted.
        open l_cntmsgst for l_sqlstmt using l_qname,'AQ$_'|| l_qname ||'_E';
        loop
          fetch l_cntmsgst into l_msgstate, l_count;
          exit when l_cntmsgst%notfound;

          if l_msgstate = 'READY' then
            --Bug 2382594
            --READY messages on WF_ERROR are reported as errors when all
            --agents are requested; otherwise they count as ready.
            if l_qname = 'WF_ERROR' and p_agent = '%' then
              p_error := p_error + l_count;
            else
              p_ready := p_ready + l_count;
            end if;
          elsif l_msgstate = 'WAIT' then
            p_wait := p_wait + l_count;
          elsif l_msgstate = 'PROCESSED' then
            p_processed := p_processed + l_count;
          elsif l_msgstate = 'EXPIRED' then
            p_expired := p_expired + l_count;
          elsif l_msgstate = 'UNDELIVERABLE' then
            p_undeliverable := p_undeliverable + l_count;
          end if;
        end loop;

        close l_cntmsgst;
      exception
        when table_notfound or invalid_table then
          --return 0 count instead of throwing error to UI
          --just ensure that the cursor is closed
          if (l_cntmsgst%ISOPEN) then
            close l_cntmsgst;
          end if;
      end;

    end if;

  end loop; -- end loop for c_localagents

exception
  when OTHERS then
    if (l_cntmsgst%ISOPEN) then
      close l_cntmsgst;
    end if;

    Wf_Core.Context('WF_QUEUE','getCntMsgSt',p_agent);
    raise;
end getCntMsgSt;
3105
--
-- move_msgs_excep2normal (CONCURRENT PROGRAM API)
--   Drains the exception queue of the given agent: every message is
--   dequeued (REMOVE mode, no wait) from <schema>.AQ$_<queue name>_E and
--   enqueued again, with expiration reset to "never", on the agent's normal
--   queue.  Each moved message is committed individually.
--   Handles wf_event_t and JMS_TEXT_MESSAGE payloads.
--
-- OUT
--   errbuf  - CP error message
--   retcode - CP return code (0 = success, 1 = warning, 2 = error)
-- IN
--   p_agent_name - Agent name
--
procedure move_msgs_excep2normal(errbuf       out nocopy varchar2,
                                 retcode      out nocopy varchar2,
                                 p_agent_name in  varchar2)
as
  l_queue_name         varchar2(100);
  l_queue_handler      varchar2(100);
  l_schema             varchar2(100);
  l_qname              varchar2(100);
  l_excp_qname         varchar2(100);
  l_object_type        varchar2(100);
  l_obj_type           varchar2(100);
  l_pos                number := 0;
  l_drained            boolean := FALSE;  -- TRUE once the dequeue times out
  l_dequeue_options    dbms_aq.dequeue_options_t;
  l_enqueue_options    dbms_aq.enqueue_options_t;
  l_message_properties dbms_aq.message_properties_t;
  l_payload_evt        wf_event_t;
  l_payload_jms        sys.aq$_JMS_TEXT_MESSAGE;
  l_msg_id             raw(16);
  invalid_agent        exception;
  invalid_type         exception;
  pragma EXCEPTION_INIT(invalid_agent, -20201);
  pragma EXCEPTION_INIT(invalid_type, -20202);

begin

  -- Resolve the agent on the local system to its queue
  begin
    SELECT TRIM(queue_name), TRIM(queue_handler)
    INTO   l_queue_name, l_queue_handler
    FROM   wf_agents
    WHERE  name = upper(p_agent_name)
    AND    SYSTEM_GUID = wf_event.local_system_guid;
  exception
    when no_data_found then
      raise_application_error(-20201, 'Agent not found');
  end;

  -- "<schema>.<queue>" -> schema, queue and exception queue name
  l_pos        := instr(l_queue_name, '.', 1, 1);
  l_schema     := substr(l_queue_name, 1, l_pos-1);
  l_qname      := substr(l_queue_name, l_pos+1);
  l_excp_qname := 'AQ$_' || l_qname || '_E';

  -- Payload type of the queue table that holds the agent's queue
  SELECT TRIM(object_type)
  INTO   l_object_type
  FROM   all_queue_tables
  WHERE  queue_table in
         (
           SELECT queue_table
           FROM   all_queues
           WHERE  name = l_qname
           AND    owner = l_schema
         )
  AND    owner=l_schema;

  -- Drop the owner prefix of the type name
  l_pos      := instr(l_object_type, '.', 1, 1);
  l_obj_type := substr(l_object_type, l_pos+1);

  l_dequeue_options.dequeue_mode  := dbms_aq.REMOVE;
  l_dequeue_options.wait          := dbms_aq.NO_WAIT;
  l_dequeue_options.consumer_name := null;
  l_enqueue_options.visibility    := dbms_aq.ON_COMMIT;

  if l_obj_type = 'WF_EVENT_T' then

    wf_event_t.Initialize(l_payload_evt);

    while not l_drained loop
      begin
        --Dequeue the message from the exception queue
        dbms_aq.Dequeue(queue_name         => l_schema || '.' || l_excp_qname,
                        dequeue_options    => l_dequeue_options,
                        message_properties => l_message_properties,
                        payload            => l_payload_evt,
                        msgid              => l_msg_id);

        --Enqueue the message in the normal queue
        l_message_properties.expiration := dbms_aq.never;
        if upper(p_agent_name) in ('WF_ERROR', 'WF_IN', 'WF_OUT') then
          -- these agents get an explicit recipient
          l_message_properties.recipient_list(1) := sys.aq$_agent(p_agent_name,
                                                                  null,
                                                                  0);
        end if;
        dbms_aq.enqueue(queue_name         => l_queue_name,
                        enqueue_options    => l_enqueue_options,
                        message_properties => l_message_properties,
                        payload            => l_payload_evt,
                        msgid              => l_msg_id);
        commit;

      exception
        when dequeue_timeout then
          -- nothing left on the exception queue
          l_drained := TRUE;
      end;
    end loop; --End of loop that handles wf_event_t payload

  elsif l_obj_type = 'AQ$_JMS_TEXT_MESSAGE' then

    while not l_drained loop
      begin
        --Dequeue the message from the exception queue
        dbms_aq.Dequeue(queue_name         => l_schema || '.' || l_excp_qname,
                        dequeue_options    => l_dequeue_options,
                        message_properties => l_message_properties,
                        payload            => l_payload_jms,
                        msgid              => l_msg_id);

        --Enqueue the message in the normal queue of the given agent
        l_message_properties.expiration := dbms_aq.never;
        dbms_aq.enqueue(queue_name         => l_queue_name,
                        enqueue_options    => l_enqueue_options,
                        message_properties => l_message_properties,
                        payload            => l_payload_jms,
                        msgid              => l_msg_id);
        commit;

      exception
        when dequeue_timeout then
          -- nothing left on the exception queue
          l_drained := TRUE;
      end;
    end loop; --End of loop that handles AQ$_JMS_TEXT_MESSAGE payload

  else
    -- Payload not supported by this API, raise application error
    raise_application_error(-20202, 'Invalid payload type');
  end if;

  errbuf  := '';
  retcode := '0';

exception
  when invalid_agent then
    errbuf  := 'The agent ' || p_agent_name || ' is not found ';
    retcode := '2';
  when invalid_type then
    errbuf  := 'This API does not support payload of type '
               || l_obj_type || ' for agent ' || p_agent_name;
    retcode := '2';
  when others then
    errbuf  := sqlerrm;
    retcode := '2';
end move_msgs_excep2normal;
3264
3265 --
3266 -- Overloaded Procedure 1 : Definition without the AGE parameter
3267 --
3268 -- clean_evt
3269 -- Procedure to purge the messages in the READY state of a Queue
3270 -- of WF_EVENT_T or AQ$_JMS_TEXT_MESSAGE payload type. Supports correlation id based purge.
3271 --
3272 -- IN
3273 -- p_agent_name - Agent Name
3274 -- p_correlation - Correlation ID (Default Value : NULL)
3275 -- p_commit_frequency - Commit Level (Default Value : 500)
3276 --
3277 -- OUT
3278 -- p_msg_count - Count of the number of purged messages
3279 --
procedure clean_evt(p_agent_name       in  varchar2,
                    p_correlation      in  varchar2 default NULL,
                    p_commit_frequency in  number   default 500,
                    p_msg_count        out nocopy number)
as
  --
  -- Purges messages from the queue of the given local agent by dequeuing
  -- them in REMOVE_NODATA mode until the dequeue times out (NO_WAIT).
  --
  -- IN
  --   p_agent_name       - Agent whose queue is purged; also used (in upper
  --                        case) as the dequeue consumer name
  --   p_correlation      - Optional correlation id; for agents named WF_%
  --                        it is prefixed with wf_event.account_name || ':'
  --   p_commit_frequency - Commit after this many dequeued messages
  -- OUT
  --   p_msg_count        - Number of messages dequeued
  --
  -- Raises
  --   Any error is recorded with Wf_core.Context and re-raised, including
  --   WFE_PAYLOAD_UNSUPP when the queue payload is neither WF_EVENT_T nor
  --   AQ$_JMS_TEXT_MESSAGE.
  --
  l_xcount             integer := 0;   -- messages dequeued since last commit
  l_timeout            integer := 0;   -- set to 1 once the queue is drained
  l_pos                number  := 0;
  l_schema             varchar2(80);
  l_qname              varchar2(80);
  l_queue_name         varchar2(80);
  l_message_handle     raw(16) := NULL;
  l_dequeue_options    dbms_aq.dequeue_options_t;
  l_message_properties dbms_aq.message_properties_t;
  l_payload            wf_event_t;

  -- Bug 6112028
  l_data_type          VARCHAR2(106);
  l_payload_jms        SYS.AQ$_JMS_TEXT_MESSAGE;

  --Define the snapshot too old error
  snap_too_old exception;
  pragma exception_init(snap_too_old, -1555);

  -- Dequeues a single message using the payload variable that matches the
  -- queue's payload type; raises WFE_PAYLOAD_UNSUPP for any other type.
  procedure dequeue_one
  is
  begin
    if (l_data_type = 'WF_EVENT_T') then
      dbms_aq.Dequeue(queue_name         => l_queue_name,
                      dequeue_options    => l_dequeue_options,
                      message_properties => l_message_properties, /* OUT */
                      payload            => l_payload,            /* OUT */
                      msgid              => l_message_handle);    /* OUT */
    elsif (l_data_type = 'AQ$_JMS_TEXT_MESSAGE') then
      dbms_aq.Dequeue(queue_name         => l_queue_name,
                      dequeue_options    => l_dequeue_options,
                      message_properties => l_message_properties, /* OUT */
                      payload            => l_payload_jms,        /* OUT */
                      msgid              => l_message_handle);    /* OUT */
    else
      -- Payload not supported by this API, raise application error
      Wf_core.token('PAYLOAD', l_data_type);
      Wf_core.raise('WFE_PAYLOAD_UNSUPP');
    end if;
  end dequeue_one;

begin
  p_msg_count := 0;

  -- Resolve the agent's queue (stored as SCHEMA.QUEUE) on the local system
  SELECT queue_name
  INTO   l_queue_name
  FROM   wf_agents
  WHERE  name = upper(p_agent_name)
  AND    SYSTEM_GUID = wf_event.local_system_guid;

  l_pos    := instr(l_queue_name, '.', 1, 1);
  l_schema := substr(l_queue_name, 1, l_pos-1);
  l_qname  := substr(l_queue_name, l_pos+1);

  -- Look up the payload object type of the queue table behind the queue
  SELECT TRIM(object_type)
  INTO   l_data_type
  FROM   all_queue_tables
  WHERE  queue_table in
         (
           SELECT queue_table
           FROM   all_queues
           WHERE  name  = l_qname
           AND    owner = l_schema
         )
  AND    owner = l_schema;

  -- Strip the owner prefix from the payload type name
  l_pos       := instr(l_data_type, '.', 1, 1);
  l_data_type := substr(l_data_type, l_pos+1);

  --No processing is done on the payload data
  --So dequeue is done in the REMOVE_NODATA mode
  l_dequeue_options.navigation    := dbms_aq.FIRST_MESSAGE;
  l_dequeue_options.dequeue_mode  := dbms_aq.REMOVE_NODATA;
  l_dequeue_options.wait          := dbms_aq.NO_WAIT;
  l_dequeue_options.consumer_name := upper(p_agent_name);

  --Set the Correlation ID for dequeue only if available
  --If the given agent is a Workflow Agent then append the
  --Account Name before the Correlation ID
  --(an empty string is NULL in PL/SQL, so one NULL test is sufficient)
  if (p_correlation is not null) then
    -- Seeded WF agents
    if (upper(p_agent_name) like 'WF_%') then
      if (wf_event.account_name is null) then
        wf_event.SetAccountName;
      end if;
      l_dequeue_options.correlation := wf_event.account_name
                                       || ':'
                                       || p_correlation;
    else
      l_dequeue_options.correlation := p_correlation;
    end if;
  end if;

  -- All the messages with the given correlation id are to be purged
  -- In this case, the $fnd/sql/wfevqcln.sql script logic is followed
  -- The dequeue is based on the given correlation id
  while (l_timeout = 0) loop
    begin
      dequeue_one;
      l_xcount := l_xcount + 1;
      -- Continue from the current position on the next pass
      l_dequeue_options.navigation := dbms_aq.NEXT_MESSAGE;
    exception
      when dequeue_timeout then
        -- Nothing left to dequeue: normal end of the purge
        l_timeout := 1;
      --Capture the snapshot too old error
      when snap_too_old then
        --Workaround for AQ when receiving ORA-01555 using NEXT_MESSAGE as
        --navigation: restart from FIRST_MESSAGE. If the error occurred
        --while already at FIRST_MESSAGE there is nothing to fall back to.
        if (l_dequeue_options.navigation = dbms_aq.FIRST_MESSAGE) then
          raise;
        end if;
        -- The retry is performed by the next loop pass rather than inside
        -- this handler: an exception raised inside a handler cannot be
        -- caught by a sibling handler, so a dequeue_timeout from the retry
        -- (queue already empty) would otherwise abort the purge with
        -- ORA-25228 instead of ending it normally.
        l_dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;
      -- Every other error (including dequeue_disabled) propagates to the
      -- outer handler unchanged.
    end;

    --Commit if commit frequency
    if l_xcount >= p_commit_frequency then
      commit;
      p_msg_count := p_msg_count + l_xcount;
      l_xcount    := 0;
    end if;
  end loop;

  commit;
  p_msg_count := p_msg_count + l_xcount;

exception
  when others then
    Wf_core.Context('WF_QUEUE', 'Clean_evt', p_agent_name,
                    p_correlation, to_char(p_commit_frequency));
    raise;
end clean_evt;
3446
3447 --
3448 -- Overloaded Procedure 2 : Definition with the AGE parameter
3449 --
3450 -- clean_evt
3451 -- Procedure to purge the messages in the READY state of a Queue
3452 -- of WF_EVENT_T or AQ$_JMS_TEXT_MESSAGE payload type. Supports time-based selective
3453 -- purge with correlation id.
3454 --
3455 -- IN
3456 -- p_agent_name - Agent Name
3457 -- p_correlation - Correlation ID (Default Value : NULL)
3458 -- p_commit_frequency - Commit Level (Default Value : 500)
-- p_age - Age of the Messages in days (No default value
-- as this is an overloaded procedure)
3461 --
3462 -- OUT
3463 -- p_msg_count - Count of the number of purged messages
3464 --
procedure clean_evt(p_agent_name       in  varchar2,
                    p_correlation      in  varchar2 default NULL,
                    p_commit_frequency in  number   default 500,
                    p_msg_count        out nocopy number,
                    p_age              in  number)
as
  --
  -- Purges READY messages that were enqueued more than p_age days ago from
  -- the queue of the given local agent. The message ids are selected from
  -- the queue's AQ$ view and each one is dequeued by msgid in REMOVE_NODATA
  -- mode.
  --
  -- IN
  --   p_agent_name       - Agent whose queue is purged; also used (in upper
  --                        case) as the dequeue consumer name
  --   p_correlation      - Optional correlation id, matched with LIKE; for
  --                        agents named WF_% it is prefixed with
  --                        wf_event.account_name || ':'
  --   p_commit_frequency - Commit after this many dequeued messages
  --   p_age              - Minimum age, in days (enq_time < sysdate - p_age)
  -- OUT
  --   p_msg_count        - Number of messages dequeued
  --
  -- Raises
  --   Any error is recorded with Wf_core.Context and re-raised, including
  --   WFE_PAYLOAD_UNSUPP when the queue payload is neither WF_EVENT_T nor
  --   AQ$_JMS_TEXT_MESSAGE.
  --
  l_xcount             integer := 0;   -- messages dequeued since last commit
  l_pos                number  := 0;
  l_schema             varchar2(80);
  l_qname              varchar2(80);
  l_corrid             varchar2(128);
  l_queue_name         varchar2(80);
  l_sql                varchar2(500);
  l_msgid              raw(16);
  l_message_handle     raw(16) := NULL;
  l_dequeue_options    dbms_aq.dequeue_options_t;
  l_message_properties dbms_aq.message_properties_t;
  l_payload            wf_event_t;

  -- Bug 6112028
  l_data_type          VARCHAR2(106);
  l_payload_jms        SYS.AQ$_JMS_TEXT_MESSAGE;

  -- Cursor to get all messages from the queue that were enqueued before
  -- a given date.
  TYPE c_msgs_typ IS REF CURSOR;
  c_msgs c_msgs_typ;
begin
  p_msg_count := 0;

  -- Resolve the agent's queue (stored as SCHEMA.QUEUE) on the local system
  SELECT queue_name
  INTO   l_queue_name
  FROM   wf_agents
  WHERE  name = upper(p_agent_name)
  AND    SYSTEM_GUID = wf_event.local_system_guid;

  l_pos    := instr(l_queue_name, '.', 1, 1);
  l_schema := substr(l_queue_name, 1, l_pos-1);
  l_qname  := substr(l_queue_name, l_pos+1);

  -- Look up the payload object type of the queue table behind the queue
  SELECT TRIM(object_type)
  INTO   l_data_type
  FROM   all_queue_tables
  WHERE  queue_table in
         (
           SELECT queue_table
           FROM   all_queues
           WHERE  name  = l_qname
           AND    owner = l_schema
         )
  AND    owner = l_schema;

  -- Query from the AQ view table
  l_qname := l_schema || '.AQ$' || l_qname;

  -- Strip the owner prefix from the payload type name
  l_pos       := instr(l_data_type, '.', 1, 1);
  l_data_type := substr(l_data_type, l_pos+1);

  --No processing is done on the payload data
  --So dequeue is done in the REMOVE_NODATA mode
  l_dequeue_options.navigation    := dbms_aq.FIRST_MESSAGE;
  l_dequeue_options.dequeue_mode  := dbms_aq.REMOVE_NODATA;
  l_dequeue_options.wait          := dbms_aq.NO_WAIT;
  l_dequeue_options.consumer_name := upper(p_agent_name);

  -- All the message ids older than the specified age are queried
  -- and the dequeue is done on the retrieved message ids.
  -- Only the queue view name is concatenated; the values are bound.
  l_sql := 'SELECT msg_id FROM '
           || l_qname
           || ' WHERE msg_state = ''READY'' '
           || ' AND enq_time < (sysdate - :1) ';

  -- An empty string is NULL in PL/SQL, so one NULL test is sufficient
  if (p_correlation is not null) then
    -- Seeded WF agents: prefix the Account Name to the Correlation ID
    if (upper(p_agent_name) like 'WF_%') then
      if (wf_event.account_name is null) then
        wf_event.SetAccountName;
      end if;
      l_corrid := wf_event.account_name
                  || ':'
                  || p_correlation;
    else
      l_corrid := p_correlation;
    end if;
    -- The dequeue should be based on the msg ids retrieved in
    -- the following query, not on any correlation id.
    -- So the l_dequeue_options.correlation is not set.
    OPEN c_msgs FOR l_sql || ' AND corr_id like :2 ' using p_age, l_corrid;
  else
    -- If the given correlation is null then the query does not
    -- need it, as we consider a null correlation to be %
    -- The dequeue_options.correlation will be null by default
    OPEN c_msgs FOR l_sql using p_age;
  end if;

  -- Dequeue messages based on the msg id
  loop
    fetch c_msgs into l_msgid;
    exit when c_msgs%notfound;
    l_dequeue_options.msgid := l_msgid;

    -- No local handler is needed here: every error propagates to the outer
    -- handler. Navigation is always FIRST_MESSAGE in this procedure, so an
    -- ORA-01555 has no NEXT_MESSAGE position to fall back from.
    -- NOTE(review): a dequeue error for a single msgid aborts the whole
    -- purge -- confirm that this is the intended behavior.
    if (l_data_type = 'WF_EVENT_T') then
      dbms_aq.Dequeue(queue_name         => l_queue_name,
                      dequeue_options    => l_dequeue_options,
                      message_properties => l_message_properties,
                      payload            => l_payload,
                      msgid              => l_message_handle);
    elsif (l_data_type = 'AQ$_JMS_TEXT_MESSAGE') then
      dbms_aq.Dequeue(queue_name         => l_queue_name,
                      dequeue_options    => l_dequeue_options,
                      message_properties => l_message_properties,
                      payload            => l_payload_jms,
                      msgid              => l_message_handle);
    else
      -- Payload not supported by this API, raise application error
      Wf_core.token('PAYLOAD', l_data_type);
      Wf_core.raise('WFE_PAYLOAD_UNSUPP');
    end if;
    l_xcount := l_xcount + 1;

    -- Commit if commit frequency
    if l_xcount >= p_commit_frequency then
      commit;
      p_msg_count := p_msg_count + l_xcount;
      l_xcount    := 0;
    end if;
  end loop;

  -- Release the cursor once all message ids have been processed
  close c_msgs;

  commit;
  p_msg_count := p_msg_count + l_xcount;

exception
  when others then
    -- Do not leave the cursor open when the purge fails part-way
    if c_msgs%isopen then
      close c_msgs;
    end if;
    Wf_core.Context('WF_QUEUE', 'Clean_evt', p_agent_name, p_correlation,
                    to_char(p_commit_frequency), to_char(p_age));
    raise;
end clean_evt;
3653 --------------------------------------------------------------------------------
3654 /*
3655 ** Bug 4005674 - Populate Continuous Loop Global Variables
3656 */
begin
  -- Package initialization section: assigns literal starting values to
  -- three package-level globals.
  -- NOTE(review): judging by the names, the two *_seconds values are
  -- durations in seconds and g_defer_occurrence is a count -- confirm
  -- against the package specification.
  wf_queue.g_max_delay_seconds := 3600;
  wf_queue.g_add_delay_seconds := 300;
  wf_queue.g_defer_occurrence  := 100;
--------------------------------------------------------------------------------
end WF_QUEUE;