DBA Data[Home] [Help]

FUNCTION: SYS.AQ$_GET_SUBSCRIBERS

Source


1 FUNCTION aq$_get_subscribers (
2   queue_schema   IN  VARCHAR2,
3   queue_name     IN  VARCHAR2,
4   queue_table    IN  VARCHAR2,
5   deq_user       IN  VARCHAR2,
6   queue_id       IN  BINARY_INTEGER,
7   qtab_flags     IN  BINARY_INTEGER) RETURN sys.aq$_subscriber_t PIPELINED IS
8 
9   sub80          aq$_subscribers;
10   sel_txt        VARCHAR2(1000);
11   type rt is	 REF CURSOR;
12   sqlrc		 rt;		  	 	-- ref cursor for sql statement
13   sub_name       VARCHAR2(30);
14   sub_addr       VARCHAR2(1024);
15   sub_proto      NUMBER;
16   sub_trans      VARCHAR2(65);
17   sub_trans_sch  VARCHAR2(30);
18   sub_trans_nm   VARCHAR2(30);
19   sub_type       NUMBER;
20   sub_rule       VARCHAR2(30); /*bug 648822: add rule_name for the subscriber*/
21   sub_id         NUMBER;
22   sub_bpos       NUMBER;
23 BEGIN
24   IF bitand(qtab_flags, 8) = 0 and bitand(qtab_flags, 67108864) = 0 THEN
25     -- 8.0 style queue, return all subscribers in aq$_queues
26     select subscribers INTO sub80 FROM system.aq$_queues
27     where  eventid = queue_id;
28 
29     IF sub80 IS NOT NULL and sub80.count > 0 THEN
30       FOR i IN sub80.first .. sub80.last LOOP
31        PIPE ROW (aq$_subscriber(sub80(i).name, sub80(i).address,
32                  sub80(i).protocol, null, 65, null, null, null));
33       END LOOP;
34     END  IF;
35   ElSIF bitand(qtab_flags, 4096) = 4096 and deq_user IS NOT NULL THEN
36     -- 8.1 style secure queue, join with agent mapping table
37     sel_txt := 'select qs.name, qs.address, qs.protocol, qs.trans_name, '
38                || ' qs.subscriber_type,  qs.rule_name from '
39                || 'dba_aq_agent_privs dp, '
40                || dbms_assert.enquote_name('"'||queue_schema||'"') || '.'
41                || dbms_assert.enquote_name('"AQ$_' || queue_table || '_S"')
42                || ' qs where dp.db_username = :1 and ' ||
43                'dp.agent_name = qs.name and bitand(qs.subscriber_type, 1)=1'
44                || ' and qs.queue_name = :2';
45     OPEN sqlrc FOR sel_txt using deq_user, queue_name;
46     LOOP
47       FETCH sqlrc INTO sub_name, sub_addr, sub_proto,sub_trans, sub_type, sub_rule;
48       EXIT WHEN sqlrc%NOTFOUND;
49       PIPE ROW (aq$_subscriber(sub_name, sub_addr, sub_proto, sub_trans,
50                                sub_type, sub_rule, null, null));
51     END LOOP;
52 
53   ELSIF bitand(qtab_flags, 67108864) = 67108864 THEN
54     -- 12c style sharded queue
55     sel_txt := 'select name, address, protocol, trans_owner,' ||
56                'trans_name , subscriber_type, ' ||
57                'rule_name, subscriber_id, pos_bitmap ' ||
58                'from SYS.AQ$_DURABLE_SUBS s ' ||
59                'WHERE queue_id = :1 and '||
60                'bitand(s.subscriber_type, 1)=1';
61     OPEN sqlrc FOR sel_txt using queue_id;
62     LOOP
63       FETCH sqlrc INTO sub_name, sub_addr, sub_proto, sub_trans_sch,
64                        sub_trans_nm,  sub_type, sub_rule, sub_id, sub_bpos;
65       if sub_trans_sch is not null then
66         sub_trans := dbms_assert.enquote_name(sub_trans_sch, FALSE) ||'.' ||
67                      dbms_assert.enquote_name(sub_trans_nm, FALSE);
68       end if;
69       EXIT WHEN sqlrc%NOTFOUND;
70       PIPE ROW (aq$_subscriber(sub_name, sub_addr, sub_proto,
71                                sub_trans, sub_type,
72                                sub_rule, sub_id, sub_bpos));
73     END LOOP;
74   ELSE
75     -- 8.1 style normal queue, return all subscribers
76     sel_txt := 'select name, address, protocol, trans_name, ' ||
77                'subscriber_type, rule_name from ' ||
78                dbms_assert.enquote_name('"'||queue_schema||'"') || '.' ||
79                dbms_assert.enquote_name('"AQ$_' || queue_table || '_S"') ||
80                ' where ' ||
81                'bitand(subscriber_type, 1)=1 and queue_name = :1';
82     OPEN sqlrc FOR sel_txt using queue_name;
83     LOOP
84       FETCH sqlrc INTO sub_name, sub_addr, sub_proto, sub_trans, sub_type, sub_rule;
85       EXIT WHEN sqlrc%NOTFOUND;
86       PIPE ROW (aq$_subscriber(sub_name, sub_addr, sub_proto, sub_trans,
87                                sub_type, sub_rule, null, null));
88     END LOOP;
89   END IF;
90   RETURN;
91 END;