97: p_navigation in binary_integer,
98: p_correlation in VARCHAR2,
99: x_have_msg out nocopy boolean)
100: is
101: l_dequeue_options dbms_aq.dequeue_options_t;
102: x_message_properties dbms_aq.message_properties_t;
103: x_msgid RAW(16);
104: x_payload SYS.AQ$_JMS_TEXT_MESSAGE;
105: no_messages exception;
98: p_correlation in VARCHAR2,
99: x_have_msg out nocopy boolean)
100: is
101: l_dequeue_options dbms_aq.dequeue_options_t;
102: x_message_properties dbms_aq.message_properties_t;
103: x_msgid RAW(16);
104: x_payload SYS.AQ$_JMS_TEXT_MESSAGE;
105: no_messages exception;
106: pragma exception_init(no_messages, -25228);
107: snap_too_old exception;
108: pragma exception_init(snap_too_old, -1555);
109: begin
110: l_dequeue_options.consumer_name := p_consumer_name;
111: l_dequeue_options.wait := dbms_aq.NO_WAIT;
112: l_dequeue_options.dequeue_mode := dbms_aq.remove_nodata;
113: l_dequeue_options.navigation := p_navigation;
114: l_dequeue_options.correlation := p_correlation;
115:
108: pragma exception_init(snap_too_old, -1555);
109: begin
110: l_dequeue_options.consumer_name := p_consumer_name;
111: l_dequeue_options.wait := dbms_aq.NO_WAIT;
112: l_dequeue_options.dequeue_mode := dbms_aq.remove_nodata;
113: l_dequeue_options.navigation := p_navigation;
114: l_dequeue_options.correlation := p_correlation;
115:
116: x_have_msg := true;
113: l_dequeue_options.navigation := p_navigation;
114: l_dequeue_options.correlation := p_correlation;
115:
116: x_have_msg := true;
117: dbms_aq.dequeue(queue_name => p_queue_name,
118: dequeue_options => l_dequeue_options,
119: message_properties => x_message_properties, -- out
120: payload => x_payload, -- out
121: msgid => x_msgid); -- out
122: exception
123: when no_messages then
124: x_have_msg := false;
125: when snap_too_old then
126: if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
127: begin
128: l_dequeue_options.navigation := dbms_aq.first_message;
129: dbms_aq.dequeue(queue_name => p_queue_name,
130: dequeue_options => l_dequeue_options,
124: x_have_msg := false;
125: when snap_too_old then
126: if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
127: begin
128: l_dequeue_options.navigation := dbms_aq.first_message;
129: dbms_aq.dequeue(queue_name => p_queue_name,
130: dequeue_options => l_dequeue_options,
131: message_properties => x_message_properties, -- out
132: payload => x_payload, -- out
125: when snap_too_old then
126: if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
127: begin
128: l_dequeue_options.navigation := dbms_aq.first_message;
129: dbms_aq.dequeue(queue_name => p_queue_name,
130: dequeue_options => l_dequeue_options,
131: message_properties => x_message_properties, -- out
132: payload => x_payload, -- out
133: msgid => x_msgid); -- out
145: p_navigation in binary_integer,
146: p_correlation in VARCHAR2,
147: x_have_msg out nocopy boolean)
148: is
149: l_dequeue_options dbms_aq.dequeue_options_t;
150: x_message_properties dbms_aq.message_properties_t;
151: x_payload WF_EVENT_T;
152: x_msgid RAW(16);
153: no_messages exception;
146: p_correlation in VARCHAR2,
147: x_have_msg out nocopy boolean)
148: is
149: l_dequeue_options dbms_aq.dequeue_options_t;
150: x_message_properties dbms_aq.message_properties_t;
151: x_payload WF_EVENT_T;
152: x_msgid RAW(16);
153: no_messages exception;
154: pragma exception_init(no_messages, -25228);
155: snap_too_old exception;
156: pragma exception_init(snap_too_old, -1555);
157: begin
158: l_dequeue_options.consumer_name := p_consumer_name;
159: l_dequeue_options.wait := dbms_aq.NO_WAIT;
160: l_dequeue_options.dequeue_mode := dbms_aq.remove_nodata;
161: l_dequeue_options.navigation := p_navigation;
162: l_dequeue_options.correlation := p_correlation;
163:
156: pragma exception_init(snap_too_old, -1555);
157: begin
158: l_dequeue_options.consumer_name := p_consumer_name;
159: l_dequeue_options.wait := dbms_aq.NO_WAIT;
160: l_dequeue_options.dequeue_mode := dbms_aq.remove_nodata;
161: l_dequeue_options.navigation := p_navigation;
162: l_dequeue_options.correlation := p_correlation;
163:
164: x_have_msg := true;
161: l_dequeue_options.navigation := p_navigation;
162: l_dequeue_options.correlation := p_correlation;
163:
164: x_have_msg := true;
165: dbms_aq.dequeue(queue_name => p_queue_name,
166: dequeue_options => l_dequeue_options,
167: message_properties => x_message_properties, -- out
168: payload => x_payload, -- out
169: msgid => x_msgid); -- out
170: exception
171: when no_messages then
172: x_have_msg := false;
173: when snap_too_old then
174: if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
175: begin
176: l_dequeue_options.navigation := dbms_aq.first_message;
177: dbms_aq.dequeue(queue_name => p_queue_name,
178: dequeue_options => l_dequeue_options,
172: x_have_msg := false;
173: when snap_too_old then
174: if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
175: begin
176: l_dequeue_options.navigation := dbms_aq.first_message;
177: dbms_aq.dequeue(queue_name => p_queue_name,
178: dequeue_options => l_dequeue_options,
179: message_properties => x_message_properties, -- out
180: payload => x_payload, -- out
173: when snap_too_old then
174: if (p_navigation = DBMS_AQ.NEXT_MESSAGE) then
175: begin
176: l_dequeue_options.navigation := dbms_aq.first_message;
177: dbms_aq.dequeue(queue_name => p_queue_name,
178: dequeue_options => l_dequeue_options,
179: message_properties => x_message_properties, -- out
180: payload => x_payload, -- out
181: msgid => x_msgid); -- out
195:
196: is
197: x_have_msg boolean := true;
198: l_xcount NUMBER := 0;
199: l_navigation binary_integer := dbms_aq.first_message;
200: begin
201: while (x_have_msg) loop
202: dequeue_jms_queue(p_queue_name => p_queue_name,
203: p_consumer_name => p_consumer_name,
208: if (l_xcount >= p_commit_frequency) then
209: commit;
210: l_xcount := 0;
211: end if;
212: l_navigation := dbms_aq.next_message;
213: end loop;
214: commit;
215: end purge_jms_queue;
216: --------------------------------------------------------------------------------
221:
222: is
223: x_have_msg boolean := true;
224: l_xcount NUMBER := 0;
225: l_navigation binary_integer := dbms_aq.first_message;
226: begin
227: while (x_have_msg) loop
228: dequeue_evt_queue(p_queue_name => p_queue_name,
229: p_consumer_name => p_consumer_name,
234: if (l_xcount >= p_commit_frequency) then
235: commit;
236: l_xcount := 0;
237: end if;
238: l_navigation := dbms_aq.next_message;
239: end loop;
240: commit;
241: end purge_evt_queue;
242: ---------------------------------------------------------------------------------
258: purge_jms_queue(p_queue_name => p_owner || '.' || p_queue_name,
259: p_consumer_name => p_subscriber_name);
260: -- remove the subscriber from the queue
261:
262: dbms_aqadm.remove_subscriber(
263: queue_name => p_owner || '.' || p_queue_name,
264: subscriber => sys.aq$_agent(p_subscriber_name, null, null));
265:
266: -- mark the subscriber removed
378: l_ping_time date;
379: l_ping_number number;
380: l_remove_failed_count number;
381: l_remove_status varchar2(30);
382: l_subscribers dbms_aqadm.aq$_subscriber_list_t;
383: i integer;
384: begin
385: -- get the last ping time
386:
448:
449: -- After remove the subscribers, remove all the messages that got moved to exception queue
450: purge_jms_queue(p_queue_name => l_owner || '.AQ$_WF_CONTROL_E');
451: -- ping the current subscribers
452: l_subscribers := dbms_aqadm.queue_subscribers(l_owner || '.' || l_queue_name);
453:
454: if(l_subscribers.count() > 0) then
455: -- a subscriber exists
456: