[Home] [Help]
PACKAGE BODY: APPS.OKC_AQ_PVT
Source
1 Package body OKC_AQ_PVT AS
2 /* $Header: OKCRAQB.pls 120.0 2005/05/25 18:34:40 appldev noship $ */
3
4 l_debug VARCHAR2(1) := NVL(FND_PROFILE.VALUE('AFLOG_ENABLED'),'N');
5 ---------------------------------------------------------------------------
6 -- GLOBAL DATASTRUCTURES
7 ---------------------------------------------------------------------------
8 -- TYPES
9 ---------------------------------------------------------------------------
10 -- CONSTANTS
11 g_pkg_name constant varchar2(100) := 'OKC_AQ_PVT';
12 ---------------------------------------------------------------------------
13 -- PUBLIC VARIABLES
14 ---------------------------------------------------------------------------
15 -- EXCEPTIONS
16 ---------------------------------------------------------------------------
17
18 --
19
20
21 -----------------------------------
22 -- Private procedures and functions
23 -----------------------------------
24
25 FUNCTION get_acn_type (p_corrid IN VARCHAR2)
26 RETURN VARCHAR2
27 IS
28 CURSOR acn_cur
29 IS
30 SELECT acn_type
31 FROM okc_actions_b
32 WHERE correlation = p_corrid;
33 acn_rec acn_cur%ROWTYPE;
34 v_acn_type okc_actions_b.acn_type%TYPE;
35
36 --
37 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'get_acn_type';
38 --
39
40 BEGIN
41 IF (l_debug = 'Y') THEN
42 okc_debug.Set_Indentation(l_proc);
43 okc_debug.Log('10: Entering ',2);
44 END IF;
45
46 OPEN acn_cur;
47 FETCH acn_cur INTO acn_rec;
48 IF acn_cur%NOTFOUND THEN
49 IF (l_debug = 'Y') THEN
50 okc_debug.Log('1000: Leaving ',2);
51 okc_debug.Reset_Indentation;
52 END IF;
53 RETURN('Not Available');
54 ELSE
55 v_acn_type := acn_rec.acn_type;
56
57 IF (l_debug = 'Y') THEN
58 okc_debug.Log('2000: Leaving ',2);
59 okc_debug.Reset_Indentation;
60 END IF;
61
62 RETURN(v_acn_type);
63 END IF;
64
65
66 EXCEPTION
67 WHEN others THEN
68 IF (l_debug = 'Y') THEN
69 okc_debug.Log('3000: Leaving ',2);
70 okc_debug.Reset_Indentation;
71 END IF;
72 RETURN('Not Available');
73 END get_acn_type;
74
75
76 PROCEDURE enqueue_message (
77 p_clob_msg IN system.okc_aq_msg_typ
78 , p_corrid_rec IN corrid_rec_typ
79 , p_queue_name IN VARCHAR2
80 , p_delay IN NUMBER
81 , x_msg_handle OUT NOCOPY RAW)
82 IS
83 v_nq_options dbms_aq.enqueue_options_t;
84 v_msg_prop dbms_aq.message_properties_t;
85 v_msg_handle raw(16);
86 v_recipient_list DBMS_AQ.aq$_recipient_list_t;
87 --
88 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'enqueue_message';
89 --
90
91 BEGIN
92 IF (l_debug = 'Y') THEN
93 okc_debug.Set_Indentation(l_proc);
94 okc_debug.Log('10: Entering ',2);
95 END IF;
96
97
98 v_msg_prop.recipient_list := v_recipient_list;
99 v_msg_prop.expiration := okc_aq_pvt.g_msg_expire;
100 v_msg_prop.correlation := p_corrid_rec.corrid;
101 v_msg_prop.delay := p_delay;
102 v_nq_options.visibility := dbms_aq.ON_COMMIT;
103 DBMS_AQ.enqueue ( queue_name => p_queue_name
104 , enqueue_options => v_nq_options
105 , message_properties => v_msg_prop
106 , payload => p_clob_msg
107 , msgid => v_msg_handle );
108 x_msg_handle := v_msg_handle;
109
110 IF (l_debug = 'Y') THEN
111 okc_debug.Log('1000: Leaving ',2);
112 okc_debug.Reset_Indentation;
113 END IF;
114
115 END enqueue_message;
116
117
118
119 PROCEDURE build_char_clob ( p_msg IN VARCHAR2,
120 x_char_clob OUT NOCOPY system.okc_aq_msg_typ )
121 IS
122 l_char_clob system.okc_aq_msg_typ;
123 --
124 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'build_char_clob';
125 --
126
127 BEGIN
128 IF (l_debug = 'Y') THEN
129 okc_debug.Set_Indentation(l_proc);
130 okc_debug.Log('10: Entering ',2);
131 END IF;
132
133
134 l_char_clob := system.okc_aq_msg_typ(empty_clob());
135 DBMS_LOB.createtemporary ( l_char_clob.body
136 , TRUE
137 , DBMS_LOB.session );
138 DBMS_LOB.write ( l_char_clob.body
139 , length(p_msg)
140 , 1
141 , p_msg );
142 x_char_clob := l_char_clob;
143 IF (l_debug = 'Y') THEN
144 okc_debug.Log('1000: Leaving ',2);
145 okc_debug.Reset_Indentation;
146 END IF;
147
148
149 EXCEPTION
150 WHEN others THEN
151 IF (l_debug = 'Y') THEN
152 okc_debug.Log('2000: Leaving ',2);
153 okc_debug.Reset_Indentation;
154 END IF;
155 null;
156 END build_char_clob;
157
158 PROCEDURE clob_to_char ( p_clob_msg IN system.okc_aq_msg_typ
159 , x_char_msg OUT NOCOPY varchar2)
160 IS
161 l_amount INTEGER;
162 l_clob_msg system.okc_aq_msg_typ;
163 --
164 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'clob_to_char';
165 --
166
167 BEGIN
168 IF (l_debug = 'Y') THEN
169 okc_debug.Set_Indentation(l_proc);
170 okc_debug.Log('10: Entering ',2);
171 END IF;
172
173
174 l_clob_msg := system.okc_aq_msg_typ (empty_clob());
175 dbms_lob.createtemporary(l_clob_msg.body,TRUE,dbms_lob.session);
176 l_clob_msg := p_clob_msg;
177 l_amount := DBMS_LOB.getlength ( l_clob_msg.body );
178 -- read clob message into char variable
179 DBMS_LOB.read ( l_clob_msg.body
180 , l_amount
181 , 1
182 , x_char_msg);
183 IF (l_debug = 'Y') THEN
184 okc_debug.Log('1000: Leaving ',2);
185 okc_debug.Reset_Indentation;
186 END IF;
187
188 END clob_to_char;
189
190 PROCEDURE code_dots (
191 p_name IN VARCHAR2 ,
192 x_name OUT NOCOPY VARCHAR2 )
193 IS
194 --
195 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'code_dots';
196 --
197
198 BEGIN
199 IF (l_debug = 'Y') THEN
200 okc_debug.Set_Indentation(l_proc);
201 okc_debug.Log('10: Entering ',2);
202 okc_debug.Log('20: p_name : '||p_name,2);
203 END IF;
204
205 x_name := translate (p_name, '.', '#');
206
207 IF (l_debug = 'Y') THEN
208 okc_debug.Log('50: x_name : '||x_name,2);
209 okc_debug.Log('1000: Leaving ',2);
210 okc_debug.Reset_Indentation;
211 END IF;
212
213 END code_dots;
214
215 PROCEDURE decode_dots (
216 p_name IN VARCHAR2
217 , x_name OUT NOCOPY VARCHAR2 )
218 IS
219 --
220 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'decode_dots';
221 --
222
223 BEGIN
224 IF (l_debug = 'Y') THEN
225 okc_debug.Set_Indentation(l_proc);
226 okc_debug.Log('10: Entering ',2);
227 okc_debug.Log('20: p_name : '||p_name,2);
228 END IF;
229
230 x_name := translate (p_name, '#', '.');
231
232 IF (l_debug = 'Y') THEN
233 okc_debug.Log('50: x_name : '||x_name,2);
234 okc_debug.Log('1000: Leaving ',2);
235 okc_debug.Reset_Indentation;
236 END IF;
237
238
239 END decode_dots;
240
241 ---------------------------------------------------------------------------
242 -- Procedures and Functions
243 ---------------------------------------------------------------------------
244
245 PROCEDURE send_message
246 (p_api_version IN NUMBER,
247 p_init_msg_list IN VARCHAR2 ,
248 p_commit IN VARCHAR2 ,
249 x_msg_count OUT NOCOPY NUMBER,
250 x_msg_data OUT NOCOPY VARCHAR2,
251 x_return_status OUT NOCOPY VARCHAR2,
252 p_corrid_rec IN corrid_rec_typ,
253 p_msg_tab IN msg_tab_typ,
254 p_queue_name IN VARCHAR2,
255 p_delay IN INTEGER
256 )
257 IS
258 l_api_name CONSTANT VARCHAR2(30) := 'SEND_MESSAGE';
259 l_api_version CONSTANT NUMBER := 1.0;
260 l_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
261 l_xml_clob system.okc_aq_msg_typ;
262 x_msg_handle RAW(16);
263 OKC_ENQUEUE_FAILED EXCEPTION;
264 PRAGMA EXCEPTION_INIT (OKC_ENQUEUE_FAILED, -99095 );
265 --
266 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'send_message';
267 --
268
269 BEGIN
270 IF (l_debug = 'Y') THEN
271 okc_debug.Set_Indentation(l_proc);
272 okc_debug.Log('10: Entering ',2);
273 END IF;
274
275 -- Call start_activity to create savepoint, check compatibility
276 -- and initialize message list
277 l_return_status := OKC_API.START_ACTIVITY ( l_api_name
278 , g_pkg_name
279 , p_init_msg_list
280 , l_api_version
281 , p_api_version
282 , '_PVT'
283 , x_return_status
284 );
285
286 IF (l_debug = 'Y') THEN
287 okc_debug.Log('20: l_return_status : '||l_return_status,2);
288 END IF;
289
290 -- Check if activity started successfully
291 IF (l_return_status = OKC_API.G_RET_STS_UNEXP_ERROR) THEN
292 RAISE OKC_API.G_EXCEPTION_UNEXPECTED_ERROR;
293 ELSIF (l_return_status = OKC_API.G_RET_STS_ERROR) THEN
294 RAISE OKC_API.G_EXCEPTION_ERROR;
295 END IF;
296 IF p_corrid_rec.corrid IS NOT NULL AND
297 p_msg_tab.count > 0 THEN
298
299 -- build XML clob
300 IF (l_debug = 'Y') THEN
301 okc_debug.Log('50: Calling okc_xml_pvt.build_xml_clob ',2);
302 END IF;
303
304 okc_xml_pvt.build_xml_clob (
305 p_corrid_rec
306 , p_msg_tab
307 , l_xml_clob );
308
309 IF (l_debug = 'Y') THEN
310 okc_debug.Log('55: After Call To okc_xml_pvt.build_xml_clob ',2);
311 END IF;
312
313 END IF;
314
315 -- enqueue message
316 IF (l_debug = 'Y') THEN
317 okc_debug.Log('60: Calling enqueue_message ',2);
318 END IF;
319
320 enqueue_message ( l_xml_clob
321 , p_corrid_rec
322 , p_queue_name
323 , p_delay
324 , x_msg_handle);
325
326 IF (l_debug = 'Y') THEN
327 okc_debug.Log('70: After Call To enqueue_message ',2);
328 END IF;
329
330 IF x_msg_handle IS NULL THEN
331 IF (l_debug = 'Y') THEN
332 okc_debug.Log('100: Raising OKC_ENQUEUE_FAILED ',2);
333 okc_debug.Log('100: Leaving ',2);
334 okc_debug.Reset_Indentation;
335 END IF;
336 RAISE OKC_ENQUEUE_FAILED;
337 END IF;
338 -- end activity
339 OKC_API.END_ACTIVITY ( x_msg_count
340 , x_msg_data );
341 IF (l_debug = 'Y') THEN
342 okc_debug.Log('1000: Leaving ',2);
343 okc_debug.Reset_Indentation;
344 END IF;
345
346
347 EXCEPTION
348 WHEN OKC_API.G_EXCEPTION_UNEXPECTED_ERROR THEN
349 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
350 l_api_name
351 , g_pkg_name
352 , 'OKC_API.G_EXCEPTION_UNEXPECTED_ERROR'
353 , x_msg_count
354 , x_msg_data
355 , '_PVT'
356 );
357 IF (l_debug = 'Y') THEN
358 okc_debug.Log('2000: Leaving ',2);
359 okc_debug.Reset_Indentation;
360 END IF;
361 WHEN OTHERS THEN
362 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
363 l_api_name
364 , g_pkg_name
365 , 'OTHERS'
366 , x_msg_count
367 , x_msg_data
368 , '_PVT'
369 );
370 IF (l_debug = 'Y') THEN
371 okc_debug.Log('3000: Leaving ',2);
372 okc_debug.Reset_Indentation;
373 END IF;
374 END send_message;
375
376
377 /*PROCEDURE send_message
378 (p_api_version IN NUMBER,
379 p_init_msg_list IN VARCHAR2 ,
380 p_commit IN VARCHAR2 ,
381 x_msg_count OUT NOCOPY NUMBER,
382 x_msg_data OUT NOCOPY VARCHAR2,
383 x_return_status OUT NOCOPY VARCHAR2,
384 p_msg IN VARCHAR2,
385 p_queue_name IN VARCHAR2,
386 p_delay IN NUMBER
387 )
388 IS
389 l_api_name CONSTANT VARCHAR2(30) := 'SEND_MESSAGE';
390 l_api_version CONSTANT NUMBER := 1.0;
391 l_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
392 l_char_clob system.okc_aq_msg_typ;
393 x_msg_handle RAW(16);
394 OKC_ENQUEUE_FAILED EXCEPTION;
395 PRAGMA EXCEPTION_INIT (OKC_ENQUEUE_FAILED, -99095 );
396 --
397 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'send_message';
398 --
399
400 BEGIN
401 IF (l_debug = 'Y') THEN
402 okc_debug.Set_Indentation(l_proc);
403 okc_debug.Log('10: Entering ',2);
404 END IF;
405
406 IF p_msg IS NOT NULL THEN
407 -- Call start_activity to create savepoint, check compatibility
408 -- and initialize message list
409 l_return_status := OKC_API.START_ACTIVITY ( l_api_name
410 , g_pkg_name
411 , p_init_msg_list
412 , l_api_version
413 , p_api_version
414 , '_PVT'
415 , x_return_status
416 );
417 -- Check if activity started successfully
418 IF (l_return_status = OKC_API.G_RET_STS_UNEXP_ERROR) THEN
419 RAISE OKC_API.G_EXCEPTION_UNEXPECTED_ERROR;
420 ELSIF (l_return_status = OKC_API.G_RET_STS_ERROR) THEN
421 RAISE OKC_API.G_EXCEPTION_ERROR;
422 END IF;
423
424 -- build character clob
425 build_char_clob ( p_msg, l_char_clob );
426
427 -- enqueue message
428 enqueue_message ( l_char_clob
429 , p_queue_name
430 , p_delay
431 , x_msg_handle);
432
433 IF x_msg_handle IS NULL THEN
434 RAISE OKC_ENQUEUE_FAILED;
435 END IF;
436 -- end activity
437 OKC_API.END_ACTIVITY ( x_msg_count
438 , x_msg_data );
439 END IF;
440 IF (l_debug = 'Y') THEN
441 okc_debug.Log('1000: Leaving ',2);
442 okc_debug.Reset_Indentation;
443 END IF;
444
445
446 EXCEPTION
447 WHEN OKC_API.G_EXCEPTION_UNEXPECTED_ERROR THEN
448 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
449 l_api_name
450 , g_pkg_name
451 , 'OKC_API.G_RET_STS_ERROR'
452 , x_msg_count
453 , x_msg_data
454 , '_PVT'
455 );
456 IF (l_debug = 'Y') THEN
457 okc_debug.Log('2000: Leaving ',2);
458 okc_debug.Reset_Indentation;
459 END IF;
460 WHEN OTHERS THEN
461 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
462 l_api_name
463 , g_pkg_name
464 , 'OTHERS'
465 , x_msg_count
466 , x_msg_data
467 , '_PVT'
468 );
469 IF (l_debug = 'Y') THEN
470 okc_debug.Log('3000: Leaving ',2);
471 okc_debug.Reset_Indentation;
472 END IF;
473 END send_message;*/
474
475 -- This function checks for messages of correlation 'SHUTDOWN'
476 -- if it finds one then it removes it from the queue and returns true
477 -- else returns false.
478 FUNCTION shutdown_event_listener(p_queue IN VARCHAR2) RETURN boolean
479 IS
480
481 l_dq_options dbms_aq.dequeue_options_t;
482 l_msg_prop dbms_aq.message_properties_t;
483 l_msg_handle raw(16);
484 l_msg system.okc_aq_msg_typ;
485 CURSOR queue_cur
486 IS
487 SELECT h.msgid msgid ,
488 decode(h.subscriber#,0
489 ,decode(h.name,'0',NULL,h.name),s.name) consumer_name
490 from AQ$_OKC_AQ_EV_TAB_H h,
491 aq$_okc_aq_ev_tab_s s,
492 okc_aq_ev_tab q
493 WHERE q.q_name = substr(p_queue,5)
494 and q.STATE = 0
495 and h.msgid = q.msgid
496 AND ( (h.subscriber# <> 0 AND h.subscriber# = s.subscriber_id)
497 OR
498 (h.subscriber# = 0 AND h.address# = s.subscriber_id))
499 AND q.CORRID = 'SHUTDOWN';
500 /*
501 changed cursor for performance bug# 1563692
502 SELECT msg_id,consumer_name
503 FROM aq$okc_aq_ev_tab
504 WHERE 'OKC.'||queue = p_queue
505 AND msg_state = 'READY'
506 AND UPPER(corr_id) = 'SHUTDOWN';*/
507 queue_rec queue_cur%ROWTYPE;
508
509
510 --
511 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'shutdown_event_listener';
512 --
513
514 BEGIN
515 -- okc_debug.Set_Indentation(l_proc);
516 -- okc_debug.Log('10: Entering ',2);
517
518 IF queue_cur%ISOPEN THEN
519 CLOSE queue_cur;
520 END IF;
521
522 OPEN queue_cur;
523 FETCH queue_cur INTO queue_rec;
524 IF queue_cur%FOUND THEN
525 l_dq_options.consumer_name := queue_rec.consumer_name;
526 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
527 l_dq_options.msgid := queue_rec.msgid;
528 DBMS_AQ.dequeue ( queue_name => p_queue
529 , dequeue_options => l_dq_options
530 , message_properties => l_msg_prop
531 , payload => l_msg
532 , msgid => l_msg_handle );
533 commit;
534 CLOSE queue_cur;
535
536 -- okc_debug.Log('1000: Leaving ',2);
537 -- okc_debug.Reset_Indentation;
538 RETURN(TRUE);
539 ELSE
540 CLOSE queue_cur;
541 -- okc_debug.Log('2000: Leaving ',2);
542 -- okc_debug.Reset_Indentation;
543 RETURN(FALSE);
544 END IF;
545
546 END shutdown_event_listener;
547
548 -- This function checks for messages of correlation 'SHUTDOWN'
549 -- if it finds one then it removes it from the outcome queue and returns true
550 -- else returns false.
551 FUNCTION shutdown_outcome_listener(p_queue IN VARCHAR2) RETURN boolean
552 IS
553
554 l_dq_options dbms_aq.dequeue_options_t;
555 l_msg_prop dbms_aq.message_properties_t;
556 l_msg_handle raw(16);
557 l_msg system.okc_aq_msg_typ;
558 CURSOR queue_cur
559 IS
560 SELECT h.msgid msgid ,
561 decode(h.subscriber#,0
562 ,decode(h.name,'0',NULL,h.name),s.name) consumer_name
563 from AQ$_OKC_AQ_EV_TAB_H h,
564 aq$_okc_aq_ev_tab_s s,
565 okc_aq_ev_tab q
566 WHERE q.q_name = substr(p_queue,5)
567 and q.STATE = 0
568 and h.msgid = q.msgid
569 AND ( (h.subscriber# <> 0 AND h.subscriber# = s.subscriber_id)
570 OR
571 (h.subscriber# = 0 AND h.address# = s.subscriber_id))
572 AND q.CORRID = 'SHUTDOWN';
573 --changed cursor for performance bug# 1563692
574 /*SELECT msg_id,consumer_name
575 FROM aq$okc_aq_ev_tab
576 WHERE 'OKC.'||queue = p_queue
577 AND msg_state = 'READY'
578 AND UPPER(corr_id) = 'SHUTDOWN';*/
579 queue_rec queue_cur%ROWTYPE;
580
581
582 --
583 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'shutdown_outcome_listener';
584 --
585
586 BEGIN
587 -- okc_debug.Set_Indentation(l_proc);
588 -- okc_debug.Log('10: Entering ',2);
589
590 IF queue_cur%ISOPEN THEN
591 CLOSE queue_cur;
592 END IF;
593
594 OPEN queue_cur;
595 FETCH queue_cur INTO queue_rec;
596 IF queue_cur%FOUND THEN
597
598 l_dq_options.consumer_name := queue_rec.consumer_name;
599 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
600 l_dq_options.msgid := queue_rec.msgid;
601 DBMS_AQ.dequeue ( queue_name => p_queue
602 , dequeue_options => l_dq_options
603 , message_properties => l_msg_prop
604 , payload => l_msg
605 , msgid => l_msg_handle );
606 commit;
607 CLOSE queue_cur;
608
609 -- okc_debug.Log('1000: Leaving ',2);
610 -- okc_debug.Reset_Indentation;
611 RETURN(TRUE);
612 ELSE
613 CLOSE queue_cur;
614 -- okc_debug.Log('2000: Leaving ',2);
615 -- okc_debug.Reset_Indentation;
616 RETURN(FALSE);
617 END IF;
618
619 END shutdown_outcome_listener;
620
621 PROCEDURE listen_event (
622 errbuf OUT NOCOPY VARCHAR2
623 ,retcode OUT NOCOPY VARCHAR2
624 ,p_wait IN INTEGER
625 ,p_sleep IN NUMBER
626 )
627 IS
628 l_agent_list DBMS_AQ.aq$_agent_list_t;
629 l_agent SYS.aq$_agent;
630 l_index integer;
631 l_procedure VARCHAR2(200);
632 l_retval INTEGER;
633 e_listen_timeout EXCEPTION;
634
635 l_listener_iterations NUMBER;
636
637 PRAGMA EXCEPTION_INIT ( e_listen_timeout, -25254);
638 -- bug 3621354 changed cursor to query subscriber view rather than table
639 CURSOR c_subscriber IS
640 SELECT name
641 FROM AQ$OKC_AQ_EV_TAB_S
642 WHERE queue = substr(OKC_AQ_PVT.g_event_queue_name,5);
643 --
644 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'listen_event';
645 --
646
647 BEGIN
648 -- okc_debug.Set_Indentation(l_proc);
649 -- okc_debug.Log('10: Entering ',2);
650
651 retcode := 0;
652 l_index := 1;
653
654 -- okc_debug.Log('50: Before subscriber LOOP',2);
655
656 FOR r_subscriber IN c_subscriber LOOP
657 l_agent_list(l_index) := SYS.aq$_agent ( r_subscriber.name
658 , OKC_AQ_PVT.g_event_queue_name
659 , NULL );
660 l_index := l_index + 1;
661 END LOOP;
662
663 -- okc_debug.Log('60: After subscriber LOOP',2);
664
665
666 -- changed to unconditional loop
667
668 -- okc_debug.Log('70: Before unconditional LOOP',2);
669 /* LOOP -- limit loop in order to enable faster shutdown of listeners */
670 l_listener_iterations := nvl(fnd_profile.value('OKC_LISTEN_EVENT'),100);
671
672 FOR i IN 1..l_listener_iterations LOOP
673
674 /* bug 2258913 - shut down cursor performs poorly when queue load is high
675 Shutdown process no longer needed as listener no longer runs infinately.
676
677 IF shutdown_event_listener(OKC_AQ_PVT.g_event_queue_name) THEN
678 EXIT;
679 ELSE
680 */
681
682 BEGIN
683 DBMS_AQ.listen ( agent_list => l_agent_list
684 , wait => p_wait
685 , agent => l_agent );
686 decode_dots ( l_agent.name, l_procedure );
687 execute immediate 'begin '||l_procedure||'; end;';
688 EXCEPTION
689 WHEN e_listen_timeout THEN
690 exit;
691 null;
692 END;
693 /* END IF; bug 2258913 */
694 /* DBMS_LOCK.sleep (p_sleep ); */
695 END LOOP;
696
697 -- okc_debug.Log('70: After unconditional LOOP',2);
698 -- okc_debug.Log('1000: Leaving ',2);
699 -- okc_debug.Reset_Indentation;
700
701
702 exception
703 when others then
704 retcode := 2;
705 errbuf := substr(sqlerrm,1,250);
706 -- okc_debug.Log('2000: errbuf : '||errbuf,2);
707 -- okc_debug.Log('2000: Leaving ',2);
708 -- okc_debug.Reset_Indentation;
709 END listen_event;
710
711 PROCEDURE listen_outcome(
712 errbuf OUT NOCOPY VARCHAR2
713 ,retcode OUT NOCOPY VARCHAR2
714 ,p_wait IN INTEGER
715 ,p_sleep IN NUMBER
716 )
717
718 IS
719 l_agent_list DBMS_AQ.aq$_agent_list_t;
720 l_agent SYS.aq$_agent;
721 l_index integer;
722 l_procedure VARCHAR2(200);
723 l_retval INTEGER;
724 e_listen_timeout EXCEPTION;
725
726 l_listener_iterations NUMBER;
727
728 PRAGMA EXCEPTION_INIT ( e_listen_timeout, -25254);
729
730 -- bug 3621354 changed cursor to query subscriber view rather than table
731 CURSOR c_subscriber IS
732 SELECT name
733 FROM AQ$OKC_AQ_EV_TAB_S
734 WHERE queue = substr(OKC_AQ_PVT.g_outcome_queue_name,5);
735 --
736 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'listen_outcome';
737 --
738
739 BEGIN
740 -- okc_debug.Set_Indentation(l_proc);
741 -- okc_debug.Log('10: Entering ',2);
742
743 retcode := 0;
744 l_index := 1;
745
746 -- okc_debug.Log('20: Before subscriber LOOP',2);
747
748 FOR r_subscriber IN c_subscriber LOOP
749 l_agent_list(l_index) := SYS.aq$_agent ( r_subscriber.name
750 , OKC_AQ_PVT.g_outcome_queue_name
751 , NULL );
752
753 -- okc_debug.Log('21: Index : '||l_index,2);
754
755 l_index := l_index + 1;
756 END LOOP;
757
758 -- okc_debug.Log('30: After subscriber LOOP',2);
759
760 -- okc_debug.Log('40: Before unconditional LOOP',2);
761
762 l_listener_iterations := nvl(fnd_profile.value('OKC_LISTEN_OUTCOME'),100);
763 /* changed to unconditional loop
764 Changed to conditional loop to allow faster shutdown of listeners
765 and to reduce CPU usage by host system.
766 */
767 FOR i IN 1..l_listener_iterations LOOP
768
769 /* bug 2258913 - shut down cursor performs poorly when queue load is high
770 Shutdown process no longer needed as listener no longer runs infinately.
771
772 IF shutdown_outcome_listener(OKC_AQ_PVT.g_outcome_queue_name) THEN
773 IF (l_debug = 'Y') THEN
774 okc_debug.Log('45: Inside unconditional LOOP - exiting ',2);
775 END IF;
776 EXIT;
777 ELSE
778 */
779 -- okc_debug.Log('46: Inside unconditional LOOP at 46 ',2);
780 BEGIN
781 -- okc_debug.Log('52: Calling DBMS_AQ.listen',2);
782
783 DBMS_AQ.listen ( agent_list => l_agent_list
784 , wait => p_wait
785 , agent => l_agent );
786
787 -- okc_debug.Log('53: Calling decode_dots',2);
788
789 decode_dots ( l_agent.name, l_procedure );
790
791 -- okc_debug.Log('54: After Calling decode_dots',2);
792
793 execute immediate 'begin '||l_procedure||'; end;';
794
795 EXCEPTION
796 WHEN e_listen_timeout THEN
797 exit;
798 null;
799 END;
800 /* END IF; bug 2258913 */
801 /* DBMS_LOCK.sleep (p_sleep ); */
802 END LOOP;
803
804 -- okc_debug.Log('50: After unconditional LOOP',2);
805
806 -- okc_debug.Log('1000: Leaving ',2);
807 -- okc_debug.Reset_Indentation;
808
809
810 exception
811 when others then
812 retcode := 2;
813 errbuf := substr(sqlerrm,1,250);
814 -- okc_debug.Log('2000: Error : '||errbuf,2);
815 -- okc_debug.Log('2000: Leaving ',2);
816 -- okc_debug.Reset_Indentation;
817 END listen_outcome;
818
819
820 PROCEDURE dequeue_event
821 IS
822 l_dq_options dbms_aq.dequeue_options_t;
823 l_msg_prop dbms_aq.message_properties_t;
824 l_msg_handle raw(16);
825 l_msg system.okc_aq_msg_typ;
826 l_sub_name varchar2(30);
827 l_msg_tab OKC_AQ_PVT.msg_tab_typ;
828 l_corrid OKC_AQ_PVT.corrid_rec_typ;
829 l_acn_id OKC_ACTIONS_V.ID%TYPE;
830 l_msg_text VARCHAR2(1000);
831 l_msg_data VARCHAR2(1000);
832 l_msg_count NUMBER;
833 v_msg_data VARCHAR2(1000);
834 v_msg_count NUMBER;
835 v_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
836 l_api_name CONSTANT VARCHAR2(30) := 'DEQUEUE_EVENT';
837 l_api_version CONSTANT NUMBER := 1.0;
838 l_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
839 x_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
840 x_msg_count NUMBER;
841 x_msg_data VARCHAR2(1000);
842 l_token1 VARCHAR2(50);
843 l_token1_value VARCHAR2(50);
844 l_token2 VARCHAR2(50);
845 l_token2_value VARCHAR2(50);
846 CURSOR acn_cur IS
847 SELECT acn.id id
848 FROM okc_actions_b acn
849 WHERE acn.correlation = l_corrid.corrid;
850 acn_rec acn_cur%ROWTYPE;
851 e_dequeue_timeout EXCEPTION;
852 PRAGMA EXCEPTION_INIT (e_dequeue_timeout, -25228 );
853 OKC_PROCESS_FAILED EXCEPTION;
854 OKC_DEQUEUE_FAILED EXCEPTION;
855 OKC_REMOVE_MSG EXCEPTION;
856
857 --
858 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'dequeue_event';
859 --
860
861 BEGIN
862
863 IF (l_debug = 'Y') THEN
864 okc_debug.Set_Indentation(l_proc);
865 okc_debug.Log('10: Entering ',2);
866 END IF;
867
868 -- Call start_activity to create savepoint, check compatibility
869 -- and initialize message list
870 l_return_status := OKC_API.START_ACTIVITY ( l_api_name
871 , g_pkg_name
872 , OKC_API.G_FALSE
873 , l_api_version
874 , 1.0
875 , '_PVT'
876 , x_return_status
877 );
878 IF (l_debug = 'Y') THEN
879 okc_debug.Log('20: l_return_status : '||l_return_status,2);
880 END IF;
881
882 -- Check if activity started successfully
883 IF (l_return_status = OKC_API.G_RET_STS_UNEXP_ERROR) THEN
884 RAISE OKC_API.G_EXCEPTION_UNEXPECTED_ERROR;
885 ELSIF (l_return_status = OKC_API.G_RET_STS_ERROR) THEN
886 RAISE OKC_API.G_EXCEPTION_ERROR;
887 END IF;
888 -- convert dots to hashes to make sub name legal
889 code_dots ('OKC_AQ_PVT.DEQUEUE_EVENT', l_sub_name);
890 l_dq_options.consumer_name := l_sub_name;
891 l_dq_options.wait := OKC_AQ_PVT.g_dequeue_wait;
892 l_dq_options.navigation := dbms_aq.first_message;
893 l_dq_options.dequeue_mode := dbms_aq.LOCKED;
894 l_dq_options.visibility := dbms_aq.on_commit;
895
896
897 IF (l_debug = 'Y') THEN
898 okc_debug.Log('30: Calling DBMS_AQ.dequeue with following parameters',2);
899 okc_debug.Log('30: queue_name : '||OKC_AQ_PVT.g_event_queue_name,2);
900 END IF;
901
902 -- get the message from the queue
903 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_event_queue_name
904 , dequeue_options => l_dq_options
905 , message_properties => l_msg_prop
906 , payload => l_msg
907 , msgid => l_msg_handle );
908
909 IF (l_debug = 'Y') THEN
910 okc_debug.Log('40: After Call to DBMS_AQ.dequeue',2);
911 END IF;
912
913 -- if the payload is null then remove message from the queue and commit
914 IF l_msg IS NULL THEN
915 OKC_API.SET_MESSAGE(p_app_name => g_app_name
916 ,p_msg_name => 'OKC_REMOVE_MSG'
917 ,p_token1 => 'MSG_ID'
918 ,p_token1_value => RAWTOHEX(l_msg_handle)
919 ,p_token2 => 'CORRID'
920 ,p_token2_value => l_dq_options.correlation
921 );
922 raise OKC_REMOVE_MSG;
923 END IF;
924
925 IF (l_debug = 'Y') THEN
926 okc_debug.Log('50: Calling OKC_XML_PVT.get_element_vals ',2);
927 END IF;
928
929 -- convert the clob message to a table of element names and values
930 OKC_XML_PVT.get_element_vals ( l_msg
931 , l_msg_tab
932 , l_corrid );
933 -- call the event APIs
934 OPEN acn_cur;
935 FETCH acn_cur INTO acn_rec;
936 l_acn_id := acn_rec.id;
937
938 IF (l_debug = 'Y') THEN
939 okc_debug.Log('60: Calling OKC_CONDITION_EVAL_PUB.evaluate_condition ',2);
940 okc_debug.Log('60: l_acn_id : '||l_acn_id,2);
941 END IF;
942
943 OKC_CONDITION_EVAL_PUB.evaluate_condition (
944 p_api_version => 1.0,
945 x_return_status =>l_return_status,
946 x_msg_count =>l_msg_count,
947 x_msg_data =>l_msg_data,
948 p_acn_id =>l_acn_id ,
949 p_msg_tab =>l_msg_tab
950 );
951
952 IF (l_debug = 'Y') THEN
953 okc_debug.Log('70: After Call To OKC_CONDITION_EVAL_PUB.evaluate_condition ',2);
954 okc_debug.Log('70: l_return_status : '||l_return_status,2);
955 END IF;
956
957 -- if evaluation is successfull then remove message from the queue
958 -- and commit the transaction
959 IF NVL(l_return_status,'X') = OKC_API.G_RET_STS_SUCCESS THEN
960 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
961 l_dq_options.msgid := l_msg_handle;
962 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_event_queue_name
963 , dequeue_options => l_dq_options
964 , message_properties => l_msg_prop
965 , payload => l_msg
966 , msgid => l_msg_handle );
967 commit;
968 -------------------------------------------------------------------------
969 -- if the evaluation is not successfull then remove message from the queue
970 -- and rollback so that message will be again available on the queue
971 -- after 30 minutes of delay_retry time period and can be evaluated
972 -- again for a max_retry of 5 times. After 5th try if the evaluation
973 -- still fails then message is moved to exception queue.
974 -------------------------------------------------------------------------
975 ELSIF NVL(l_return_status,'X') <> OKC_API.G_RET_STS_SUCCESS THEN
976 OKC_API.SET_MESSAGE(p_app_name => g_app_name
977 ,p_msg_name => 'OKC_PROCESS_FAILED'
978 ,p_token1 => 'SOURCE'
979 ,p_token1_value => 'Condition Evaluator'
980 ,p_token2 => 'PROCESS'
981 ,p_token2_value => 'Evaluate Condition'
982 );
983 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
984 l_dq_options.msgid := l_msg_handle;
985 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_event_queue_name
986 , dequeue_options => l_dq_options
987 , message_properties => l_msg_prop
988 , payload => l_msg
989 , msgid => l_msg_handle );
990 raise OKC_PROCESS_FAILED;
991 END IF;
992 -- end activity
993 OKC_API.END_ACTIVITY ( l_msg_count
994 , l_msg_data );
995
996 IF (l_debug = 'Y') THEN
997 okc_debug.Log('100: l_msg_count : '||l_msg_count,2);
998 okc_debug.Log('100: l_msg_data : '||l_msg_data,2);
999 END IF;
1000
1001 IF (l_debug = 'Y') THEN
1002 okc_debug.Log('1000: Leaving ',2);
1003 okc_debug.Reset_Indentation;
1004 END IF;
1005
1006
1007 EXCEPTION
1008
1009 WHEN e_dequeue_timeout THEN
1010 IF (l_debug = 'Y') THEN
1011 okc_debug.Log('2000: Leaving ',2);
1012 okc_debug.Reset_Indentation;
1013 END IF;
1014 null;
1015 WHEN OKC_REMOVE_MSG THEN
1016 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
1017 l_dq_options.msgid := l_msg_handle;
1018 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_event_queue_name
1019 , dequeue_options => l_dq_options
1020 , message_properties => l_msg_prop
1021 , payload => l_msg
1022 , msgid => l_msg_handle );
1023 commit;
1024 OKC_AQ_WRITE_ERROR_PVT.WRITE_MSGDATA(
1025 p_api_version => 1.0,
1026 p_init_msg_list => OKC_API.G_TRUE,
1027 p_source_name => 'Advanced Queuing',
1028 p_datetime => sysdate,
1029 p_msg_tab => l_msg_tab,
1030 p_q_name => 'Events Queue',
1031 p_corrid => l_corrid.corrid,
1032 p_msgid => l_msg_handle,
1033 p_msg_count => l_msg_count,
1034 p_msg_data => l_msg_data
1035 );
1036 IF (l_debug = 'Y') THEN
1037 okc_debug.Log('3000: Leaving ',2);
1038 okc_debug.Reset_Indentation;
1039 END IF;
1040
1041 WHEN OTHERS THEN
1042 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
1043 l_api_name
1044 , g_pkg_name
1045 , 'OTHERS'
1046 , x_msg_count
1047 , x_msg_data
1048 , '_PVT'
1049 );
1050 OKC_AQ_WRITE_ERROR_PVT.WRITE_MSGDATA(
1051 p_api_version => 1.0,
1052 p_init_msg_list => OKC_API.G_TRUE,
1053 p_source_name => 'Advanced Queuing',
1054 p_datetime => sysdate,
1055 p_msg_tab => l_msg_tab,
1056 p_q_name => 'Events Queue',
1057 p_corrid => l_corrid.corrid,
1058 p_msgid => l_msg_handle,
1059 p_msg_count => l_msg_count,
1060 p_msg_data => l_msg_data
1061 );
1062 IF (l_debug = 'Y') THEN
1063 okc_debug.Log('4000: Leaving ',2);
1064 okc_debug.Reset_Indentation;
1065 END IF;
1066 END dequeue_event;
1067
1068
1069 -- This procedure deques all actions that are Date Based Actions
1070 PROCEDURE dequeue_date_event
1071 IS
1072 l_dq_options dbms_aq.dequeue_options_t;
1073 l_msg_prop dbms_aq.message_properties_t;
1074 l_msg_handle raw(16);
1075 l_msg system.okc_aq_msg_typ;
1076 l_sub_name varchar2(30);
1077 l_msg_tab OKC_AQ_PVT.msg_tab_typ;
1078 l_corrid OKC_AQ_PVT.corrid_rec_typ;
1079 l_cnh_id OKC_CONDITION_HEADERS_B.ID%TYPE;
1080 l_msg_text VARCHAR2(1000);
1081 l_msg_data VARCHAR2(1000);
1082 l_msg_count NUMBER;
1083 v_msg_data VARCHAR2(1000);
1084 v_msg_count NUMBER;
1085 v_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
1086 l_api_name CONSTANT VARCHAR2(30) := 'DEQUEUE_DATE_EVENT';
1087 l_api_version CONSTANT NUMBER := 1.0;
1088 l_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
1089 x_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
1090 x_msg_count NUMBER;
1091 x_msg_data VARCHAR2(1000);
1092 l_token1 VARCHAR2(50);
1093 l_token1_value VARCHAR2(50);
1094 l_token2 VARCHAR2(50);
1095 l_token2_value VARCHAR2(50);
1096 e_dequeue_timeout EXCEPTION;
1097 PRAGMA EXCEPTION_INIT (e_dequeue_timeout, -25228 );
1098 OKC_PROCESS_FAILED EXCEPTION;
1099 OKC_DEQUEUE_FAILED EXCEPTION;
1100 OKC_REMOVE_MSG EXCEPTION;
1101
1102 --
1103 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'dequeue_date_event';
1104 --
1105
1106 BEGIN
1107 IF (l_debug = 'Y') THEN
1108 okc_debug.Set_Indentation(l_proc);
1109 okc_debug.Log('10: Entering ',2);
1110 END IF;
1111
1112
1113 -- Call start_activity to create savepoint, check compatibility
1114 -- and initialize message list
1115 l_return_status := OKC_API.START_ACTIVITY ( l_api_name
1116 , g_pkg_name
1117 , OKC_API.G_FALSE
1118 , l_api_version
1119 , 1.0
1120 , '_PVT'
1121 , x_return_status
1122 );
1123 -- Check if activity started successfully
1124 IF (l_return_status = OKC_API.G_RET_STS_UNEXP_ERROR) THEN
1125 RAISE OKC_API.G_EXCEPTION_UNEXPECTED_ERROR;
1126 ELSIF (l_return_status = OKC_API.G_RET_STS_ERROR) THEN
1127 RAISE OKC_API.G_EXCEPTION_ERROR;
1128 END IF;
1129 -- convert dots to hashes to make sub name legal
1130 code_dots ('OKC_AQ_PVT.DEQUEUE_DATE_EVENT', l_sub_name);
1131 l_dq_options.consumer_name := l_sub_name;
1132 l_dq_options.wait := OKC_AQ_PVT.g_dequeue_wait;
1133 l_dq_options.navigation := dbms_aq.first_message;
1134 l_dq_options.dequeue_mode := dbms_aq.LOCKED;
1135 l_dq_options.visibility := dbms_aq.on_commit;
1136
1137
1138 -- get the message from the queue
1139
1140 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_event_queue_name
1141 , dequeue_options => l_dq_options
1142 , message_properties => l_msg_prop
1143 , payload => l_msg
1144 , msgid => l_msg_handle );
1145 -- if the payload is null then remove message from the queue and commit
1146 IF l_msg IS NULL THEN
1147 OKC_API.SET_MESSAGE(p_app_name => g_app_name
1148 ,p_msg_name => 'OKC_REMOVE_MSG'
1149 ,p_token1 => 'MSG_ID'
1150 ,p_token1_value => RAWTOHEX(l_msg_handle)
1151 ,p_token2 => 'CORRID'
1152 ,p_token2_value => l_dq_options.correlation
1153 );
1154 raise OKC_REMOVE_MSG;
1155 END IF;
1156
1157 -- convert the clob message to a table of element names and values
1158 OKC_XML_PVT.get_element_vals ( l_msg
1159 , l_msg_tab
1160 , l_corrid );
1161 -- get the condition header id from msg table
1162 FOR i IN 1..l_msg_tab.COUNT LOOP
1163 IF l_msg_tab(i).element_name = 'CNH_ID' THEN
1164 l_cnh_id := l_msg_tab(i).element_value;
1165 END IF;
1166 END LOOP;
1167 -- call the event APIs
1168 OKC_CONDITION_EVAL_PUB.evaluate_date_condition (
1169 p_api_version => 1.0,
1170 x_return_status =>l_return_status,
1171 x_msg_count =>l_msg_count,
1172 x_msg_data =>l_msg_data,
1173 p_cnh_id =>l_cnh_id ,
1174 p_msg_tab =>l_msg_tab
1175 );
1176 -- if evaluation is successfull then remove message from the queue
1177 -- and commit the transaction
1178 IF NVL(l_return_status,'X') = OKC_API.G_RET_STS_SUCCESS THEN
1179 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
1180 l_dq_options.msgid := l_msg_handle;
1181 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_event_queue_name
1182 , dequeue_options => l_dq_options
1183 , message_properties => l_msg_prop
1184 , payload => l_msg
1185 , msgid => l_msg_handle );
1186 commit;
1187 -------------------------------------------------------------------------
1188 -- if the evaluation is not successfull then remove message from the queue
1189 -- and rollback so that message will be again available on the queue
1190 -- after 30 minutes of delay_retry time period and can be evaluated
1191 -- again for a max_retry of 5 times. After 5th try if the evaluation
1192 -- still fails then message is moved to exception queue.
1193 -------------------------------------------------------------------------
1194 ELSIF NVL(l_return_status,'X') <> OKC_API.G_RET_STS_SUCCESS THEN
1195 OKC_API.SET_MESSAGE(p_app_name => g_app_name
1196 ,p_msg_name => 'OKC_PROCESS_FAILED'
1197 ,p_token1 => 'SOURCE'
1198 ,p_token1_value => 'Condition Evaluator'
1199 ,p_token2 => 'PROCESS'
1200 ,p_token2_value => 'Evaluate Date Condition'
1201 );
1202 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
1203 l_dq_options.msgid := l_msg_handle;
1204 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_event_queue_name
1205 , dequeue_options => l_dq_options
1206 , message_properties => l_msg_prop
1207 , payload => l_msg
1208 , msgid => l_msg_handle );
1209 raise OKC_PROCESS_FAILED;
1210 END IF;
1211 -- end activity
1212 OKC_API.END_ACTIVITY ( l_msg_count
1213 , l_msg_data );
1214
1215 IF (l_debug = 'Y') THEN
1216 okc_debug.Log('1000: Leaving ',2);
1217 okc_debug.Reset_Indentation;
1218 END IF;
1219
1220
1221 EXCEPTION
1222
1223 WHEN e_dequeue_timeout THEN
1224 IF (l_debug = 'Y') THEN
1225 okc_debug.Log('2000: Leaving ',2);
1226 okc_debug.Reset_Indentation;
1227 END IF;
1228 null;
1229 WHEN OKC_REMOVE_MSG THEN
1230 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
1231 l_dq_options.msgid := l_msg_handle;
1232 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_event_queue_name
1233 , dequeue_options => l_dq_options
1234 , message_properties => l_msg_prop
1235 , payload => l_msg
1236 , msgid => l_msg_handle );
1237 commit;
1238 OKC_AQ_WRITE_ERROR_PVT.WRITE_MSGDATA(
1239 p_api_version => 1.0,
1240 p_init_msg_list => OKC_API.G_TRUE,
1241 p_source_name => 'Advanced Queuing',
1242 p_datetime => sysdate,
1243 p_msg_tab => l_msg_tab,
1244 p_q_name => 'Events Queue',
1245 p_corrid => l_corrid.corrid,
1246 p_msgid => l_msg_handle,
1247 p_msg_count => l_msg_count,
1248 p_msg_data => l_msg_data
1249 );
1250 IF (l_debug = 'Y') THEN
1251 okc_debug.Log('3000: Leaving ',2);
1252 okc_debug.Reset_Indentation;
1253 END IF;
1254
1255 WHEN OTHERS THEN
1256 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
1257 l_api_name
1258 , g_pkg_name
1259 , 'OTHERS'
1260 , x_msg_count
1261 , x_msg_data
1262 , '_PVT'
1263 );
1264 OKC_AQ_WRITE_ERROR_PVT.WRITE_MSGDATA(
1265 p_api_version => 1.0,
1266 p_init_msg_list => OKC_API.G_TRUE,
1267 p_source_name => 'Advanced Queuing',
1268 p_datetime => sysdate,
1269 p_msg_tab => l_msg_tab,
1270 p_q_name => 'Events Queue',
1271 p_corrid => l_corrid.corrid,
1272 p_msgid => l_msg_handle,
1273 p_msg_count => l_msg_count,
1274 p_msg_data => l_msg_data
1275 );
1276 IF (l_debug = 'Y') THEN
1277 okc_debug.Log('4000: Leaving ',2);
1278 okc_debug.Reset_Indentation;
1279 END IF;
1280 END dequeue_date_event;
1281
1282 PROCEDURE dequeue_outcome
1283 IS
1284 l_dq_options dbms_aq.dequeue_options_t;
1285 l_msg_prop dbms_aq.message_properties_t;
1286 l_msg_handle raw(16);
1287 l_msg system.okc_aq_msg_typ;
1288 l_sub_name varchar2(30);
1289 l_msg_text varchar2(1000);
1290 l_msg_data varchar2(1000);
1291 l_msg_count number;
1292 l_return_status varchar2(1) := OKC_API.G_RET_STS_SUCCESS;
1293 x_msg_data varchar2(1000);
1294 x_msg_count number;
1295 x_return_status varchar2(1) := OKC_API.G_RET_STS_SUCCESS;
1296 v_msg_data varchar2(1000);
1297 v_msg_count number;
1298 v_return_status varchar2(1) := OKC_API.G_RET_STS_SUCCESS;
1299 l_api_name CONSTANT VARCHAR2(30) := 'DEQUEUE_OUTCOME';
1300 l_api_version CONSTANT NUMBER := 1.0;
1301 l_msg_tab OKC_AQ_PVT.msg_tab_typ;
1302 l_corrid OKC_AQ_PVT.corrid_rec_typ;
1303 l_init_msg_list VARCHAR2(1) := okc_api.G_FALSE;
1304 l_token1 VARCHAR2(50);
1305 l_token1_value VARCHAR2(50);
1306 l_token2 VARCHAR2(50);
1307 l_token2_value VARCHAR2(50);
1308 e_dequeue_timeout EXCEPTION;
1309 PRAGMA EXCEPTION_INIT (e_dequeue_timeout, -25228 );
1310 OKC_PROCESS_FAILED EXCEPTION;
1311 OKC_DEQUEUE_FAILED EXCEPTION;
1312 OKC_REMOVE_MSG EXCEPTION;
1313
1314 --
1315 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'dequeue_outcome';
1316 --
1317
1318 BEGIN
1319 IF (l_debug = 'Y') THEN
1320 okc_debug.Set_Indentation(l_proc);
1321 okc_debug.Log('10: Entering ',2);
1322 END IF;
1323
1324
1325 -- Call start_activity to create savepoint, check compatibility
1326 -- and initialize message list
1327 l_return_status := OKC_API.START_ACTIVITY ( l_api_name
1328 , g_pkg_name
1329 , OKC_API.G_FALSE
1330 , l_api_version
1331 , 1.0
1332 , '_PVT'
1333 , x_return_status
1334 );
1335 -- Check if activity started successfully
1336 IF (l_return_status = OKC_API.G_RET_STS_UNEXP_ERROR) THEN
1337 RAISE OKC_API.G_EXCEPTION_UNEXPECTED_ERROR;
1338 ELSIF (l_return_status = OKC_API.G_RET_STS_ERROR) THEN
1339 RAISE OKC_API.G_EXCEPTION_ERROR;
1340 END IF;
1341
1342 IF (l_debug = 'Y') THEN
1343 okc_debug.Log('50: Calling code_dots ',2);
1344 END IF;
1345
1346 -- convert dots to hashes to make sub name legal
1347 code_dots ('OKC_AQ_PVT.DEQUEUE_OUTCOME', l_sub_name);
1348
1349 l_dq_options.consumer_name := l_sub_name;
1350 l_dq_options.wait := OKC_AQ_PVT.g_dequeue_wait;
1351 l_dq_options.navigation := dbms_aq.next_message;
1352 l_dq_options.dequeue_mode := dbms_aq.LOCKED;
1353 l_dq_options.visibility := dbms_aq.ON_COMMIT;
1354
1355
1356 IF (l_debug = 'Y') THEN
1357 okc_debug.Log('100: Calling DBMS_AQ.dequeue',2);
1358 END IF;
1359
1360 -- get the message from the queue
1361
1362 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_outcome_queue_name
1363 , dequeue_options => l_dq_options
1364 , message_properties => l_msg_prop
1365 , payload => l_msg
1366 , msgid => l_msg_handle );
1367
1368 IF (l_debug = 'Y') THEN
1369 okc_debug.Log('110: After Calling DBMS_AQ.dequeue',2);
1370 END IF;
1371
1372 IF l_msg IS NULL THEN
1373 OKC_API.SET_MESSAGE(p_app_name => g_app_name
1374 ,p_msg_name => 'OKC_DEQUEUE_FAILED'
1375 ,p_token1 => 'MSG_ID'
1376 ,p_token1_value => RAWTOHEX(l_msg_handle)
1377 ,p_token2 => 'QUEUE'
1378 ,p_token2_value => 'Outcome Queue'
1379 );
1380 raise OKC_REMOVE_MSG;
1381 END IF;
1382
1383 IF (l_debug = 'Y') THEN
1384 okc_debug.Log('120: Calling OKC_XML_PVT.get_element_vals ',2);
1385 END IF;
1386
1387 -- converts message from clob to table of element names and values
1388 OKC_XML_PVT.get_element_vals ( l_msg
1389 , l_msg_tab
1390 , l_corrid );
1391
1392 IF (l_debug = 'Y') THEN
1393 okc_debug.Log('130: After Calling OKC_XML_PVT.get_element_vals ',2);
1394 okc_debug.Log('130: l_corrid.corrid : '||l_corrid.corrid,2);
1395 okc_debug.Log('130: l_msg_tab.count : '||l_msg_tab.count,2);
1396 END IF;
1397
1398 -- call the outcome APIs
1399 IF l_corrid.corrid IS NOT NULL AND
1400 l_msg_tab.count <> 0 THEN
1401 OKC_OUTCOME_INIT_PVT.Launch_outcome(p_api_version => 1.0,
1402 p_init_msg_list => OKC_API.G_FALSE,
1403 p_corrid_rec => l_corrid,
1404 p_msg_tab_typ => l_msg_tab,
1405 x_msg_count => l_msg_count,
1406 x_msg_data => l_msg_data,
1407 x_return_status => l_return_status
1408 );
1409 END IF;
1410 IF NVL(l_return_status,'X') = OKC_API.G_RET_STS_SUCCESS THEN
1411 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
1412 l_dq_options.msgid := l_msg_handle;
1413 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_outcome_queue_name
1414 , dequeue_options => l_dq_options
1415 , message_properties => l_msg_prop
1416 , payload => l_msg
1417 , msgid => l_msg_handle );
1418 commit;
1419 ELSE
1420 OKC_API.SET_MESSAGE(p_app_name => g_app_name
1421 ,p_msg_name => 'OKC_PROCESS_FAILED'
1422 ,p_token1 => 'SOURCE'
1423 ,p_token1_value => 'Outcome Initiator'
1424 ,p_token2 => 'PROCESS'
1425 ,p_token2_value => 'Launch Outcome'
1426 );
1427 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
1428 l_dq_options.msgid := l_msg_handle;
1429 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_outcome_queue_name
1430 , dequeue_options => l_dq_options
1431 , message_properties => l_msg_prop
1432 , payload => l_msg
1433 , msgid => l_msg_handle );
1434 RAISE OKC_PROCESS_FAILED;
1435 END IF;
1436
1437 -- end activity
1438 OKC_API.END_ACTIVITY ( l_msg_count
1439 , l_msg_data );
1440
1441
1442 IF (l_debug = 'Y') THEN
1443 okc_debug.Log('500: l_msg_count : '||l_msg_count,2);
1444 okc_debug.Log('500: l_msg_data : '||l_msg_data,2);
1445 okc_debug.Log('1000: Leaving ',2);
1446 okc_debug.Reset_Indentation;
1447 END IF;
1448
1449
1450 EXCEPTION
1451
1452 WHEN e_dequeue_timeout THEN
1453 IF (l_debug = 'Y') THEN
1454 okc_debug.Log('2000: e_dequeue_timeout EXCEPTION',2);
1455 okc_debug.Log('2000: l_msg_count : '||l_msg_count,2);
1456 okc_debug.Log('2000: l_msg_data : '||l_msg_data,2);
1457 okc_debug.Log('2000: Leaving ',2);
1458 okc_debug.Reset_Indentation;
1459 END IF;
1460 null;
1461 WHEN OKC_REMOVE_MSG THEN
1462 l_dq_options.dequeue_mode := dbms_aq.REMOVE_NODATA;
1463 l_dq_options.msgid := l_msg_handle;
1464 DBMS_AQ.dequeue ( queue_name => OKC_AQ_PVT.g_outcome_queue_name
1465 , dequeue_options => l_dq_options
1466 , message_properties => l_msg_prop
1467 , payload => l_msg
1468 , msgid => l_msg_handle );
1469 commit;
1470 OKC_AQ_WRITE_ERROR_PVT.WRITE_MSGDATA(
1471 p_api_version => 1.0,
1472 p_init_msg_list => OKC_API.G_TRUE,
1473 p_source_name => 'Advanced Queuing',
1474 p_datetime => sysdate,
1475 p_msg_tab => l_msg_tab,
1476 p_q_name => 'Outcome Queue',
1477 p_corrid => l_corrid.corrid,
1478 p_msgid => l_msg_handle,
1479 p_msg_count => l_msg_count,
1480 p_msg_data => l_msg_data
1481 );
1482 IF (l_debug = 'Y') THEN
1483 okc_debug.Log('3000: Leaving ',2);
1484 okc_debug.Reset_Indentation;
1485 END IF;
1486
1487 WHEN OTHERS THEN
1488 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
1489 l_api_name
1490 , g_pkg_name
1491 , 'OTHERS'
1492 , x_msg_count
1493 , x_msg_data
1494 , '_PVT'
1495 );
1496 OKC_AQ_WRITE_ERROR_PVT.WRITE_MSGDATA(
1497 p_api_version => 1.0,
1498 p_init_msg_list => OKC_API.G_TRUE,
1499 p_source_name => 'Advanced Queuing',
1500 p_datetime => sysdate,
1501 p_msg_tab => l_msg_tab,
1502 p_q_name => 'Outcome Queue',
1503 p_corrid => l_corrid.corrid,
1504 p_msgid => l_msg_handle,
1505 p_msg_count => l_msg_count,
1506 p_msg_data => l_msg_data
1507 );
1508 IF (l_debug = 'Y') THEN
1509 okc_debug.Log('4000: Leaving ',2);
1510 okc_debug.Reset_Indentation;
1511 END IF;
1512 END dequeue_outcome;
1513
1514 PROCEDURE add_subscriber (
1515 p_sub_name IN VARCHAR2
1516 , p_queue_name IN VARCHAR2
1517 , p_rule IN VARCHAR2 )
1518 IS
1519 l_subscriber sys.aq$_agent;
1520 l_sub_name VARCHAR2(2000);
1521 --
1522 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'add_subscriber';
1523 --
1524
1525 BEGIN
1526 IF (l_debug = 'Y') THEN
1527 okc_debug.Set_Indentation(l_proc);
1528 okc_debug.Log('10: Entering ',2);
1529 END IF;
1530
1531 -- code the sub name by replacing dots with hash
1532 code_dots (p_sub_name, l_sub_name);
1533 l_subscriber := sys.aq$_agent (l_sub_name,null,null);
1534 DBMS_AQADM.add_subscriber ( p_queue_name
1535 , l_subscriber
1536 , p_rule );
1537 END add_subscriber;
1538
1539
1540 PROCEDURE remove_subscriber (
1541 p_sub_name IN VARCHAR2
1542 , p_queue_name IN VARCHAR2 )
1543 IS
1544 l_sub_name VARCHAR2(2000);
1545 l_subscriber SYS.aq$_agent;
1546 --
1547 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'remove_subscriber';
1548 --
1549
1550 BEGIN
1551 IF (l_debug = 'Y') THEN
1552 okc_debug.Set_Indentation(l_proc);
1553 okc_debug.Log('10: Entering ',2);
1554 END IF;
1555
1556 code_dots (p_sub_name, l_sub_name);
1557 l_subscriber := sys.aq$_agent (l_sub_name,null,null);
1558 DBMS_AQADM.remove_subscriber ( p_queue_name, l_subscriber);
1559
1560 IF (l_debug = 'Y') THEN
1561 okc_debug.Log('1000: Leaving ',2);
1562 okc_debug.Reset_Indentation;
1563 END IF;
1564
1565 END remove_subscriber;
1566
1567
1568 PROCEDURE dequeue_exception (
1569 errbuf OUT NOCOPY VARCHAR2
1570 ,retcode OUT NOCOPY VARCHAR2
1571 ,p_msg_id IN VARCHAR2)
1572 IS
1573 l_dq_options dbms_aq.dequeue_options_t;
1574 l_msg_prop dbms_aq.message_properties_t;
1575 l_msg_handle raw(16);
1576 l_msg system.okc_aq_msg_typ;
1577 l_sub_name varchar2(30);
1578 l_corrid OKC_AQ_PVT.corrid_rec_typ;
1579 l_api_name CONSTANT VARCHAR2(30) := 'DEQUEUE_EXCEPTION';
1580 l_api_version CONSTANT NUMBER := 1.0;
1581 l_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
1582 x_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
1583 x_msg_count NUMBER;
1584 x_msg_data VARCHAR2(1000);
1585 l_token1 VARCHAR2(50);
1586 l_token1_value VARCHAR2(50);
1587 l_token2 VARCHAR2(50);
1588 l_token2_value VARCHAR2(50);
1589 l_consumer_name VARCHAR2(50);
1590
1591
1592 CURSOR msg_cur IS
1593 SELECT consumer_name,corr_id
1594 FROM aq$okc_aq_ev_tab
1595 WHERE msg_id = HEXTORAW(p_msg_id);
1596 msg_rec msg_cur%ROWTYPE;
1597
1598 e_dequeue_timeout EXCEPTION;
1599 PRAGMA EXCEPTION_INIT (e_dequeue_timeout, -25228 );
1600 OKC_REMOVE_MSG EXCEPTION;
1601
1602 --
1603 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'dequeue_exception';
1604 --
1605 l_queue_name varchar2(25) := g_app_name||'.AQ$_OKC_AQ_EV_TAB_E';
1606 BEGIN
1607 IF (l_debug = 'Y') THEN
1608 okc_debug.Set_Indentation(l_proc);
1609 okc_debug.Log('10: Entering ',2);
1610 END IF;
1611
1612
1613 retcode := 0;
1614 -- Call start_activity to create savepoint, check compatibility
1615 -- and initialize message list
1616 l_return_status := OKC_API.START_ACTIVITY ( l_api_name
1617 , g_pkg_name
1618 , OKC_API.G_FALSE
1619 , l_api_version
1620 , 1.0
1621 , '_PVT'
1622 , x_return_status
1623 );
1624 -- Check if activity started successfully
1625 IF (l_return_status = OKC_API.G_RET_STS_UNEXP_ERROR) THEN
1626 RAISE OKC_API.G_EXCEPTION_UNEXPECTED_ERROR;
1627 ELSIF (l_return_status = OKC_API.G_RET_STS_ERROR) THEN
1628 RAISE OKC_API.G_EXCEPTION_ERROR;
1629 END IF;
1630
1631 -- get the consumer name to enqueue message
1632 OPEN msg_cur;
1633 FETCH msg_cur INTO msg_rec;
1634 IF msg_rec.consumer_name IS NOT NULL THEN
1635 l_consumer_name := msg_rec.consumer_name;
1636 l_corrid.corrid := msg_rec.corr_id;
1637 END IF;
1638 CLOSE msg_cur;
1639
1640 l_dq_options.consumer_name := null;
1641 l_dq_options.wait := OKC_AQ_PVT.g_dequeue_wait;
1642 l_dq_options.navigation := dbms_aq.next_message;
1643 l_dq_options.dequeue_mode := dbms_aq.REMOVE;
1644 l_dq_options.visibility := dbms_aq.on_commit;
1645 l_dq_options.msgid := HEXTORAW(p_msg_id);
1646
1647
1648 -- get the message from the exception queue
1649 DBMS_AQ.dequeue ( queue_name => l_queue_name
1650 , dequeue_options => l_dq_options
1651 , message_properties => l_msg_prop
1652 , payload => l_msg
1653 , msgid => l_msg_handle);
1654 -- if the payload is null then remove message from the queue and commit
1655 IF l_msg IS NULL THEN
1656 OKC_API.SET_MESSAGE(p_app_name => g_app_name
1657 ,p_msg_name => 'OKC_REMOVE_MSG'
1658 ,p_token1 => 'MSG_ID'
1659 ,p_token1_value => RAWTOHEX(l_msg_handle)
1660 ,p_token2 => 'CORRID'
1661 ,p_token2_value => l_dq_options.correlation
1662 );
1663 raise OKC_REMOVE_MSG;
1664 END IF;
1665
1666 -- enqueue the message into appropriate queue based on consumer name
1667 IF l_consumer_name = 'OKC_AQ_PVT#DEQUEUE_OUTCOME' THEN
1668 enqueue_message ( l_msg
1669 , l_corrid
1670 ,OKC_AQ_PVT.g_outcome_queue_name
1671 ,dbms_aq.no_delay
1672 , l_msg_handle);
1673 ELSE
1674 enqueue_message ( l_msg
1675 , l_corrid
1676 ,OKC_AQ_PVT.g_event_queue_name
1677 ,dbms_aq.no_delay
1678 , l_msg_handle);
1679 END IF;
1680 commit;
1681 -- end activity
1682 OKC_API.END_ACTIVITY ( x_msg_count
1683 , x_msg_data );
1684
1685 IF (l_debug = 'Y') THEN
1686 okc_debug.Log('1000: Leaving ',2);
1687 okc_debug.Reset_Indentation;
1688 END IF;
1689
1690
1691 EXCEPTION
1692
1693 WHEN e_dequeue_timeout THEN
1694 IF (l_debug = 'Y') THEN
1695 okc_debug.Log('2000: Leaving ',2);
1696 okc_debug.Reset_Indentation;
1697 END IF;
1698 null;
1699
1700 WHEN OTHERS THEN
1701 retcode := 2;
1702 errbuf := substr(sqlerrm,1,250);
1703 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
1704 l_api_name
1705 , g_pkg_name
1706 , 'OTHERS'
1707 , x_msg_count
1708 , x_msg_data
1709 , '_PVT'
1710 );
1711 IF (l_debug = 'Y') THEN
1712 okc_debug.Log('3000: Leaving ',2);
1713 okc_debug.Reset_Indentation;
1714 END IF;
1715 END dequeue_exception;
1716
1717 -- This procedure removes any message from events,outcome or
1718 -- exception queue. This will be registered as concurrent
1719 -- program which accepts msg_id as input parameter. This program
1720 -- can be run to remove messages in situations where message is
1721 -- not dequeued or retried due to some errors and also when
1722 -- it causes overflow of aqerror log tables.
1723 PROCEDURE remove_message(
1724 errbuf OUT NOCOPY VARCHAR2
1725 ,retcode OUT NOCOPY VARCHAR2
1726 ,p_msg_id IN VARCHAR2)
1727 IS
1728 l_dq_options dbms_aq.dequeue_options_t;
1729 l_msg_prop dbms_aq.message_properties_t;
1730 l_msg_handle raw(16);
1731 l_msg system.okc_aq_msg_typ;
1732 l_sub_name varchar2(30);
1733 l_corrid OKC_AQ_PVT.corrid_rec_typ;
1734 l_api_name CONSTANT VARCHAR2(30) := 'REMOVE_MESSAGE';
1735 l_api_version CONSTANT NUMBER := 1.0;
1736 l_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
1737 x_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
1738 x_msg_count NUMBER;
1739 x_msg_data VARCHAR2(1000);
1740 l_token1 VARCHAR2(50);
1741 l_token1_value VARCHAR2(50);
1742 l_token2 VARCHAR2(50);
1743 l_token2_value VARCHAR2(50);
1744 l_consumer_name VARCHAR2(50);
1745 l_queue VARCHAR2(30);
1746 l_error_queue VARCHAR2(30) := g_app_name||'.AQ$_OKC_AQ_EV_TAB_E';
1747
1748
1749 CURSOR msg_cur IS
1750 SELECT consumer_name,corr_id,queue
1751 FROM aq$okc_aq_ev_tab
1752 WHERE msg_id = HEXTORAW(p_msg_id);
1753 msg_rec msg_cur%ROWTYPE;
1754
1755 e_dequeue_timeout EXCEPTION;
1756 PRAGMA EXCEPTION_INIT (e_dequeue_timeout, -25228 );
1757 OKC_REMOVE_MSG EXCEPTION;
1758
1759 --
1760 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'remove_message';
1761 --
1762
1763 BEGIN
1764 IF (l_debug = 'Y') THEN
1765 okc_debug.Set_Indentation(l_proc);
1766 okc_debug.Log('10: Entering ',2);
1767 END IF;
1768
1769
1770 retcode := 0;
1771 -- Call start_activity to create savepoint, check compatibility
1772 -- and initialize message list
1773 l_return_status := OKC_API.START_ACTIVITY ( l_api_name
1774 , g_pkg_name
1775 , OKC_API.G_FALSE
1776 , l_api_version
1777 , 1.0
1778 , '_PVT'
1779 , x_return_status
1780 );
1781 -- Check if activity started successfully
1782 IF (l_return_status = OKC_API.G_RET_STS_UNEXP_ERROR) THEN
1783 RAISE OKC_API.G_EXCEPTION_UNEXPECTED_ERROR;
1784 ELSIF (l_return_status = OKC_API.G_RET_STS_ERROR) THEN
1785 RAISE OKC_API.G_EXCEPTION_ERROR;
1786 END IF;
1787
1788 -- get the consumer name to enqueue message
1789 OPEN msg_cur;
1790 FETCH msg_cur INTO msg_rec;
1791 IF msg_rec.consumer_name IS NOT NULL THEN
1792 l_consumer_name := msg_rec.consumer_name;
1793 l_corrid.corrid := msg_rec.corr_id;
1794 l_queue := msg_rec.queue;
1795 END IF;
1796 CLOSE msg_cur;
1797 l_dq_options.consumer_name := l_consumer_name;
1798 l_dq_options.wait := OKC_AQ_PVT.g_dequeue_wait;
1799 l_dq_options.navigation := dbms_aq.next_message;
1800 l_dq_options.dequeue_mode := dbms_aq.REMOVE;
1801 l_dq_options.visibility := dbms_aq.on_commit;
1802 l_dq_options.msgid := HEXTORAW(p_msg_id);
1803
1804 IF l_queue = 'OKC_AQ_EV_QUEUE' THEN
1805 l_queue := okc_aq_pvt.g_event_queue_name;
1806 ELSIF l_queue = 'OKC_AQ_OC_QUEUE' THEN
1807 l_queue := okc_aq_pvt.g_outcome_queue_name;
1808 ELSIF l_queue = 'AQ$_OKC_AQ_EV_TAB_E' THEN
1809 l_queue := l_error_queue;
1810 END IF;
1811
1812 -- get the message from the queue
1813 DBMS_AQ.dequeue ( queue_name => l_queue
1814 , dequeue_options => l_dq_options
1815 , message_properties => l_msg_prop
1816 , payload => l_msg
1817 , msgid => l_msg_handle);
1818
1819 commit;
1820 -- end activity
1821 OKC_API.END_ACTIVITY ( x_msg_count
1822 , x_msg_data );
1823
1824 IF (l_debug = 'Y') THEN
1825 okc_debug.Log('1000: Leaving ',2);
1826 okc_debug.Reset_Indentation;
1827 END IF;
1828
1829
1830 EXCEPTION
1831
1832 WHEN e_dequeue_timeout THEN
1833 IF (l_debug = 'Y') THEN
1834 okc_debug.Log('2000: Leaving ',2);
1835 okc_debug.Reset_Indentation;
1836 END IF;
1837 -- null;
1838
1839 WHEN OTHERS THEN
1840 retcode := 2;
1841 errbuf := substr(sqlerrm,1,250);
1842 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
1843 l_api_name
1844 , g_pkg_name
1845 , 'OTHERS'
1846 , x_msg_count
1847 , x_msg_data
1848 , '_PVT'
1849 );
1850 IF (l_debug = 'Y') THEN
1851 okc_debug.Log('3000: Leaving ',2);
1852 okc_debug.Reset_Indentation;
1853 END IF;
1854 END remove_message;
1855
1856
1857 PROCEDURE clear_message( errbuf OUT NOCOPY VARCHAR2
1858 ,retcode OUT NOCOPY VARCHAR2
1859 )
1860 IS
1861 l_dq_options dbms_aq.dequeue_options_t;
1862 l_msg_prop dbms_aq.message_properties_t;
1863 l_msg_handle raw(16);
1864 l_msg system.okc_aq_msg_typ;
1865 l_sub_name varchar2(30);
1866 l_corrid OKC_AQ_PVT.corrid_rec_typ;
1867 l_api_name CONSTANT VARCHAR2(30) := 'REMOVE_MESSAGE';
1868 l_api_version CONSTANT NUMBER := 1.0;
1869 l_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
1870 x_return_status VARCHAR2(1) := OKC_API.G_RET_STS_SUCCESS;
1871 x_msg_count NUMBER;
1872 x_msg_data VARCHAR2(1000);
1873 l_token1 VARCHAR2(50);
1874 l_token1_value VARCHAR2(50);
1875 l_token2 VARCHAR2(50);
1876 l_token2_value VARCHAR2(50);
1877 l_consumer_name VARCHAR2(50);
1878 l_queue VARCHAR2(30);
1879 l_profile VARCHAR2(240);
1880
1881 CURSOR profile_cur IS
1882 SELECT opval.profile_option_value profile_value
1883 FROM fnd_profile_options op,
1884 fnd_profile_option_values opval
1885 WHERE op.profile_option_id = opval.profile_option_id
1886 AND op.application_id = opval.application_id
1887 AND op.profile_option_name = 'OKC_RETAIN_MSG_DAYS';
1888 profile_rec profile_cur%ROWTYPE;
1889
1890 CURSOR msg_cur(x IN NUMBER) IS
1891 SELECT msgid
1892 FROM okc_aq_ev_tab
1893 WHERE q_name = 'AQ$_OKC_AQ_EV_TAB_E'
1894 AND trunc(enq_time) + (x) = trunc(sysdate);
1895 msg_rec msg_cur%ROWTYPE;
1896
1897 e_dequeue_timeout EXCEPTION;
1898 PRAGMA EXCEPTION_INIT (e_dequeue_timeout, -25228 );
1899 OKC_REMOVE_MSG EXCEPTION;
1900
1901 --
1902 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'clear_message';
1903 l_error_queue varchar2(30) := g_app_name||'.AQ$_OKC_AQ_EV_TAB_E';
1904 --
1905
1906 BEGIN
1907 IF (l_debug = 'Y') THEN
1908 okc_debug.Set_Indentation(l_proc);
1909 okc_debug.Log('10: Entering ',2);
1910 END IF;
1911
1912
1913 retcode := 0;
1914 -- Call start_activity to create savepoint, check compatibility
1915 -- and initialize message list
1916 l_return_status := OKC_API.START_ACTIVITY ( l_api_name
1917 , g_pkg_name
1918 , OKC_API.G_FALSE
1919 , l_api_version
1920 , 1.0
1921 , '_PVT'
1922 , x_return_status
1923 );
1924 -- Check if activity started successfully
1925 IF (l_return_status = OKC_API.G_RET_STS_UNEXP_ERROR) THEN
1926 RAISE OKC_API.G_EXCEPTION_UNEXPECTED_ERROR;
1927 ELSIF (l_return_status = OKC_API.G_RET_STS_ERROR) THEN
1928 RAISE OKC_API.G_EXCEPTION_ERROR;
1929 END IF;
1930
1931 OPEN profile_cur;
1932 FETCH profile_cur INTO profile_rec;
1933 l_profile := profile_rec.profile_value;
1934 CLOSE profile_cur;
1935 -- get the msg_id name to dequeue message
1936 IF msg_cur%ISOPEN THEN
1937 CLOSE msg_cur;
1938 END IF;
1939 OPEN msg_cur(l_profile);
1940 LOOP
1941 FETCH msg_cur INTO msg_rec;
1942 IF msg_cur%NOTFOUND THEN
1943 EXIT;
1944 ELSE
1945
1946 l_dq_options.consumer_name := null;
1947 l_dq_options.wait := OKC_AQ_PVT.g_dequeue_wait;
1948 l_dq_options.navigation := dbms_aq.next_message;
1949 l_dq_options.dequeue_mode := dbms_aq.REMOVE;
1950 l_dq_options.visibility := dbms_aq.on_commit;
1951 l_dq_options.msgid := HEXTORAW(msg_rec.msgid);
1952
1953
1954 -- remove the message from the exception queue
1955 DBMS_AQ.dequeue ( queue_name => l_error_queue
1956 , dequeue_options => l_dq_options
1957 , message_properties => l_msg_prop
1958 , payload => l_msg
1959 , msgid => l_msg_handle);
1960
1961 commit;
1962 END IF;
1963 END LOOP;
1964 CLOSE msg_cur;
1965 -- end activity
1966 OKC_API.END_ACTIVITY ( x_msg_count
1967 , x_msg_data );
1968
1969 IF (l_debug = 'Y') THEN
1970 okc_debug.Log('1000: Leaving ',2);
1971 okc_debug.Reset_Indentation;
1972 END IF;
1973
1974
1975 EXCEPTION
1976
1977 WHEN e_dequeue_timeout THEN
1978 IF (l_debug = 'Y') THEN
1979 okc_debug.Log('2000: Leaving ',2);
1980 okc_debug.Reset_Indentation;
1981 END IF;
1982 null;
1983
1984 WHEN OTHERS THEN
1985 retcode := 2;
1986 errbuf := substr(sqlerrm,1,250);
1987 x_return_status := OKC_API.HANDLE_EXCEPTIONS (
1988 l_api_name
1989 , g_pkg_name
1990 , 'OTHERS'
1991 , x_msg_count
1992 , x_msg_data
1993 , '_PVT'
1994 );
1995 IF (l_debug = 'Y') THEN
1996 okc_debug.Log('3000: Leaving ',2);
1997 okc_debug.Reset_Indentation;
1998 END IF;
1999 END clear_message;
2000
2001 -- This procedure will be executed by ICM at the time of database shutdown
2002 PROCEDURE stop_listener
2003 IS
2004 l_corrid_rec okc_aq_pvt.corrid_rec_typ;
2005 l_msg_tab okc_aq_pvt.msg_tab_typ := okc_aq_pvt.msg_tab_typ() ;
2006 l_msg_count number;
2007 l_msg_data varchar2(1000);
2008 l_return_status varchar2(1);
2009 x_msg_count number;
2010 x_msg_data varchar2(1000);
2011 x_return_status varchar2(1);
2012 l_api_name CONSTANT VARCHAR2(30) := 'STOP_LISTENER';
2013 PRAGMA AUTONOMOUS_TRANSACTION;
2014 --
2015 l_proc varchar2(72) := ' OKC_AQ_PVT.'||'stop_listener';
2016 --
2017
2018 BEGIN
2019 IF (l_debug = 'Y') THEN
2020 okc_debug.Set_Indentation(l_proc);
2021 okc_debug.Log('10: Entering ',2);
2022 END IF;
2023
2024 --Initialize return status
2025 l_return_status := OKC_API.G_RET_STS_SUCCESS;
2026
2027 l_corrid_rec.corrid := 'SHUTDOWN';
2028 -- enqueues shutdown message into events queue
2029 okc_aq_pvt.send_message ( p_api_version => '1.0'
2030 , x_msg_count => l_msg_count
2031 , x_msg_data => l_msg_data
2032 , x_return_status => l_return_status
2033 , p_corrid_rec => l_corrid_rec
2034 , p_msg_tab => l_msg_tab
2035 , p_queue_name => okc_aq_pvt.g_event_queue_name );
2036
2037 -- enqueues shutdown message into outcomes queue
2038 okc_aq_pvt.send_message ( p_api_version => '1.0'
2039 , x_msg_count => l_msg_count
2040 , x_msg_data => l_msg_data
2041 , x_return_status => l_return_status
2042 , p_corrid_rec => l_corrid_rec
2043 , p_msg_tab => l_msg_tab
2044 , p_queue_name => okc_aq_pvt.g_outcome_queue_name );
2045 commit;
2046
2047 IF (l_debug = 'Y') THEN
2048 okc_debug.Log('1000: Leaving ',2);
2049 okc_debug.Reset_Indentation;
2050 END IF;
2051
2052
2053 EXCEPTION
2054 WHEN others THEN
2055 IF (l_debug = 'Y') THEN
2056 okc_debug.Log('2000: Leaving ',2);
2057 okc_debug.Reset_Indentation;
2058 END IF;
2059 rollback;
2060 END stop_listener;
2061
2062 END okc_aq_pvt;