1 package body wf_bes_cleanup as
2 /* $Header: WFBESCUB.pls 120.4 2005/11/07 06:33:28 nravindr noship $ */
3
-- Maximum amount of time (in days) that ping records are retained in
-- wf_bes_subscriber_pings before cleanup_subscribers deletes them.

G_MAX_RETENTION_TIME constant number := 30; -- 30 days

-- Minimum amount of time (in days) which must elapse since the last ping
-- before cleanup_subscribers will do any work again. A subscriber whose ping
-- is older than this and still unanswered is treated as dead.

G_MIN_WAIT_TIME constant number := 30/(60*24); -- 30 min

-- Values stored in wf_bes_subscriber_pings.status -----------------------------

-- the subscriber was pinged and has not (yet) responded

STATUS_PINGED constant varchar2(30) := 'PINGED';

-- the subscriber responded to a ping

STATUS_RESPONDED constant varchar2(30) := 'RESPONDED';

-- the subscriber was removed from the queue

STATUS_REMOVED constant varchar2(30) := 'REMOVED';

-- an attempt to remove the subscriber failed

STATUS_REMOVE_FAILED constant varchar2(30) := 'REMOVE_FAILED';

-- Return codes ----------------------------------------------------------------
-- These are varchar2 because they are assigned to varchar2 OUT parameters.
-- They are initialised from string literals so that no implicit
-- number-to-varchar2 conversion is involved; the resulting values are the
-- same as the numeric literals 0, 1 and 2 would produce.

-- return code to indicate success

RETURN_SUCCESS constant varchar2(30) := '0';

-- return code to indicate warning

RETURN_WARNING constant varchar2(30) := '1';

-- return code to indicate error

RETURN_ERROR constant varchar2(30) := '2';
40
41 --------------------------------------------------------------------------------
-- Records that a subscriber answered a ping.
--
-- p_ping_number     - the ping number being acknowledged
-- p_queue_name      - the queue name
-- p_subscriber_name - the subscriber name
--
-- The matching row in wf_bes_subscriber_pings is set to STATUS_RESPONDED and
-- its action_time is set to the current time. No error is ever propagated to
-- the caller.
procedure acknowledge_ping(p_ping_number     in number,
                           p_queue_name      in varchar2,
                           p_subscriber_name in varchar2)
is
  -- The commit / rollback below apply to this procedure's own transaction
  -- only, not to the caller's.
  pragma autonomous_transaction;
begin
  update wf_bes_subscriber_pings pings
     set pings.action_time = sysdate,
         pings.status      = STATUS_RESPONDED
   where pings.subscriber_name = p_subscriber_name
     and pings.queue_name      = p_queue_name
     and pings.ping_number     = p_ping_number;

  commit;
exception
  when others then
    -- Any failure is discarded after undoing the update.
    -- NOTE(review): the error is swallowed without being logged; this looks
    -- like deliberate best-effort behaviour -- confirm.
    rollback;
end acknowledge_ping;
60
61 --------------------------------------------------------------------------------
-- Records a ping for a subscriber.
--
-- Inserts one row into wf_bes_subscriber_pings with status STATUS_PINGED.
-- The row's action_time starts out equal to its ping_time. This procedure
-- does not commit; that is left to the caller.
--
-- p_ping_number     - the ping number
-- p_ping_time       - the ping time
-- p_queue_name      - the queue name
-- p_subscriber_name - the subscriber name
--------------------------------------------------------------------------------
procedure ping_subscriber(p_ping_number     in number,
                          p_ping_time       in date,
                          p_queue_name      in varchar2,
                          p_subscriber_name in varchar2)
is
begin
  insert into wf_bes_subscriber_pings
    (queue_name,   subscriber_name,   ping_number,   ping_time,   status,        action_time)
  values
    (p_queue_name, p_subscriber_name, p_ping_number, p_ping_time, STATUS_PINGED, p_ping_time);
