DBA Data[Home] [Help]

PACKAGE BODY: APPS.WF_BES_CLEANUP

Source


1 package body wf_bes_cleanup as
2 /* $Header: WFBESCUB.pls 120.4 2005/11/07 06:33:28 nravindr noship $ */
3 
-- Maximum time, in days, that rows are kept in wf_bes_subscriber_pings.
-- cleanup_subscribers deletes rows whose ping_time is older than this.

G_MAX_RETENTION_TIME constant number := 30;  -- 30 days

-- Minimum time, in days, that must elapse since the last ping before
-- cleanup_subscribers does any work again. The same interval is the grace
-- period after which a subscriber still in STATUS_PINGED is treated as dead.

G_MIN_WAIT_TIME constant number := 30/(60*24);  -- 30 min, expressed in days

-- status to indicate that the subscriber was pinged

STATUS_PINGED constant varchar2(30) := 'PINGED';

-- status to indicate that a subscriber responded to a ping

STATUS_RESPONDED constant varchar2(30) := 'RESPONDED';

-- status to indicate that a subscriber was removed

STATUS_REMOVED constant varchar2(30) := 'REMOVED';

-- status to indicate that an attempt to remove a subscriber failed

STATUS_REMOVE_FAILED constant varchar2(30) := 'REMOVE_FAILED';

-- Return codes. They are declared as varchar2 because they are assigned to
-- varchar2 out parameters (retcode, p_status), so they are initialised from
-- string literals rather than relying on implicit number-to-string conversion.

-- return code to indicate success

RETURN_SUCCESS constant varchar2(30) := '0';

-- return code to indicate warning

RETURN_WARNING constant varchar2(30) := '1';

-- return code to indicate error

RETURN_ERROR constant varchar2(30) := '2';
40 
41 --------------------------------------------------------------------------------
-- Records that a subscriber responded to a ping.
--
-- Sets the matching wf_bes_subscriber_pings row(s) to STATUS_RESPONDED and
-- stamps action_time with the current time. The procedure runs as an
-- autonomous transaction, so its commit/rollback is independent of the
-- caller's transaction.
--
-- The call is best effort: on any error the work is rolled back and the
-- failure is logged; nothing is raised to the caller.
--
-- p_ping_number - the ping number being acknowledged
-- p_queue_name - the queue name
-- p_subscriber_name - the subscriber name
procedure acknowledge_ping(p_ping_number     in number,
                           p_queue_name      in varchar2,
                           p_subscriber_name in varchar2)
is
PRAGMA AUTONOMOUS_TRANSACTION;
   l_error varchar2(4000);
begin
   update wf_bes_subscriber_pings
   set status = STATUS_RESPONDED,
       action_time = sysdate
   where ping_number = p_ping_number
   and queue_name = p_queue_name
   and subscriber_name = p_subscriber_name;

   commit;
exception
   when others then
      -- capture the cause before doing anything else, then end the
      -- autonomous transaction
      l_error := sqlerrm;
      rollback;

      -- Log the failure instead of dropping it silently. An unacknowledged
      -- ping leaves the row in STATUS_PINGED, which cleanup_subscribers
      -- later treats as a dead subscriber, so the cause is worth recording.
      -- Logging must not break the best-effort contract, hence the guard.
      begin
         if (wf_log_pkg.level_unexpected >= fnd_log.g_current_runtime_level) then
            wf_log_pkg.string(wf_log_pkg.LEVEL_UNEXPECTED,
                             'wf.plsql.wf_bes_cleanup.acknowledge_ping.error',
                             'Failed to acknowledge ping ' || to_char(p_ping_number) ||
                             ' for ' || p_queue_name || '.' || p_subscriber_name ||
                             ': ' || l_error);
         end if;
      exception
         when others then
            null;  -- deliberately ignored: logging is itself best effort
      end;
