DBA Data[Home] [Help]

PACKAGE BODY: APPS.JTF_FM_AQ_WRAPPER_PVT

Source


1 PACKAGE BODY JTF_FM_AQ_WRAPPER_PVT AS
2 /* $Header: jtfvaqb.pls 120.1.12020000.2 2013/02/27 05:48:53 sariff ship $*/
3 
4 ---------------------------------------------------------------
5 -- PROCEDURE
6 --    Enqueue
7 --
8 -- HISTORY
9 --    12/10/99  xqin  Create.
10 ---------------------------------------------------------------
11 PROCEDURE Enqueue
12 (
13     queue_name     IN VARCHAR2,
14     message_in     IN RAW,
15     priority       IN NUMBER,
16     message_handle OUT NOCOPY RAW
17 )
18 IS
19     enqueue_options     dbms_aq.enqueue_options_t;
20     message_properties  dbms_aq.message_properties_t;
21 
22 BEGIN
23     message_properties.priority := priority;
24 
25     DBMS_AQ.ENQUEUE(queue_name  => queue_name,
26              enqueue_options    => enqueue_options,
27              message_properties => message_properties,
28                        payload  => message_in,
29                         msgid   => message_handle);
30 
31     COMMIT;
32 END Enqueue;
33 
34 ---------------------------------------------------------------
35 -- PROCEDURE
36 --    Dequeue
37 --
38 -- HISTORY
39 --    12/10/99  xqin  Create.
40 ---------------------------------------------------------------
41 PROCEDURE Dequeue
42 (
43     queue_name     IN VARCHAR2,
44     waiting_time   IN NUMBER,
45     message_handle IN RAW,
46     message_out    OUT NOCOPY RAW
47 )
48 IS
49     dequeue_options     DBMS_AQ.dequeue_options_t;
50     message_properties  DBMS_AQ.message_properties_t;
51     message_handle2     RAW(16);
52 
53 BEGIN
54     dequeue_options.wait := waiting_time;
55     dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
56     dequeue_options.msgid := message_handle;
57     dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
58 
59     DBMS_AQ.DEQUEUE(queue_name => queue_name,
60             dequeue_options    => dequeue_options,
61             message_properties => message_properties,
62             payload            => message_out,
63             msgid              => message_handle2);
64 
65 
66 	DELETE FROM JTF_FM_REQUESTS_AQ where AQ_MSG_ID = message_handle2;
67 
68     COMMIT;
69 END Dequeue;
70 
71 ---------------------------------------------------------------
72 -- PROCEDURE
73 --    INSERT_JTF_FM_REQUEST_AQ
74 --
75 -- HISTORY
76 --    07/11/03 SK  Created.
77 ---------------------------------------------------------------
78 PROCEDURE INSERT_JTF_FM_REQUEST_AQ
79 (p_request_id NUMBER,
80  p_aq_msg_id  RAW,
81  p_queue_type VARCHAR2)
82 
83 IS
84 
85 BEGIN
86     INSERT INTO JTF_FM_REQUESTS_AQ (
87     REQUEST_ID, AQ_MSG_ID, QUEUE_TYPE,
88     CREATED_BY, CREATION_DATE, LAST_UPDATED_BY,
89     LAST_UPDATE_DATE, LAST_UPDATE_LOGIN)
90     VALUES ( p_request_id,p_aq_msg_id ,p_queue_type ,
91     FND_GLOBAL.USER_ID,SYSDATE ,FND_GLOBAL.USER_ID,
92     SYSDATE,FND_GLOBAL.LOGIN_ID  );
93 
94 END INSERT_JTF_FM_REQUEST_AQ;
95 
96 ---------------------------------------------------------------
97 -- PROCEDURE
98 --    Enqueue_Segment
99 --
100 -- HISTORY
101 --    07/10/03 SK  Created.
102 ---------------------------------------------------------------
103 PROCEDURE Enqueue_Segment
104 (
105     queue_name     IN VARCHAR2,
106     message_in     IN RAW,
107     priority       IN NUMBER,
108     message_handle OUT NOCOPY RAW,
109 	request_id     IN NUMBER
110 )
111 
112 IS
113 
114    CURSOR CSTATUS( request_id NUMBER) IS
115      SELECT outcome_code, SERVER_ID
116      FROM JTF_FM_REQUEST_HISTORY_ALL
117      WHERE HIST_REQ_ID = request_id;
118 
119 
120 
121    CURSOR CQUEUE( server_id NUMBER) IS
122      SELECT BATCH_PAUSE_Q
123      FROM JTF_FM_SERVICE_ALL
124      WHERE SERVER_ID = server_id;
125 
126      l_request_status VARCHAR2(50);
127      l_server_id  NUMBER;
128      l_queue_name VARCHAR2(50);
129 
130 
131      enqueue_options     dbms_aq.enqueue_options_t;
132      message_properties  dbms_aq.message_properties_t;
133 
134 BEGIN
135     message_properties.priority := priority;
136 
137 
138 	OPEN CSTATUS(request_id);
139 	FETCH  CSTATUS INTO l_request_status, l_server_id ;
140     CLOSE CSTATUS;
141 
142 	OPEN CQUEUE(l_server_id);
143 	FETCH  CQUEUE INTO l_queue_name ;
144     CLOSE CQUEUE;
145 
146 	IF l_request_status = 'PAUSED' THEN
147 
148     DBMS_AQ.ENQUEUE(queue_name  => l_queue_name,
149              enqueue_options    => enqueue_options,
150              message_properties => message_properties,
151                        payload  => message_in,
152                         msgid   => message_handle);
153 
154 	INSERT_JTF_FM_REQUEST_AQ(request_id,message_handle ,'BP');
155 
156 
157 
158 	ELSIF	l_request_status = 'CANCELED' THEN
159 	-- Don't bother to enqueue
160 	    NULL;
161 	ELSE
162 	    DBMS_AQ.ENQUEUE(queue_name  => queue_name,
163                  enqueue_options    => enqueue_options,
164                  message_properties => message_properties,
165                            payload  => message_in,
166                             msgid   => message_handle);
167 
168 	INSERT_JTF_FM_REQUEST_AQ(request_id,message_handle ,'B');
169 
170 
171 	END IF;
172 
173     COMMIT;
174 END Enqueue_Segment;
175 
176 END JTF_FM_AQ_WRAPPER_PVT;