end ping_subscriber;
94 --------------------------------------------------------------------------------
-- Dequeues a single message from a queue whose payload type is
-- SYS.AQ$_JMS_TEXT_MESSAGE, using NO_WAIT and the REMOVE_NODATA dequeue mode.
--
-- p_queue_name    - the queue to dequeue from
-- p_consumer_name - the consumer (subscriber) to dequeue for
-- p_navigation    - dbms_aq navigation setting (e.g. first_message / next_message)
-- p_correlation   - correlation identifier used to select messages
-- x_have_msg      - true when the dequeue call succeeded, false when it
--                   raised ORA-25228 (treated here as "no messages")
--
-- If the dequeue raises ORA-01555 while navigating with NEXT_MESSAGE, the
-- dequeue is retried once with FIRST_MESSAGE; with any other navigation value
-- the error is re-raised. All other errors propagate to the caller.
procedure dequeue_jms_queue(p_queue_name    in VARCHAR2,
                            p_consumer_name in VARCHAR2,
                            p_navigation    in binary_integer,
                            p_correlation   in VARCHAR2,
                            x_have_msg      out nocopy boolean)
is
  l_options    dbms_aq.dequeue_options_t;
  l_properties dbms_aq.message_properties_t;
  l_message_id RAW(16);
  l_payload    SYS.AQ$_JMS_TEXT_MESSAGE;

  e_queue_empty exception;
  pragma exception_init(e_queue_empty, -25228);
  e_snapshot_too_old exception;
  pragma exception_init(e_snapshot_too_old, -1555);

  -- Issues the dequeue with whatever is currently set in l_options.
  procedure dequeue_one
  is
  begin
    dbms_aq.dequeue(queue_name         => p_queue_name,
                    dequeue_options    => l_options,
                    message_properties => l_properties, -- out
                    payload            => l_payload,    -- out
                    msgid              => l_message_id); -- out
  end dequeue_one;
begin
  l_options.consumer_name := p_consumer_name;
  l_options.wait          := dbms_aq.NO_WAIT;
  l_options.dequeue_mode  := dbms_aq.remove_nodata;
  l_options.navigation    := p_navigation;
  l_options.correlation   := p_correlation;

  -- Assume success; the handlers below flip this when the queue is empty.
  x_have_msg := true;
  dequeue_one;
exception
  when e_queue_empty then
    x_have_msg := false;
  when e_snapshot_too_old then
    if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
      -- Start over from the first message and try exactly once more.
      begin
        l_options.navigation := dbms_aq.first_message;
        dequeue_one;
      exception
        when e_queue_empty then
          x_have_msg := false;
      end;
    else
      raise;
    end if;
end dequeue_jms_queue;
142 --------------------------------------------------------------------------------
-- Dequeues a single message from a queue whose payload type is WF_EVENT_T,
-- using NO_WAIT and the REMOVE_NODATA dequeue mode.
--
-- p_queue_name    - the queue to dequeue from
-- p_consumer_name - the consumer (subscriber) to dequeue for
-- p_navigation    - dbms_aq navigation setting (e.g. first_message / next_message)
-- p_correlation   - correlation identifier used to select messages
-- x_have_msg      - true when the dequeue call succeeded, false when it
--                   raised ORA-25228 (treated here as "no messages")
--
-- If the dequeue raises ORA-01555 while navigating with NEXT_MESSAGE, the
-- dequeue is retried once with FIRST_MESSAGE; with any other navigation value
-- the error is re-raised. All other errors propagate to the caller.
procedure dequeue_evt_queue(p_queue_name    in VARCHAR2,
                            p_consumer_name in VARCHAR2,
                            p_navigation    in binary_integer,
                            p_correlation   in VARCHAR2,
                            x_have_msg      out nocopy boolean)
is
  l_options    dbms_aq.dequeue_options_t;
  l_properties dbms_aq.message_properties_t;
  l_payload    WF_EVENT_T;
  l_message_id RAW(16);

  e_queue_empty exception;
  pragma exception_init(e_queue_empty, -25228);
  e_snapshot_too_old exception;
  pragma exception_init(e_snapshot_too_old, -1555);

  -- Issues the dequeue with whatever is currently set in l_options.
  procedure dequeue_one
  is
  begin
    dbms_aq.dequeue(queue_name         => p_queue_name,
                    dequeue_options    => l_options,
                    message_properties => l_properties, -- out
                    payload            => l_payload,    -- out
                    msgid              => l_message_id); -- out
  end dequeue_one;
