1 PACKAGE BODY fnd_cp_opp_ipc AS
2 /* $Header: AFCPOPIB.pls 120.6 2006/09/15 17:06:51 pferguso noship $ */
3
4
5
6 -- Name of the OPP AQ
7 QUEUE_NAME constant VARCHAR2(30) := 'FND_CP_GSM_OPP_AQ';
8
9
10 -- All names will be prefixed with this prefix before being used.
11 -- The prefix will be stripped off if returned outside this package.
12 -- Necessary because subscriber names and consumer names cannot begin with a number
13 OPPPREFIX constant VARCHAR2(3) := 'OPP';
14
15
16 -- Name of the schema that owns the AQ
17 Q_Schema varchar2(30) := NULL;
18
19
20 TYPE CurType is REF CURSOR;
21
22
23 -- Largest increment to wait for dequeue (in seconds)
24 TIMEOUT_INCREMENT constant number := 5;
25
26
27 DAY_PER_SEC constant number := 1 / (24 * 60 * 60); -- Days in a second
28 SEC_PER_DAY constant number := (24 * 60 * 60); -- Seconds in a day
29
30 --------------------------------------------------------------------------------
31
32
33 -- =========================================================
34 -- Subscription procedures
35 -- =========================================================
36
37
38 --
39 -- Subscribe to the OPP AQ
40 --
41 procedure Subscribe(subscriber in varchar2) is
42
43 pragma AUTONOMOUS_TRANSACTION;
44
45 Begin
46 DBMS_AQADM.ADD_SUBSCRIBER(queue_name =>Q_Schema || '.' || QUEUE_NAME,
47 subscriber => sys.aq$_agent(OPPPREFIX || subscriber, NULL, NULL));
48
49
50 commit;
51
52 Exception
53 when others then
54 rollback;
55 raise;
56
57 end;
58
59
60
61 --
62 -- Subscribe to the OPP AQ using a particular group
63 --
64 -- Subscribers will only receive messages targeted to this group,
65 -- i.e. where payload.message_group matches the subscriber's group
66 --
67 -- The OPP service will subscribe using the node name (or APPL_TOP name)
68 -- as its group id.
69 --
70 procedure Subscribe_to_group(subscriber in varchar2, groupid in varchar2) is
71
72 pragma AUTONOMOUS_TRANSACTION;
73
74 Begin
75 DBMS_AQADM.ADD_SUBSCRIBER(queue_name =>Q_Schema || '.' || QUEUE_NAME,
76 subscriber => sys.aq$_agent(OPPPREFIX || subscriber, NULL, NULL),
77 rule => 'tab.user_data.message_group = ''' || groupid || '''');
78
79
80 commit;
81
82 Exception
83 when others then
84 rollback;
85 raise;
86
87 end;
88
89
90
91 --
92 -- Unsubscribe a single subscriber from the OPP AQ
93 --
94 procedure Unsubscribe(subscriber in varchar2) is
95
96 pragma AUTONOMOUS_TRANSACTION;
97
98 Begin
99 DBMS_AQADM.REMOVE_SUBSCRIBER(queue_name =>Q_Schema || '.' || QUEUE_NAME,
100 subscriber => sys.aq$_agent(OPPPREFIX || subscriber, NULL, NULL));
101
102
103 commit;
104
105 Exception
106 when others then
107 rollback;
108 raise;
109 end;
110
111
112
113 --
114 -- Return a count of how many subscribers are currently subscribed to the AQ
115 -- for a particular group.
116 --
117 function check_group_subscribers(groupid in varchar2) return number is
118
119 cnt number := 0;
120 stmt varchar2(256);
121
122 begin
123
124 -- For some reason, a select from aq$FND_CP_GSM_OPP_AQTBL_R is not working...
125
126 stmt := 'select count(*) from ' ||
127 Q_Schema || '.aq$' || QUEUE_NAME || 'TBL_S qtab, ' ||
128 'fnd_concurrent_processes fcp ' ||
129 'where qtab.queue = ''' || QUEUE_NAME ||
130 ''' and fcp.node_name = :1 ' ||
131 ' and fcp.process_status_code in (''A'',''Z'') ' ||
132 ' and qtab.name = ''' || OPPPREFIX || ''' || fcp.concurrent_process_id';
133
134 execute immediate stmt into cnt using groupid;
135 return cnt;
136
137 end;
138
139
140
141 --
142 -- Select a random OPP AQ subscriber out of all the current subscribers.
143 -- Returns the subscriber name.
144 --
145 function select_random_subscriber return varchar2 is
146
147 stmt varchar2(512);
148 subscriber varchar2(30);
149
150 begin
151
152
153 stmt := 'select * ' ||
154 'from ' ||
155 '( ' ||
156 'select name from ' || Q_Schema || '.aq$' || QUEUE_NAME || 'TBL_S ' ||
157 'ORDER BY DBMS_RANDOM.VALUE ' ||
158 ') where rownum = 1';
159
160 execute immediate stmt into subscriber;
161
162 if instr(subscriber, OPPPREFIX, 1, 1) = 1 then
163 subscriber := substr(subscriber, length(OPPPREFIX) + 1);
164 end if;
165
166 return subscriber;
167
168 exception
169 when no_data_found then
170 return null;
171
172 end;
173
174
175
176 --
177 -- Remove all subscribers of the OPP AQ
178 --
179 procedure remove_all_subscribers is
180
181 c1 CurType;
182 subname varchar2(30);
183
184 begin
185
186 open c1 for
187 'select name from ' || Q_Schema || '.aq$' || QUEUE_NAME || 'TBL_S ' || ' where QUEUE = ''' || QUEUE_NAME || '''';
188 loop
189 fetch c1 into subname;
190 exit when c1%NOTFOUND;
191
192 DBMS_AQADM.REMOVE_SUBSCRIBER(queue_name => Q_Schema || '.' || QUEUE_NAME,
193 subscriber => sys.aq$_agent(subname, NULL, NULL));
194 end loop;
195 close c1;
196
197 end;
198
199
200
201 --
202 -- Return a list of all subscribers
203 --
204 function list_subscribers return subscriber_list is
205
206 c1 CurType;
207 sublist subscriber_list := subscriber_list();
208
209 begin
210
211 open c1 for
212 'select name from ' || Q_Schema || '.aq$' || QUEUE_NAME || 'TBL_S' || ' where QUEUE = ''' || QUEUE_NAME || '''';
213
214
215 fetch c1 bulk collect into sublist;
216 close c1;
217 return sublist;
218
219 end;
220
221
222
223
224 -- =========================================================
225 -- Message sending procedures
226 -- =========================================================
227
228
229
230 --
231 -- send_message_private
232 -- All messages are enqueued using this private procedure
233 --
234 -- INPUT:
235 -- recipients - List of recipients. If null, published to the entire queue.
236 -- groupid - Group to send to. Pass NULL if sending to a specific recipient
237 -- sender - Sender's name
238 -- type - Message type
239 -- message - Message contents
240 -- Parameters - Message payload
241 --
242 procedure send_message_private (recipients in subscriber_list,
243 groupid in varchar2,
244 sender in Varchar2,
245 type in Number,
246 message in Varchar2,
247 Parameters in Varchar2,
248 correlation in Varchar2 default null) is
249
250 enq_opts DBMS_AQ.enqueue_options_t;
251 msg_props DBMS_AQ.message_properties_t;
252 msg_id raw(16);
253 msg system.FND_CP_GSM_OPP_AQ_PAYLOAD;
254
255
256 pragma AUTONOMOUS_TRANSACTION;
257
258 begin
259 msg := system.FND_CP_GSM_OPP_AQ_PAYLOAD(groupid, type, message, Parameters);
260
261 enq_opts.visibility := DBMS_AQ.ON_COMMIT;
262 enq_opts.sequence_deviation := NULL;
263 msg_props.delay := DBMS_AQ.NO_DELAY;
264 msg_props.expiration := 365 * 24 * 3600; -- One Year
265
266 msg_props.sender_id := sys.aq$_agent(OPPPREFIX || sender, NULL, NULL);
267
268 if correlation is not null then
269 msg_props.correlation := correlation;
270 end if;
271
272 if recipients is not null then
273 for i in 1 .. recipients.COUNT
274 loop
275 msg_props.recipient_list(i) := sys.aq$_agent(OPPPREFIX || recipients(i), NULL, NULL);
276 end loop;
277 end if;
278
279 DBMS_AQ.Enqueue( queue_name => Q_Schema || '.' || QUEUE_NAME,
280 enqueue_options => enq_opts,
281 message_properties => msg_props,
282 Payload => msg,
283 msgid => msg_id);
284
285 commit;
286
287 exception
288 when OTHERS then
289 rollback;
290 raise;
291
292
293 end;
294
295
296
297
298
299 --
300 -- Generic send message procedure
301 -- Send a message of any type to one or more recipients
302 --
303 procedure send_message (recipients in subscriber_list,
304 sender in Varchar2,
305 type in Number,
306 message in Varchar2,
307 Parameters in Varchar2) is
308
309 begin
310
311 if recipients is null then
312 return;
313 end if;
314
315 send_message_private(recipients, null, sender, type, message, parameters);
316
317 end;
318
319
320
321
322 --
323 -- Send a message of any type to a specific process
324 --
325 procedure send_targeted_message (recipient in Varchar2,
326 sender in Varchar2,
327 type in Number,
328 message in Varchar2,
329 Parameters in Varchar2,
330 correlation in Varchar2 default null) is
331
332 rlist subscriber_list;
333 begin
334
335 if recipient is null then
336 return;
337 end if;
338
339 rlist := subscriber_list(recipient);
340 send_message_private(rlist, null, sender, type, message, parameters, correlation);
341
342
343 end;
344
345
346
347
348 --
349 -- Send a message to a group to post-process a request
350 --
351 procedure send_request (groupid in Varchar2,
352 sender in Varchar2,
353 request_id in number,
354 Parameters in Varchar2) is
355
356 cnt number;
357 begin
358
359
360 if groupid is null then
361 return;
362 end if;
363
364
365 send_message_private(null, groupid, sender, REQUEST_TYPE, to_char(request_id), parameters);
366
367 end;
368
369
370
371 --
372 -- Send a message to a specific process to post-process a request
373 --
374 procedure send_targeted_request ( recipient in Varchar2,
375 sender in Varchar2,
376 request_id in number,
377 parameters in Varchar2) is
378
379 begin
380 if recipient is null then
381 return;
382 end if;
383
384 send_targeted_message(recipient, sender, REQUEST_TYPE, to_char(request_id), parameters);
385
386 end;
387
388
389
390
391 --
392 -- Send an OPP command to a specific process
393 --
394 procedure send_command ( recipient in Varchar2,
395 sender in Varchar2,
396 command in Varchar2,
397 parameters in Varchar2) is
398
399 begin
400
401 if recipient is null then
402 return;
403 end if;
404
405 send_targeted_message(recipient, sender, COMMAND_TYPE, command, parameters);
406
407 end;
408
409
410
411
412
413
414
415
416
417 -- =========================================================
418 -- Receiving messages
419 -- =========================================================
420
421
422 --
423 -- Dequeue a message from the OPP AQ
424 --
425 -- INPUT:
426 -- Handle - Used as the consumer name
427 -- Message_Wait_Timeout - Timeout in seconds
428 --
429 -- OUTPUT:
430 -- Success_Flag - Y if received message, T if timeout, N if error
431 -- Message_Type - Type of message
432 -- Message_group - Group message was sent to
433 -- Message - Message contents
434 -- Parameters - Message payload
435 -- Sender - Sender of message
436 --
437 -- If an exception occurs, success_flag will contain 'N', and
438 -- Message will contain the error message.
439 --
440 Procedure Get_Message ( Handle in Varchar2,
441 Success_Flag OUT NOCOPY Varchar2,
442 Message_Type OUT NOCOPY Number,
443 Message_group OUT NOCOPY Varchar2,
444 Message OUT NOCOPY Varchar2,
445 Parameters OUT NOCOPY Varchar2,
446 Sender OUT NOCOPY Varchar2,
447 Message_Wait_Timeout IN Number default 60,
448 Correlation IN Varchar2 default null) is
449
450
451 payload system.FND_CP_GSM_OPP_AQ_PAYLOAD;
452 dq_opts DBMS_AQ.DEQUEUE_OPTIONS_T;
453 msg_props DBMS_AQ.MESSAGE_PROPERTIES_T;
454 msgid raw(16);
455 queue_timeout exception;
456 time_left number;
457 end_time date;
458
459 pragma exception_init(queue_timeout, -25228);
460
461 pragma AUTONOMOUS_TRANSACTION;
462
463 Begin
464 payload := system.FND_CP_GSM_OPP_AQ_PAYLOAD(NULL,NULL,NULL,NULL);
465
466 dq_opts.DEQUEUE_MODE := DBMS_AQ.REMOVE;
467 dq_opts.NAVIGATION := DBMS_AQ.FIRST_MESSAGE;
468 dq_opts.VISIBILITY := DBMS_AQ.IMMEDIATE;
469 dq_opts.MSGID := NULL;
470 dq_opts.consumer_name := OPPPREFIX || Handle;
471
472 if correlation is not null then
473 dq_opts.correlation := correlation;
474 end if;
475
476
477 time_left := Message_Wait_Timeout;
478 end_time := sysdate + (Message_Wait_Timeout * DAY_PER_SEC);
479
480 -- Loop until the return message arrives or the timeout expires,
481 -- but do not wait on any single dequeue call more than TIMEOUT_INCREMENT seconds
482 loop
483 if time_left > TIMEOUT_INCREMENT then
484 dq_opts.WAIT := TIMEOUT_INCREMENT;
485 else
486 dq_opts.WAIT := time_left;
487 end if;
488
489 begin
490
491 DBMS_AQ.DEQUEUE(QUEUE_NAME => Q_Schema || '.' || QUEUE_NAME,
492 DEQUEUE_OPTIONS => dq_opts,
493 MESSAGE_PROPERTIES => msg_props,
494 PAYLOAD => payload,
495 MSGID => msgid);
496
497 exit;
498
499 exception
500 when queue_timeout then
501 if sysdate >= end_time then
502 Success_Flag := 'T';
503 commit;
504 return;
505 end if;
506
507 time_left := (end_time - sysdate) * SEC_PER_DAY;
508 end;
509
510 end loop;
511
512 message_type := payload.message_type;
513 message_group := payload.message_group;
514 message := payload.message;
515 parameters := payload.parameters;
516
517 -- strip off any OPP prefix from the sender's name,
518 if instr(msg_props.sender_id.name, OPPPREFIX, 1, 1) = 1 then
519 sender := substr(msg_props.sender_id.name, length(OPPPREFIX) + 1);
520 else
521 sender := msg_props.sender_id.name;
522 end if;
523
524 Success_Flag := 'Y';
525
526 commit;
527
528 exception
529
530 when OTHERS then
531 Success_Flag := 'N';
532 message := substr(sqlerrm, 1, 240);
533 commit;
534
535 end;
536
537
538
539 --
540 -- Package Initialization
541 --
542 procedure initialize is
543
544 status varchar2(1);
545 industry varchar2(1);
546 retval boolean;
547
548 begin
549
550 retval := fnd_installation.get_app_info('FND', status, industry, Q_Schema);
551
552 end;
553
554
555
556
557 begin
558
559 initialize;
560
561 END fnd_cp_opp_ipc;