end acknowledge_ping;
60 
--------------------------------------------------------------------------------
-- Records a ping for a single subscriber.
--
-- Inserts one row into wf_bes_subscriber_pings with status STATUS_PINGED.
-- The row's action_time starts out equal to the ping time. No commit is
-- issued here; the caller controls the transaction.
--
-- p_ping_number - the ping number
-- p_ping_time - the ping time
-- p_queue_name - the queue name
-- p_subscriber_name - the subscriber_name
--------------------------------------------------------------------------------
procedure ping_subscriber(p_ping_number     in number,
                          p_ping_time       in date,
                          p_queue_name      in varchar2,
                          p_subscriber_name in varchar2)
is
begin
   insert into wf_bes_subscriber_pings
      (ping_number, ping_time, queue_name, subscriber_name, status, action_time)
   values
      (p_ping_number, p_ping_time, p_queue_name, p_subscriber_name, STATUS_PINGED, p_ping_time);
end ping_subscriber;
94 --------------------------------------------------------------------------------
-- Removes at most one message from a JMS text-message queue without waiting.
--
-- The dequeue uses NO_WAIT and the REMOVE_NODATA dequeue mode. ORA-25228 is
-- treated as "no message available". ORA-01555 (snapshot too old) is retried
-- once from the first message, but only when the caller was navigating with
-- NEXT_MESSAGE; otherwise it is re-raised.
--
-- p_queue_name - the queue to dequeue from
-- p_consumer_name - the consumer on whose behalf to dequeue
-- p_navigation - dbms_aq navigation mode for this dequeue
-- p_correlation - correlation identifier used to select messages
-- x_have_msg - true if a message was removed, false if none was available
procedure dequeue_jms_queue(p_queue_name in VARCHAR2,
                            p_consumer_name in VARCHAR2,
                            p_navigation    in binary_integer,
                            p_correlation   in VARCHAR2,
                            x_have_msg      out nocopy boolean)
is
    l_options           dbms_aq.dequeue_options_t;
    l_properties        dbms_aq.message_properties_t;
    l_payload           SYS.AQ$_JMS_TEXT_MESSAGE;
    l_msgid             RAW(16);
    e_queue_empty       exception;
    pragma exception_init(e_queue_empty, -25228);
    e_snapshot_too_old  exception;
    pragma exception_init(e_snapshot_too_old, -1555);

    -- Issues one dequeue with the current contents of l_options; any error
    -- propagates to the handlers of the calling block.
    procedure remove_next is
    begin
        dbms_aq.dequeue(queue_name         => p_queue_name,
                        dequeue_options    => l_options,
                        message_properties => l_properties,
                        payload            => l_payload,
                        msgid              => l_msgid);
    end remove_next;
begin
    -- assume success; the handlers below flip this when the queue is empty
    x_have_msg := true;

    l_options.navigation    := p_navigation;
    l_options.correlation   := p_correlation;
    l_options.consumer_name := p_consumer_name;
    l_options.dequeue_mode  := dbms_aq.remove_nodata;
    l_options.wait          := dbms_aq.NO_WAIT;

    remove_next;
exception
    when e_queue_empty then
        x_have_msg := false;
    when e_snapshot_too_old then
        if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
            -- restart the scan from the head of the queue and try once more
            l_options.navigation := dbms_aq.first_message;
            begin
                remove_next;
            exception
                when e_queue_empty then
                    x_have_msg := false;
            end;
        else
            raise;
        end if;
end dequeue_jms_queue;
142 --------------------------------------------------------------------------------
-- Removes at most one message from a WF_EVENT_T queue without waiting.
--
-- The dequeue uses NO_WAIT and the REMOVE_NODATA dequeue mode. ORA-25228 is
-- treated as "no message available". ORA-01555 (snapshot too old) is retried
-- once from the first message, but only when the caller was navigating with
-- NEXT_MESSAGE; otherwise it is re-raised.
--
-- p_queue_name - the queue to dequeue from
-- p_consumer_name - the consumer on whose behalf to dequeue
-- p_navigation - dbms_aq navigation mode for this dequeue
-- p_correlation - correlation identifier used to select messages
-- x_have_msg - true if a message was removed, false if none was available
procedure dequeue_evt_queue(p_queue_name in VARCHAR2,
                            p_consumer_name in VARCHAR2,
                            p_navigation    in binary_integer,
                            p_correlation   in VARCHAR2,
                            x_have_msg      out nocopy boolean)