begin
  l_options.consumer_name := p_consumer_name;
  l_options.wait          := dbms_aq.NO_WAIT;
  l_options.dequeue_mode  := dbms_aq.remove_nodata;
  l_options.navigation    := p_navigation;
  l_options.correlation   := p_correlation;

  -- Assume success; the handlers below flip this when the queue is empty.
  x_have_msg := true;
  dequeue_one;
exception
  when e_queue_empty then
    x_have_msg := false;
  when e_snapshot_too_old then
    if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
      -- Start over from the first message and try exactly once more.
      begin
        l_options.navigation := dbms_aq.first_message;
        dequeue_one;
      exception
        when e_queue_empty then
          x_have_msg := false;
      end;
    else
      raise;
    end if;
end dequeue_evt_queue;
190 --------------------------------------------------------------------------------
-- Empties a JMS text message queue by calling dequeue_jms_queue repeatedly
-- until it reports that no message was found.
--
-- p_queue_name       - the queue to purge
-- p_consumer_name    - the consumer to dequeue for (default null)
-- p_correlation      - correlation identifier used to select messages
--                      (default null)
-- p_commit_frequency - number of dequeue attempts between commits
--                      (default 100)
--
-- Always commits once more after the last dequeue attempt. Errors raised by
-- dequeue_jms_queue propagate to the caller.
procedure purge_jms_queue(p_queue_name       in VARCHAR2,
                          p_consumer_name    in VARCHAR2 default null,
                          p_correlation      in VARCHAR2 default null,
                          p_commit_frequency in NUMBER default 100)

is
  l_found_message boolean;
  l_since_commit  NUMBER := 0;
  l_position      binary_integer := dbms_aq.first_message;
begin
  loop
    dequeue_jms_queue(p_queue_name    => p_queue_name,
                      p_consumer_name => p_consumer_name,
                      p_navigation    => l_position,
                      p_correlation   => p_correlation,
                      x_have_msg      => l_found_message);

    -- Every attempt is counted, including the final one that finds nothing.
    l_since_commit := l_since_commit + 1;
    if (l_since_commit >= p_commit_frequency) then
      commit;
      l_since_commit := 0;
    end if;

    exit when not l_found_message;

    -- Only the first attempt uses first_message.
    l_position := dbms_aq.next_message;
  end loop;

  commit;
end purge_jms_queue;
216 --------------------------------------------------------------------------------
-- Empties a WF_EVENT_T queue by calling dequeue_evt_queue repeatedly until it
-- reports that no message was found.
--
-- p_queue_name       - the queue to purge
-- p_consumer_name    - the consumer to dequeue for (default null)
-- p_correlation      - correlation identifier used to select messages
--                      (default null)
-- p_commit_frequency - number of dequeue attempts between commits
--                      (default 100)
--
-- Always commits once more after the last dequeue attempt. Errors raised by
-- dequeue_evt_queue propagate to the caller.
procedure purge_evt_queue(p_queue_name       in VARCHAR2,
                          p_consumer_name    in VARCHAR2 default null,
                          p_correlation      in VARCHAR2 default null,
                          p_commit_frequency in NUMBER default 100)

is
  l_found_message boolean;
  l_since_commit  NUMBER := 0;
  l_position      binary_integer := dbms_aq.first_message;
begin
  loop
    dequeue_evt_queue(p_queue_name    => p_queue_name,
                      p_consumer_name => p_consumer_name,
                      p_navigation    => l_position,
                      p_correlation   => p_correlation,
                      x_have_msg      => l_found_message);

    -- Every attempt is counted, including the final one that finds nothing.
    l_since_commit := l_since_commit + 1;
    if (l_since_commit >= p_commit_frequency) then
      commit;
      l_since_commit := 0;
    end if;

    exit when not l_found_message;

    -- Only the first attempt uses first_message.
    l_position := dbms_aq.next_message;
  end loop;

  commit;
end purge_evt_queue;
242 ---------------------------------------------------------------------------------
-- Removes a subscriber from a queue.
--
-- Steps, in order:
--   1. purge the subscriber's messages from <p_owner>.<p_queue_name>
--   2. remove the subscriber from that queue via dbms_aqadm
--   3. mark the subscriber's outstanding (STATUS_PINGED) rows in
--      wf_bes_subscriber_pings as STATUS_REMOVED and commit
--
-- If any step raises, the outstanding rows are marked STATUS_REMOVE_FAILED
-- instead, the failure (including the Oracle error text) is logged, and the
-- work is committed. No exception from steps 1-3 is propagated; the outcome
-- is reported through p_status.
--
-- p_owner           - the owner (schema) of the queue
-- p_queue_name      - the queue name
-- p_subscriber_name - the subscriber_name
-- p_status          - the return status: RETURN_SUCCESS or RETURN_ERROR
--------------------------------------------------------------------------------
procedure remove_subscriber(p_owner           in varchar2,
                            p_queue_name      in varchar2,
                            p_subscriber_name in varchar2,
                            p_status          out nocopy varchar2)
is
  -- text of the error that made the removal fail, kept for the log message
  l_error_text varchar2(2000);
begin

  -- Purge the queue for this subscriber.
  purge_jms_queue(p_queue_name    => p_owner || '.' || p_queue_name,
                  p_consumer_name => p_subscriber_name);

  -- remove the subscriber from the queue

  dbms_aqadm.remove_subscriber(
    queue_name => p_owner || '.' || p_queue_name,
    subscriber => sys.aq$_agent(p_subscriber_name, null, null));

  -- mark the subscriber removed

  update wf_bes_subscriber_pings
     set status = STATUS_REMOVED,
         action_time = sysdate
   where queue_name = p_queue_name
     and subscriber_name = p_subscriber_name
     and status = STATUS_PINGED;

  if (wf_log_pkg.level_procedure >= fnd_log.g_current_runtime_level) then
    wf_log_pkg.string(wf_log_pkg.LEVEL_PROCEDURE,
                      'wf.plsql.wf_bes_cleanup.remove_subscriber.done',
                      'Removed subscriber '||p_owner ||'.'||p_queue_name||'.'||
                      p_subscriber_name);
  end if;
  commit;
  p_status := RETURN_SUCCESS;
exception
  when others then
    -- the attempt to remove the subscriber failed

    -- Capture the cause first so it can be reported; previously the reason
    -- for the failure was lost.
    l_error_text := substr(sqlerrm, 1, 2000);

    update wf_bes_subscriber_pings
       set status = STATUS_REMOVE_FAILED,
           action_time = sysdate
     where queue_name = p_queue_name
       and subscriber_name = p_subscriber_name
       and status = STATUS_PINGED;

    if (wf_log_pkg.level_unexpected >= fnd_log.g_current_runtime_level) then
      wf_log_pkg.string(wf_log_pkg.LEVEL_UNEXPECTED,
                        'wf.plsql.wf_bes_cleanup.remove_subscriber.error',
                        'Failed to remove subscriber ' || p_owner || '.' || p_queue_name || '.' ||
                        p_subscriber_name || ': ' || l_error_text);
    end if;
    commit;
    p_status := RETURN_ERROR;
end remove_subscriber;
303 --------------------------------------------------------------------------------
-- Looks up the queue behind an agent on the local system and returns its
-- owner, name, queue table and the name of its default exception queue.
--
-- p_agent_name      - the agent name (wf_agents.name)
-- x_queue_table     - the queue table of the agent's queue (from all_queues)
-- x_exception_queue - <owner>.AQ$_<queue table>_E
-- x_owner           - the owner (schema) of the queue
-- x_queue_name      - the queue name without the schema prefix
--
-- Raises no_data_found when the agent does not exist on the local system,
-- when the agent has no queue name, or when the queue is not found in
-- all_queues.
PROCEDURE GetQueueDetails(p_agent_name      in varchar2,
                          x_queue_table     OUT NOCOPY VARCHAR2,
                          x_exception_queue OUT NOCOPY VARCHAR2,
                          x_owner           OUT NOCOPY VARCHAR2,
                          x_queue_name      OUT NOCOPY VARCHAR2)
