DBA Data[Home] [Help]

PACKAGE BODY: APPS.FND_CP_OPP_IPC

Source


1 PACKAGE BODY fnd_cp_opp_ipc AS
2 /* $Header: AFCPOPIB.pls 120.6 2006/09/15 17:06:51 pferguso noship $ */
3 
4 
5 
6 -- Name of the OPP AQ
7 QUEUE_NAME   constant VARCHAR2(30) := 'FND_CP_GSM_OPP_AQ';
8 
9 
10 -- All names will be prefixed with this prefix before being used.
11 -- The prefix will be stripped off if returned outside this package.
12 -- Necessary because subscriber names and consumer names cannot begin with a number
13 OPPPREFIX    constant VARCHAR2(3)  := 'OPP';
14 
15 
16 -- Name of the schema that owns the AQ
17 Q_Schema varchar2(30) := NULL;
18 
19 
20 TYPE CurType is REF CURSOR;
21 
22 
23 -- Largest increment to wait for dequeue (in seconds)
24 TIMEOUT_INCREMENT constant number := 5;
25 
26 
27 DAY_PER_SEC constant number := 1 / (24 * 60 * 60); -- Days in a second
28 SEC_PER_DAY constant number := (24 * 60 * 60); -- Seconds in a day
29 
30 --------------------------------------------------------------------------------
31 
32 
33 -- =========================================================
34 -- Subscription procedures
35 -- =========================================================
36 
37 
38 --
39 -- Subscribe to the OPP AQ
40 --
41 procedure Subscribe(subscriber in varchar2) is
42 
43 pragma AUTONOMOUS_TRANSACTION;
44 
45 Begin
46     DBMS_AQADM.ADD_SUBSCRIBER(queue_name =>Q_Schema || '.' || QUEUE_NAME,
47                               subscriber => sys.aq$_agent(OPPPREFIX || subscriber, NULL, NULL));
48 
49 
50   commit;
51 
52 Exception
53   when others then
54      rollback;
55 	 raise;
56 
57 end;
58 
59 
60 
61 --
62 -- Subscribe to the OPP AQ using a particular group
63 --
64 -- Subscribers will only receive messages targeted to this group,
65 -- i.e. where payload.message_group matches the subscriber's group
66 --
67 -- The OPP service will subscribe using the node name (or APPL_TOP name)
68 -- as its group id.
69 --
70 procedure Subscribe_to_group(subscriber in varchar2, groupid in varchar2) is
71 
72 pragma AUTONOMOUS_TRANSACTION;
73 
74 Begin
75     DBMS_AQADM.ADD_SUBSCRIBER(queue_name =>Q_Schema || '.' || QUEUE_NAME,
76                               subscriber => sys.aq$_agent(OPPPREFIX || subscriber, NULL, NULL),
77 							  rule       => 'tab.user_data.message_group = ''' || groupid || '''');
78 
79 
80   commit;
81 
82 Exception
83   when others then
84      rollback;
85 	 raise;
86 
87 end;
88 
89 
90 
91 --
92 -- Unsubscribe a single subscriber from the OPP AQ
93 --
94 procedure Unsubscribe(subscriber in varchar2) is
95 
96 pragma AUTONOMOUS_TRANSACTION;
97 
98 Begin
99     DBMS_AQADM.REMOVE_SUBSCRIBER(queue_name =>Q_Schema || '.' || QUEUE_NAME,
100                                  subscriber => sys.aq$_agent(OPPPREFIX || subscriber, NULL, NULL));
101 
102 
103   commit;
104 
105 Exception
106   when others then
107      rollback;
108 	 raise;
109 end;
110 
111 
112 
113 --
114 -- Return a count of how many subscribers are currently subscribed to the AQ
115 -- for a particular group.
116 --
117 function check_group_subscribers(groupid  in varchar2) return number is
118 
119 cnt   number := 0;
120 stmt  varchar2(256);
121 
122 begin
123 
124     -- For some reason, a select from aq$FND_CP_GSM_OPP_AQTBL_R is not working...
125 
126     stmt := 'select count(*) from ' ||
127 	        Q_Schema || '.aq$' || QUEUE_NAME || 'TBL_S qtab, ' ||
128 	        'fnd_concurrent_processes fcp ' ||
129 	        'where qtab.queue = ''' || QUEUE_NAME ||
130 	        ''' and fcp.node_name = :1 ' ||
131 	        ' and fcp.process_status_code in (''A'',''Z'') ' ||
132 	        ' and qtab.name = ''' || OPPPREFIX || ''' || fcp.concurrent_process_id';
133 
134 	execute immediate stmt into cnt using groupid;
135 	return cnt;
136 
137 end;
138 
139 
140 
141 --
142 -- Select a random OPP AQ subscriber out of all the current subscribers.
143 -- Returns the subscriber name.
144 --
145 function select_random_subscriber return varchar2 is
146 
147 stmt         varchar2(512);
148 subscriber   varchar2(30);
149 
150 begin
151 
152 
153   stmt := 'select * ' ||
154           'from ' ||
155           '( ' ||
156           'select name from ' || Q_Schema || '.aq$' || QUEUE_NAME || 'TBL_S ' ||
157           'ORDER BY DBMS_RANDOM.VALUE ' ||
158           ') where rownum = 1';
159 
160   execute immediate stmt into subscriber;
161 
162   if instr(subscriber, OPPPREFIX, 1, 1) = 1 then
163           subscriber := substr(subscriber, length(OPPPREFIX) + 1);
164   end if;
165 
166   return subscriber;
167 
168 exception
169   when no_data_found then
170     return null;
171 
172 end;
173 
174 
175 
176 --
177 -- Remove all subscribers of the OPP AQ
178 --
179 procedure remove_all_subscribers is
180 
181   c1       CurType;
182   subname  varchar2(30);
183 
184 begin
185 
186 	open c1 for
187 	  'select name from ' || Q_Schema || '.aq$' || QUEUE_NAME || 'TBL_S ' || ' where QUEUE = ''' || QUEUE_NAME || '''';
188 	loop
189 	    fetch c1 into subname;
190 		exit when c1%NOTFOUND;
191 
192         DBMS_AQADM.REMOVE_SUBSCRIBER(queue_name => Q_Schema || '.' || QUEUE_NAME,
193 	                                 subscriber => sys.aq$_agent(subname, NULL, NULL));
194 	end loop;
195     close c1;
196 
197 end;
198 
199 
200 
201 --
202 -- Return a list of all subscribers
203 --
204 function list_subscribers return subscriber_list is
205 
206   c1         CurType;
207   sublist    subscriber_list := subscriber_list();
208 
209 begin
210 
211     open c1 for
212 	    'select name from ' || Q_Schema || '.aq$' || QUEUE_NAME || 'TBL_S' || ' where QUEUE = ''' || QUEUE_NAME || '''';
213 
214 
215 	fetch c1 bulk collect into sublist;
216     close c1;
217 	return sublist;
218 
219 end;
220 
221 
222 
223 
224 -- =========================================================
225 -- Message sending procedures
226 -- =========================================================
227 
228 
229 
230 --
231 -- send_message_private
232 -- All messages are enqueued using this private procedure
233 --
234 -- INPUT:
235 --   recipients   - List of recipients. If null, published to the entire queue.
236 --   groupid      - Group to send to. Pass NULL if sending to a specific recipient
237 --   sender       - Sender's name
238 --   type         - Message type
239 --   message      - Message contents
240 --   Parameters   - Message payload
241 --
242 procedure send_message_private (recipients  in subscriber_list,
243                                 groupid     in varchar2,
244                                 sender      in Varchar2,
245                                 type        in Number,
246                                 message     in Varchar2,
247                                 Parameters  in Varchar2,
248 							    correlation in Varchar2 default null) is
249 
250  enq_opts	DBMS_AQ.enqueue_options_t;
251  msg_props	DBMS_AQ.message_properties_t;
252  msg_id		raw(16);
253  msg		system.FND_CP_GSM_OPP_AQ_PAYLOAD;
254 
255 
256  pragma AUTONOMOUS_TRANSACTION;
257 
258  begin
259      msg := system.FND_CP_GSM_OPP_AQ_PAYLOAD(groupid, type, message, Parameters);
260 
261      enq_opts.visibility := DBMS_AQ.ON_COMMIT;
262      enq_opts.sequence_deviation := NULL;
263      msg_props.delay := DBMS_AQ.NO_DELAY;
264      msg_props.expiration := 365 * 24 * 3600;	 -- One Year
265 
266 	 msg_props.sender_id := sys.aq$_agent(OPPPREFIX || sender, NULL, NULL);
267 
268 	 if correlation is not null then
269 		msg_props.correlation := correlation;
270 	 end if;
271 
272 	 if recipients is not null then
273 	   for i in 1 .. recipients.COUNT
274 	   loop
275 	     msg_props.recipient_list(i) := sys.aq$_agent(OPPPREFIX || recipients(i), NULL, NULL);
276 	   end loop;
277 	 end if;
278 
279      DBMS_AQ.Enqueue( queue_name 	     => Q_Schema || '.' || QUEUE_NAME,
280  			          enqueue_options    => enq_opts,
281  			          message_properties => msg_props,
282  			          Payload 	         => msg,
283  			          msgid	 	         => msg_id);
284 
285      commit;
286 
287  exception
288      when OTHERS then
289         rollback;
290  		raise;
291 
292 
293 end;
294 
295 
296 
297 
298 
299 --
300 -- Generic send message procedure
301 -- Send a message of any type to one or more recipients
302 --
303 procedure send_message (recipients in subscriber_list,
304                         sender     in Varchar2,
305                         type       in Number,
306                         message    in Varchar2,
307                         Parameters in Varchar2) is
308 
309 begin
310 
311 	if recipients is null then
312 	  return;
313 	end if;
314 
315     send_message_private(recipients, null, sender, type, message, parameters);
316 
317 end;
318 
319 
320 
321 
322 --
323 -- Send a message of any type to a specific process
324 --
325 procedure send_targeted_message (recipient   in Varchar2,
326                                  sender      in Varchar2,
327                                  type        in Number,
328                                  message     in Varchar2,
329                                  Parameters  in Varchar2,
330 								 correlation in Varchar2 default null) is
331 
332    rlist    subscriber_list;
333 begin
334 
335   if recipient is null then
336 	  return;
337   end if;
338 
339   rlist := subscriber_list(recipient);
340   send_message_private(rlist, null, sender, type, message, parameters, correlation);
341 
342 
343 end;
344 
345 
346 
347 
348 --
349 -- Send a message to a group to post-process a request
350 --
351 procedure send_request (groupid       in Varchar2,
352                         sender        in Varchar2,
353                         request_id    in number,
354                         Parameters    in Varchar2) is
355 
356   cnt   number;
357 begin
358 
359 
360     if groupid is null then
361 	    return;
362 	end if;
363 
364 
365 	send_message_private(null, groupid, sender, REQUEST_TYPE, to_char(request_id), parameters);
366 
367 end;
368 
369 
370 
371 --
372 -- Send a message to a specific process to post-process a request
373 --
374 procedure send_targeted_request ( recipient  in Varchar2,
375                                   sender     in Varchar2,
376                                   request_id in number,
377                                   parameters in Varchar2) is
378 
379 begin
380     if recipient is null then
381 	  return;
382 	end if;
383 
384     send_targeted_message(recipient, sender, REQUEST_TYPE, to_char(request_id), parameters);
385 
386 end;
387 
388 
389 
390 
391 --
392 -- Send an OPP command to a specific process
393 --
394 procedure send_command ( recipient  in Varchar2,
395                          sender     in Varchar2,
396                          command    in Varchar2,
397                          parameters in Varchar2) is
398 
399 begin
400 
401     if recipient is null then
402 	  return;
403 	end if;
404 
405     send_targeted_message(recipient, sender, COMMAND_TYPE, command, parameters);
406 
407 end;
408 
409 
410 
411 
412 
413 
414 
415 
416 
417 -- =========================================================
418 -- Receiving messages
419 -- =========================================================
420 
421 
422 --
423 -- Dequeue a message from the OPP AQ
424 --
425 -- INPUT:
426 --   Handle               - Used as the consumer name
427 --   Message_Wait_Timeout - Timeout in seconds
428 --
429 -- OUTPUT:
430 --   Success_Flag   - Y if received message, T if timeout, N if error
431 --   Message_Type   - Type of message
432 --   Message_group  - Group message was sent to
433 --   Message        - Message contents
434 --   Parameters     - Message payload
435 --   Sender         - Sender of message
436 --
437 -- If an exception occurs, success_flag will contain 'N', and
438 -- Message will contain the error message.
439 --
440 Procedure Get_Message ( Handle               in Varchar2,
441                         Success_Flag         OUT NOCOPY  Varchar2,
442                         Message_Type         OUT NOCOPY  Number,
443                         Message_group        OUT NOCOPY  Varchar2,
444                         Message              OUT NOCOPY  Varchar2,
445                         Parameters           OUT NOCOPY  Varchar2,
446                         Sender               OUT NOCOPY  Varchar2,
447                         Message_Wait_Timeout IN          Number   default 60,
448 					    Correlation          IN          Varchar2 default null) is
449 
450 
451  payload          system.FND_CP_GSM_OPP_AQ_PAYLOAD;
452  dq_opts          DBMS_AQ.DEQUEUE_OPTIONS_T;
453  msg_props        DBMS_AQ.MESSAGE_PROPERTIES_T;
454  msgid            raw(16);
455  queue_timeout    exception;
456  time_left        number;
457  end_time         date;
458 
459  pragma exception_init(queue_timeout, -25228);
460 
461  pragma AUTONOMOUS_TRANSACTION;
462 
463  Begin
464      payload := system.FND_CP_GSM_OPP_AQ_PAYLOAD(NULL,NULL,NULL,NULL);
465 
466      dq_opts.DEQUEUE_MODE := DBMS_AQ.REMOVE;
467      dq_opts.NAVIGATION := DBMS_AQ.FIRST_MESSAGE;
468      dq_opts.VISIBILITY := DBMS_AQ.IMMEDIATE;
469      dq_opts.MSGID := NULL;
470      dq_opts.consumer_name := OPPPREFIX || Handle;
471 
472 	 if correlation is not null then
473 		dq_opts.correlation := correlation;
474 	 end if;
475 
476 
477 	 time_left := Message_Wait_Timeout;
478 	 end_time := sysdate + (Message_Wait_Timeout * DAY_PER_SEC);
479 
480 	 -- Loop until the return message arrives or the timeout expires,
481 	 -- but do not wait on any single dequeue call more than TIMEOUT_INCREMENT seconds
482 	 loop
483 		if time_left > TIMEOUT_INCREMENT then
484 		   dq_opts.WAIT := TIMEOUT_INCREMENT;
485 		else
486 		   dq_opts.WAIT := time_left;
487 		end if;
488 
489 		begin
490 
491           DBMS_AQ.DEQUEUE(QUEUE_NAME => Q_Schema ||  '.' || QUEUE_NAME,
492                           DEQUEUE_OPTIONS => dq_opts,
493                           MESSAGE_PROPERTIES => msg_props,
494                           PAYLOAD => payload,
495                           MSGID => msgid);
496 
497 		  exit;
498 
499 		exception
500 		   when queue_timeout then
501 			 if sysdate >= end_time then
502 			   Success_Flag := 'T';
503                commit;
504                return;
505 			 end if;
506 
507 			 time_left := (end_time - sysdate) * SEC_PER_DAY;
508 		end;
509 
510 	 end loop;
511 
512      message_type := payload.message_type;
513      message_group := payload.message_group;
514      message := payload.message;
515      parameters := payload.parameters;
516 
517      -- strip off any OPP prefix from the sender's name,
518      if instr(msg_props.sender_id.name, OPPPREFIX, 1, 1) = 1 then
519        sender := substr(msg_props.sender_id.name, length(OPPPREFIX) + 1);
520      else
521        sender := msg_props.sender_id.name;
522      end if;
523 
524      Success_Flag := 'Y';
525 
526      commit;
527 
528 exception
529 
530     when OTHERS then
531         Success_Flag := 'N';
532         message := substr(sqlerrm, 1, 240);
533         commit;
534 
535 end;
536 
537 
538 
539 --
540 -- Package Initialization
541 --
542 procedure initialize is
543 
544 status    varchar2(1);
545 industry  varchar2(1);
546 retval    boolean;
547 
548 begin
549 
550   retval := fnd_installation.get_app_info('FND', status, industry, Q_Schema);
551 
552 end;
553 
554 
555 
556 
557 begin
558 
559     initialize;
560 
561 END fnd_cp_opp_ipc;