is
    l_options           dbms_aq.dequeue_options_t;
    l_properties        dbms_aq.message_properties_t;
    l_payload           WF_EVENT_T;
    l_msgid             RAW(16);
    e_queue_empty       exception;
    pragma exception_init(e_queue_empty, -25228);
    e_snapshot_too_old  exception;
    pragma exception_init(e_snapshot_too_old, -1555);

    -- Issues one dequeue with the current contents of l_options; any error
    -- propagates to the handlers of the calling block.
    procedure remove_next is
    begin
        dbms_aq.dequeue(queue_name         => p_queue_name,
                        dequeue_options    => l_options,
                        message_properties => l_properties,
                        payload            => l_payload,
                        msgid              => l_msgid);
    end remove_next;
begin
    -- assume success; the handlers below flip this when the queue is empty
    x_have_msg := true;

    l_options.navigation    := p_navigation;
    l_options.correlation   := p_correlation;
    l_options.consumer_name := p_consumer_name;
    l_options.dequeue_mode  := dbms_aq.remove_nodata;
    l_options.wait          := dbms_aq.NO_WAIT;

    remove_next;
exception
    when e_queue_empty then
        x_have_msg := false;
    when e_snapshot_too_old then
        if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
            -- restart the scan from the head of the queue and try once more
            l_options.navigation := dbms_aq.first_message;
            begin
                remove_next;
            exception
                when e_queue_empty then
                    x_have_msg := false;
            end;
        else
            raise;
        end if;
end dequeue_evt_queue;
190 --------------------------------------------------------------------------------
-- Drains a JMS text-message queue by calling dequeue_jms_queue until it
-- reports that no message is left.
--
-- A commit is issued each time the iteration counter reaches
-- p_commit_frequency, and once more after the loop. The counter also counts
-- the final, empty dequeue attempt.
--
-- p_queue_name - the queue to purge
-- p_consumer_name - the consumer whose messages are purged (default null)
-- p_correlation - correlation identifier used to select messages (default null)
-- p_commit_frequency - number of dequeue attempts between commits (default 100)
procedure purge_jms_queue(p_queue_name in VARCHAR2,
                          p_consumer_name in VARCHAR2 default null,
                          p_correlation in VARCHAR2 default null,
                          p_commit_frequency in NUMBER default 100)

is
    l_more_messages  boolean;
    l_since_commit   NUMBER := 0;
    l_navigation     binary_integer := dbms_aq.first_message;
begin
    loop
        dequeue_jms_queue(p_queue_name    => p_queue_name,
                          p_consumer_name => p_consumer_name,
                          p_navigation    => l_navigation,
                          p_correlation   => p_correlation,
                          x_have_msg      => l_more_messages);

        l_since_commit := l_since_commit + 1;
        if (l_since_commit >= p_commit_frequency) then
            commit;
            l_since_commit := 0;
        end if;

        -- only the very first dequeue starts from the head of the queue
        l_navigation := dbms_aq.next_message;

        exit when not l_more_messages;
    end loop;

    commit;
end purge_jms_queue;
216 --------------------------------------------------------------------------------
-- Drains a WF_EVENT_T queue by calling dequeue_evt_queue until it reports
-- that no message is left.
--
-- A commit is issued each time the iteration counter reaches
-- p_commit_frequency, and once more after the loop. The counter also counts
-- the final, empty dequeue attempt.
--
-- p_queue_name - the queue to purge
-- p_consumer_name - the consumer whose messages are purged (default null)
-- p_correlation - correlation identifier used to select messages (default null)
-- p_commit_frequency - number of dequeue attempts between commits (default 100)
procedure purge_evt_queue(p_queue_name in VARCHAR2,
                          p_consumer_name in VARCHAR2 default null,
                          p_correlation in VARCHAR2 default null,
                          p_commit_frequency in NUMBER default 100)

is
    l_more_messages  boolean;
    l_since_commit   NUMBER := 0;
    l_navigation     binary_integer := dbms_aq.first_message;