is
  l_queue_name      varchar2(80);
  l_pos             number := 0;
  l_name            varchar2(30) := null;
  l_owner           varchar2(30) := null;
  l_queue_table     VARCHAR2(30) := null;
  l_exception_queue VARCHAR2(80) := null;
begin

  select queue_name
    INTO l_queue_name
    from wf_agents
   where name = p_agent_name
     and system_guid = WF_EVENT.local_system_guid;

  if(l_queue_name is not null) then
    -- derive the queue name and the schema

    -- The position of the '.' must be computed BEFORE it is tested;
    -- otherwise l_pos is still 0, the schema-qualified case is never
    -- recognised and l_name is never set.
    l_pos := instr(l_queue_name,'.');

    -- With no '.', l_pos is 0 and this yields the whole string.
    l_name := substr(l_queue_name, l_pos + 1);

    if (l_pos > 0) then
      l_owner := substr(l_queue_name, 1, l_pos - 1);
    else
      -- if queue_name does not contain schema we will look in WF_SCHEMA
      l_owner := wf_event.schema_name;
    end if;

    SELECT queue_table
      into l_queue_table
      from all_queues
     where owner = l_owner
       and name = l_name;

    -- If default exception queue table, should be l_owner || '.AQ$_' || l_queue_table || '_E'
    -- We can't select the queue name given the queue name and queue_type = 'EXCEPTION_QUEUE',
    -- because it can have multiple result. Shall we have a column in WF_AGENTS for exception queue?
    l_exception_queue := l_owner || '.AQ$_' || l_queue_table || '_E';
    x_queue_table     := l_queue_table;
    x_exception_queue := l_exception_queue;
    x_owner           := l_owner;
    x_queue_name      := l_name;
  else
    raise no_data_found;
  end if;
end GetQueueDetails;
355
356 --------------------------------------------------------------------------------
-- Removes dead subscribers from the WF_CONTROL queue and pings the current
-- ones.
--
-- Nothing is done (and RETURN_SUCCESS is returned) unless more than
-- G_MIN_WAIT_TIME has elapsed since the most recent ping. Otherwise:
--   1. every dead subscriber is removed via remove_subscriber
--   2. the <owner>.AQ$_WF_CONTROL_E exception queue is purged
--   3. every current subscriber of the WF_CONTROL queue is recorded as pinged
--      and the 'oracle.apps.wf.bes.control.ping' event is raised
--   4. ping records older than G_MAX_RETENTION_TIME are deleted
--
-- errbuf  - message describing the error or warning, when there is one
-- retcode - RETURN_SUCCESS on normal completion,
--           RETURN_WARNING when at least one dead subscriber could not be
--           removed,
--           RETURN_ERROR when the WF_CONTROL agent does not exist
procedure cleanup_subscribers(errbuf  out nocopy varchar2,
                              retcode out nocopy varchar2)
is
  -- the dead subscribers

  -- A subscriber is dead if
  -- 1. more than G_MIN_WAIT_TIME has elapsed since it was pinged and
  -- 2. its status is still STATUS_PINGED

  -- If the subscriber were alive, it would have responded and its status
  -- would be STATUS_RESPONDED.

  cursor dead_subscribers is
    select distinct queue_name, subscriber_name
      from wf_bes_subscriber_pings
     where ping_time < sysdate - G_MIN_WAIT_TIME
       and status = STATUS_PINGED;

  l_owner               varchar2(30);
  l_queue_name          varchar2(30);
  l_last_ping_time      date;
  l_ping_time           date;
  l_ping_number         number;
  l_remove_failed_count number;
  l_remove_status       varchar2(30);
  l_subscribers         dbms_aqadm.aq$_subscriber_list_t;
  i                     integer;
