[Home] [Help]
FUNCTION: SYS.AQ$_GET_SUBSCRIBERS
Source
1 FUNCTION aq$_get_subscribers (
2 queue_schema IN VARCHAR2,
3 queue_name IN VARCHAR2,
4 queue_table IN VARCHAR2,
5 deq_user IN VARCHAR2,
6 queue_id IN BINARY_INTEGER,
7 qtab_flags IN BINARY_INTEGER) RETURN sys.aq$_subscriber_t PIPELINED IS
8
9 sub80 aq$_subscribers;
10 sel_txt VARCHAR2(1000);
11 type rt is REF CURSOR;
12 sqlrc rt; -- ref cursor for sql statement
13 sub_name VARCHAR2(30);
14 sub_addr VARCHAR2(1024);
15 sub_proto NUMBER;
16 sub_trans VARCHAR2(65);
17 sub_trans_sch VARCHAR2(30);
18 sub_trans_nm VARCHAR2(30);
19 sub_type NUMBER;
20 sub_rule VARCHAR2(30); /*bug 648822: add rule_name for the subscriber*/
21 sub_id NUMBER;
22 sub_bpos NUMBER;
23 BEGIN
24 IF bitand(qtab_flags, 8) = 0 and bitand(qtab_flags, 67108864) = 0 THEN
25 -- 8.0 style queue, return all subscribers in aq$_queues
26 select subscribers INTO sub80 FROM system.aq$_queues
27 where eventid = queue_id;
28
29 IF sub80 IS NOT NULL and sub80.count > 0 THEN
30 FOR i IN sub80.first .. sub80.last LOOP
31 PIPE ROW (aq$_subscriber(sub80(i).name, sub80(i).address,
32 sub80(i).protocol, null, 65, null, null, null));
33 END LOOP;
34 END IF;
35 ElSIF bitand(qtab_flags, 4096) = 4096 and deq_user IS NOT NULL THEN
36 -- 8.1 style secure queue, join with agent mapping table
37 sel_txt := 'select qs.name, qs.address, qs.protocol, qs.trans_name, '
38 || ' qs.subscriber_type, qs.rule_name from '
39 || 'dba_aq_agent_privs dp, '
40 || dbms_assert.enquote_name('"'||queue_schema||'"') || '.'
41 || dbms_assert.enquote_name('"AQ$_' || queue_table || '_S"')
42 || ' qs where dp.db_username = :1 and ' ||
43 'dp.agent_name = qs.name and bitand(qs.subscriber_type, 1)=1'
44 || ' and qs.queue_name = :2';
45 OPEN sqlrc FOR sel_txt using deq_user, queue_name;
46 LOOP
47 FETCH sqlrc INTO sub_name, sub_addr, sub_proto,sub_trans, sub_type, sub_rule;
48 EXIT WHEN sqlrc%NOTFOUND;
49 PIPE ROW (aq$_subscriber(sub_name, sub_addr, sub_proto, sub_trans,
50 sub_type, sub_rule, null, null));
51 END LOOP;
52
53 ELSIF bitand(qtab_flags, 67108864) = 67108864 THEN
54 -- 12c style sharded queue
55 sel_txt := 'select name, address, protocol, trans_owner,' ||
56 'trans_name , subscriber_type, ' ||
57 'rule_name, subscriber_id, pos_bitmap ' ||
58 'from SYS.AQ$_DURABLE_SUBS s ' ||
59 'WHERE queue_id = :1 and '||
60 'bitand(s.subscriber_type, 1)=1';
61 OPEN sqlrc FOR sel_txt using queue_id;
62 LOOP
63 FETCH sqlrc INTO sub_name, sub_addr, sub_proto, sub_trans_sch,
64 sub_trans_nm, sub_type, sub_rule, sub_id, sub_bpos;
65 if sub_trans_sch is not null then
66 sub_trans := dbms_assert.enquote_name(sub_trans_sch, FALSE) ||'.' ||
67 dbms_assert.enquote_name(sub_trans_nm, FALSE);
68 end if;
69 EXIT WHEN sqlrc%NOTFOUND;
70 PIPE ROW (aq$_subscriber(sub_name, sub_addr, sub_proto,
71 sub_trans, sub_type,
72 sub_rule, sub_id, sub_bpos));
73 END LOOP;
74 ELSE
75 -- 8.1 style normal queue, return all subscribers
76 sel_txt := 'select name, address, protocol, trans_name, ' ||
77 'subscriber_type, rule_name from ' ||
78 dbms_assert.enquote_name('"'||queue_schema||'"') || '.' ||
79 dbms_assert.enquote_name('"AQ$_' || queue_table || '_S"') ||
80 ' where ' ||
81 'bitand(subscriber_type, 1)=1 and queue_name = :1';
82 OPEN sqlrc FOR sel_txt using queue_name;
83 LOOP
84 FETCH sqlrc INTO sub_name, sub_addr, sub_proto, sub_trans, sub_type, sub_rule;
85 EXIT WHEN sqlrc%NOTFOUND;
86 PIPE ROW (aq$_subscriber(sub_name, sub_addr, sub_proto, sub_trans,
87 sub_type, sub_rule, null, null));
88 END LOOP;
89 END IF;
90 RETURN;
91 END;