begin
    loop
        dequeue_evt_queue(p_queue_name    => p_queue_name,
                          p_consumer_name => p_consumer_name,
                          p_navigation    => l_navigation,
                          p_correlation   => p_correlation,
                          x_have_msg      => l_more_messages);

        l_since_commit := l_since_commit + 1;
        if (l_since_commit >= p_commit_frequency) then
            commit;
            l_since_commit := 0;
        end if;

        -- only the very first dequeue starts from the head of the queue
        l_navigation := dbms_aq.next_message;

        exit when not l_more_messages;
    end loop;

    commit;
end purge_evt_queue;
---------------------------------------------------------------------------------
-- Removes a subscriber from a queue.
--
-- Steps, in order:
--   1. purge the subscriber's messages from the queue (purge_jms_queue,
--      which commits as it goes)
--   2. remove the subscriber with dbms_aqadm.remove_subscriber
--   3. mark the subscriber's STATUS_PINGED rows in wf_bes_subscriber_pings
--      as STATUS_REMOVED and commit
--
-- Errors are not raised to the caller. On any failure the STATUS_PINGED rows
-- are marked STATUS_REMOVE_FAILED, the failure (including the Oracle error
-- text) is logged, the change is committed and RETURN_ERROR is returned.
--
-- p_owner - the owner (schema) of the queue
-- p_queue_name - the queue name (unqualified)
-- p_subscriber_name - the subscriber_name
-- p_status - the return status: RETURN_SUCCESS or RETURN_ERROR
--------------------------------------------------------------------------------
procedure remove_subscriber(p_owner           in varchar2,
                            p_queue_name      in varchar2,
                            p_subscriber_name in varchar2,
                            p_status          out nocopy varchar2)
is
   l_error varchar2(4000);
begin

   -- Purge the queue for this subscriber.
   purge_jms_queue(p_queue_name => p_owner || '.' || p_queue_name,
                   p_consumer_name => p_subscriber_name);
   -- remove the subscriber from the queue

   dbms_aqadm.remove_subscriber(
      queue_name => p_owner || '.' || p_queue_name,
      subscriber => sys.aq$_agent(p_subscriber_name, null, null));

   -- mark the subscriber removed

   update wf_bes_subscriber_pings
   set status = STATUS_REMOVED,
       action_time = sysdate
   where queue_name = p_queue_name
   and subscriber_name = p_subscriber_name
   and status = STATUS_PINGED;

   if (wf_log_pkg.level_procedure >= fnd_log.g_current_runtime_level) then
      wf_log_pkg.string(wf_log_pkg.LEVEL_PROCEDURE,
                       'wf.plsql.wf_bes_cleanup.remove_subscriber.done',
                       'Removed subscriber '||p_owner ||'.'||p_queue_name||'.'||
                       p_subscriber_name);
   end if;
   commit;
   p_status := RETURN_SUCCESS;
exception
   when others then
      -- the attempt to remove the subscriber failed

      -- remember why, so the log entry below is actionable
      l_error := sqlerrm;

      update wf_bes_subscriber_pings
      set status = STATUS_REMOVE_FAILED,
          action_time = sysdate
      where queue_name = p_queue_name
      and subscriber_name = p_subscriber_name
      and status = STATUS_PINGED;

      if (wf_log_pkg.level_unexpected >= fnd_log.g_current_runtime_level) then
         wf_log_pkg.string(wf_log_pkg.LEVEL_UNEXPECTED,
                          'wf.plsql.wf_bes_cleanup.remove_subscriber.error',
                          'Failed to remove subscriber ' || p_owner || '.' || p_queue_name || '.' ||
                           p_subscriber_name || ': ' || l_error);
      end if;
      commit;
      p_status := RETURN_ERROR;