begin
  -- get the last ping time

  select max(ping_time)
    into l_last_ping_time
    from wf_bes_subscriber_pings;

  if(l_last_ping_time is null) then
    -- wf_bes_subscriber_pings table is empty so set the last ping time to be very old

    l_last_ping_time := to_date('1900/01/01', 'YYYY/MM/DD');
  end if;

  -- check the minimum wait time

  if(l_last_ping_time < sysdate - G_MIN_WAIT_TIME) then
    -- the minimum wait time has elapsed so perform cleanup processing

    -- get the owner and queue name of the WF_CONTROL agent

    declare
      l_qualified_queue_name wf_agents.queue_name%type;
      j                      integer;
    begin
      select queue_name
        into l_qualified_queue_name
        from wf_agents
       where name = 'WF_CONTROL'
         and system_guid = hextoraw(wf_core.translate('WF_SYSTEM_GUID'));

      -- l_qualified_queue_name is normally of the form <schema>.<queue name>

      -- parse it into owner (schema) and queue name

      j := instr(l_qualified_queue_name, '.');

      if (j > 0) then
        l_owner := substr(l_qualified_queue_name, 1, j - 1);
      else
        -- No schema prefix: fall back to the workflow schema, as
        -- GetQueueDetails does, rather than ending up with a null owner.
        l_owner := wf_event.schema_name;
      end if;

      -- With no '.', j is 0 and this yields the whole string.
      l_queue_name := substr(l_qualified_queue_name, j + 1);
    exception
      when no_data_found then
        -- WF_CONTROL agent does not exist

        errbuf := 'WF_CONTROL agent not found';

        retcode := RETURN_ERROR;

        return;
    end;


    -- remove the dead subscribers

    l_remove_failed_count := 0;

    for dead_subscriber in dead_subscribers loop
      remove_subscriber(p_owner           => l_owner,
                        p_queue_name      => dead_subscriber.queue_name,
                        p_subscriber_name => dead_subscriber.subscriber_name,
                        p_status          => l_remove_status);

      if(l_remove_status = RETURN_ERROR) then
        l_remove_failed_count := l_remove_failed_count + 1;
      end if;
    end loop;

    -- After remove the subscribers, remove all the messages that got moved to exception queue
    purge_jms_queue(p_queue_name => l_owner || '.AQ$_WF_CONTROL_E');

    -- ping the current subscribers
    l_subscribers := dbms_aqadm.queue_subscribers(l_owner || '.' || l_queue_name);

    if(l_subscribers.count() > 0) then
      -- a subscriber exists

      -- get the next ping number

      select wf_bes_ping_number_s.nextval
        into l_ping_number
        from dual;

      -- get the ping time (current time)

      l_ping_time := sysdate;

      -- ping the current subscribers; all of them share the same ping number
      -- and ping time

      i := l_subscribers.first;

      while i is not null loop
        ping_subscriber(p_ping_number     => l_ping_number,
                        p_ping_time       => l_ping_time,
                        p_queue_name      => l_queue_name,
                        p_subscriber_name => l_subscribers(i).name);

        i := l_subscribers.next(i);
      end loop;

      -- raise the ping event

      wf_event.raise(p_event_name => 'oracle.apps.wf.bes.control.ping',
                     p_event_key  => l_ping_number);
    end if;

    -- remove the data older than G_MAX_RETENTION_TIME

    delete
      from wf_bes_subscriber_pings
     where ping_time < sysdate - G_MAX_RETENTION_TIME;

    commit;

    if(l_remove_failed_count > 0) then
      -- at least one dead subscriber could not be removed

      errbuf := 'Failed to remove ' || l_remove_failed_count || ' dead subscriber(s).';

      retcode := RETURN_WARNING;
    else
      -- normal completion

      retcode := RETURN_SUCCESS;
    end if;
  else
    -- the minimum wait time has not yet elapsed so just return

    retcode := RETURN_SUCCESS;
  end if;
end cleanup_subscribers;
511
512 end wf_bes_cleanup;