end remove_subscriber;
303 --------------------------------------------------------------------------------
-- Looks up the queue behind a local agent and returns its details.
--
-- The agent's queue_name is read from wf_agents for the local system. It may
-- be qualified (<schema>.<queue>) or unqualified; in the unqualified case
-- the owner is taken from wf_event.schema_name. The queue table is then read
-- from all_queues.
--
-- p_agent_name - the agent name to look up in wf_agents
-- x_queue_table - the queue table of the agent's queue
-- x_exception_queue - the default exception queue name, built as
--                     <owner>.AQ$_<queue table>_E
-- x_owner - the owner (schema) of the queue
-- x_queue_name - the unqualified queue name
--
-- Raises no_data_found when the agent is not found, when its queue_name is
-- null, or when the queue is not found in all_queues.
PROCEDURE GetQueueDetails(p_agent_name in varchar2,
                            x_queue_table OUT NOCOPY VARCHAR2,
                            x_exception_queue OUT NOCOPY VARCHAR2,
                            x_owner       OUT NOCOPY VARCHAR2,
                            x_queue_name  OUT NOCOPY VARCHAR2)
is
    l_queue_name     varchar2(80);
    l_pos            number := 0;
    l_name           varchar2(30) := null;
    l_owner          varchar2(30) := null;
    l_queue_table    VARCHAR2(30) := null;
    l_exception_queue VARCHAR2(80) := null;
begin

    select queue_name
    INTO   l_queue_name
    from   wf_agents
    where  name = p_agent_name
    and    system_guid = WF_EVENT.local_system_guid;

    if(l_queue_name is not null) then
      -- derive the queue name and the schema
      -- The position of the '.' must be computed BEFORE it is tested; with
      -- no '.', l_pos is 0 and l_name is the whole of l_queue_name.
      l_pos := instr(l_queue_name,'.');
      l_name := substr(l_queue_name, l_pos + 1);

      if (l_pos > 0) then
        l_owner := substr(l_queue_name, 1, l_pos - 1);
      else
        -- if queue_name does not contain schema we will look in WF_SCHEMA
        l_owner := wf_event.schema_name;
      end if;

      SELECT queue_table
      into l_queue_table
      from all_queues
      where owner = l_owner
      and   name  = l_name;

      -- If default exception queue table, should be l_owner || '.AQ$_' || l_queue_table || '_E'
      -- We can't select the queue name given the queue name and queue_type = 'EXCEPTION_QUEUE',
      -- because it can have multiple result. Shall we have a column in WF_AGENTS for exception queue?
      l_exception_queue := l_owner || '.AQ$_' || l_queue_table || '_E';
      x_queue_table := l_queue_table;
      x_exception_queue := l_exception_queue;
      x_owner := l_owner;
      x_queue_name  := l_name;
    else
        raise no_data_found;
    end if;
end GetQueueDetails;
355 
356 --------------------------------------------------------------------------------
-- Removes dead subscribers of the WF_CONTROL queue and pings the live ones.
--
-- Nothing is done (other than returning RETURN_SUCCESS) unless more than
-- G_MIN_WAIT_TIME has elapsed since the most recent ping recorded in
-- wf_bes_subscriber_pings. Otherwise, in order:
--   1. every dead subscriber is removed via remove_subscriber
--   2. the exception queue <owner>.AQ$_WF_CONTROL_E is purged
--   3. each current subscriber of the WF_CONTROL queue gets a new ping row
--      and the event oracle.apps.wf.bes.control.ping is raised
--   4. ping rows older than G_MAX_RETENTION_TIME are deleted, then commit
--
-- errbuf - error/warning text; set only for the error and warning outcomes
-- retcode - RETURN_SUCCESS, RETURN_WARNING (at least one dead subscriber
--           could not be removed) or RETURN_ERROR (WF_CONTROL agent missing)
procedure cleanup_subscribers(errbuf  out nocopy varchar2,
                              retcode out nocopy varchar2)
is
   -- the dead subscribers

   -- A subscriber is dead if
   -- 1. more than G_MIN_WAIT_TIME has elapsed since it was pinged and
   -- 2. its status is still STATUS_PINGED

   -- If the subscriber were alive, it would have responded and its status
   -- would be STATUS_RESPONDED.

   cursor dead_subscribers is
      select distinct queue_name, subscriber_name
      from wf_bes_subscriber_pings
      where ping_time < sysdate - G_MIN_WAIT_TIME
      and status = STATUS_PINGED;

   l_owner varchar2(30);
   l_queue_name varchar2(30);
   l_last_ping_time date;
   l_ping_time date;
   l_ping_number number;
   l_remove_failed_count number;
   l_remove_status varchar2(30);
   l_subscribers dbms_aqadm.aq$_subscriber_list_t;
   i integer;
begin
   -- get the last ping time

   select max(ping_time)
   into l_last_ping_time
   from wf_bes_subscriber_pings;

   if(l_last_ping_time is null) then
      -- wf_bes_subscriber_pings table is empty so set the last ping time to be very old

      l_last_ping_time := to_date('1900/01/01', 'YYYY/MM/DD');
   end if;

   -- check the minimum wait time

   if(l_last_ping_time < sysdate - G_MIN_WAIT_TIME) then
      -- the minimum wait time has elapsed so perform cleanup processing

      -- get the owner and queue name of the WF_CONTROL agent

      declare
         l_qualified_queue_name wf_agents.queue_name%type;
         j integer;
      begin
         select queue_name
         into l_qualified_queue_name
         from wf_agents
         where name = 'WF_CONTROL'
         and system_guid = hextoraw(wf_core.translate('WF_SYSTEM_GUID'));

         -- l_qualified_queue_name is of the form <schema>.<queue name>

         -- parse it into owner (schema) and queue name

         j := instr(l_qualified_queue_name, '.');

         l_owner := substr(l_qualified_queue_name, 1, j - 1);
         l_queue_name := substr(l_qualified_queue_name, j + 1);
      exception
         when no_data_found then
            -- WF_CONTROL agent does not exist

            errbuf := 'WF_CONTROL agent not found';

            retcode := RETURN_ERROR;

            return;
      end;


      -- remove the dead subscribers

      l_remove_failed_count := 0;

      for dead_subscriber in dead_subscribers loop
         remove_subscriber(p_owner           => l_owner,
                           p_queue_name      => dead_subscriber.queue_name,
                           p_subscriber_name => dead_subscriber.subscriber_name,
                           p_status          => l_remove_status);

         if(l_remove_status = RETURN_ERROR) then
            l_remove_failed_count := l_remove_failed_count + 1;
         end if;
      end loop;

      -- After remove the subscribers, remove all the messages that got moved to exception queue
      purge_jms_queue(p_queue_name => l_owner || '.AQ$_WF_CONTROL_E');
      -- ping the current subscribers
      l_subscribers := dbms_aqadm.queue_subscribers(l_owner || '.' || l_queue_name);

      if(l_subscribers.count() > 0) then
         -- a subscriber exists

         -- get the next ping number

         select wf_bes_ping_number_s.nextval
         into l_ping_number
         from dual;

         -- get the ping time (current time)

         l_ping_time := sysdate;

         -- ping the current subscribers

         i := l_subscribers.first;

         while i is not null loop
            ping_subscriber(p_ping_number     => l_ping_number,
                            p_ping_time       => l_ping_time,
                            p_queue_name      => l_queue_name,
                            p_subscriber_name => l_subscribers(i).name);

            i := l_subscribers.next(i);
         end loop;

         -- raise the ping event

         wf_event.raise(p_event_name => 'oracle.apps.wf.bes.control.ping',
                        p_event_key  => l_ping_number);
      end if;

      -- remove the data older than G_MAX_RETENTION_TIME

      delete
      from wf_bes_subscriber_pings
      where ping_time < sysdate - G_MAX_RETENTION_TIME;

      commit;

      if(l_remove_failed_count > 0) then
         -- at least one dead subscriber could not be removed

         errbuf := 'Failed to remove ' || l_remove_failed_count || ' dead subscriber(s).';

         retcode := RETURN_WARNING;
      else
         -- normal completion

         retcode := RETURN_SUCCESS;
      end if;
   else
      -- the minimum wait time has not yet elapsed so just return

      retcode := RETURN_SUCCESS;
   end if;
end cleanup_subscribers;
511 
512 end wf_bes_cleanup;