1 PACKAGE BODY CSI_TRANSACTION_IMPORT_PVT as
2 /* $Header: CSIVTXPB.pls 120.0.12000000.2 2007/07/11 17:03:00 ngoutam noship $*/
3
4 -- Start of comments
5 -- API name : PROCESS_TRANSACTION_ROWS
6 -- Type : Private
7 -- Function :
8 -- Pre-reqs : None.
9 -- Parameters :
10 -- IN p_max_worker_number IN NUMBER := 10
11
12 -- OUT ERRBUF OUT VARCHAR2,
13 -- RETCODE OUT VARCHAR2
14 --
15 -- Version Initial version 1.0 Himal Karmacharya
16 --
17 -- End of comments
18
19 procedure debug(p_message in varchar2) is
20 begin
21 IF fnd_log.level_statement >= fnd_log.g_current_runtime_level THEN
22 --csi_t_gen_utility_pvt.add(p_message);
23 fnd_log.string(fnd_log.level_statement, 'csi.plsql.csi_transaction_import_pvt.process_transaction_rows',p_message);
24 END IF;
25 end debug;
26
27 FUNCTION group_rows(p_num_workers in number) RETURN NUMBER IS
28 l_num_rows NUMBER;
29 l_seq NUMBER := 0;
30 l_seq_incr NUMBER;
31 l_header_count number;
32 l_old_sequence_val number := 0;
33 l_schema varchar2(30);
34 l_status varchar2(1);
35 l_industry varchar2(1);
36
37 TYPE header_id_array_type is TABLE OF CSI_BATCH_TXN_LINES.ORDER_HEADER_ID%TYPE ;
38 l_header_id header_id_array_type;
39 l_limit number;
40
41 CURSOR l_header_cur IS
42 SELECT DISTINCT cbtl.order_header_id
43 FROM csi_batch_txn_lines cbtl
44 WHERE batch_id = -1
45 AND NOT EXISTS (SELECT 'x' --do not assign a batch_id to an order that is
46 -- being processed
47 FROM csi_batch_txn_lines cbtl2
48 WHERE cbtl2.order_header_id = cbtl.order_header_id
49 AND cbtl2.processed_flag IN (1,2)
50 AND cbtl2.batch_id <> -1)
51 ORDER BY order_header_id;
52
53 BEGIN
54
55 SELECT csi_batch_txn_lines_s.nextval
56 INTO l_seq
57 FROM dual;
58
59 OPEN l_header_cur;
60 FETCH l_header_cur BULK COLLECT INTO l_header_id LIMIT 100*p_num_workers;
61 CLOSE l_header_cur;
62
63 debug('Order Headers that will be batched: ' || l_header_id.count);
64 if (l_header_id.count > 0) then
65
66 FORALL i in l_header_id.first..l_header_id.last
67 UPDATE csi_batch_txn_lines cbtl
68 SET batch_id = l_seq + MOD(l_header_id(i), p_num_workers)
69 WHERE order_header_id = l_header_id(i);
70
71 debug('Assigned batch_id to ' || SQL%ROWCOUNT || ' order lines');
72
73 commit;
74 debug(' Old Sequence Value '||l_seq);
75 l_old_sequence_val := l_seq;
76
77 IF NOT FND_INSTALLATION.GET_APP_INFO('CSI', l_status, l_industry, l_schema) THEN
78 RAISE FND_API.G_EXC_UNEXPECTED_ERROR;
79 END IF;
80
81 --Adjust sequence
82 l_seq_incr := p_num_workers;
83 EXECUTE IMMEDIATE 'ALTER SEQUENCE '||l_schema||'.CSI_BATCH_TXN_LINES_S INCREMENT BY '||TO_CHAR(l_seq_incr);
84 SELECT csi_batch_txn_lines_s.nextval
85 INTO l_seq
86 FROM dual;
87 EXECUTE IMMEDIATE 'ALTER SEQUENCE '||l_schema||'.CSI_BATCH_TXN_LINES_S INCREMENT BY 1';
88
89 debug(' New Sequence Value '||l_seq);
90 end if;
91
92 RETURN l_old_sequence_val;
93
94 exception
95 when no_data_found then
96 return l_old_sequence_val;
97 END group_rows;
98
99 PROCEDURE PROCESS_TRANSACTION_ROWS
100 (ERRBUF OUT NOCOPY VARCHAR2,
101 RETCODE OUT NOCOPY VARCHAR2,
102 p_max_worker_number IN NUMBER := 10
103 ,p_purge_option IN VARCHAR2
104 ) IS
105
106 l_retcode Number;
107 CONC_STATUS BOOLEAN;
108 l_process boolean := FALSE;
109 l_num_workers number := 0;
110 l_req_id number;
111 l_request_id number;
112 l_target_num number;
113 l_active_num number;
114 l_method varchar2(100);
115 l_message varchar2(100);
116 l_max_worker_number number;
117 l_batch_lower_limit number;
118
119 CURSOR worker_batch_cur(cp_workers_available in number,p_batch_lower_limit in number) IS
120 select * from (
121 SELECT DISTINCT ctl.batch_id
122 FROM csi_batch_txn_lines CTL
123 WHERE ctl.processed_flag = 0
124 AND batch_id >= p_batch_lower_limit
125 )
126 where ROWNUM <= cp_workers_available
127 ;
128
129 BEGIN
130
131 debug('BEGIN CSI_TRANSACTION_IMPORT_PVT.process_transaction_rows');
132
133 if (p_max_worker_number < 1) then
134 l_max_worker_number := 1;
135 else
136 l_max_worker_number := p_max_worker_number;
137 end if;
138
139 SELECT COUNT(*)
140 INTO l_active_num
141 FROM FND_CONCURRENT_REQUESTS FCR,
142 fnd_concurrent_programs FCP
143 WHERE FCR.concurrent_program_id = FCP.concurrent_program_id
144 AND FCR.program_application_id = FCP.application_id
145 AND FCP.application_id = 542
146 AND FCP.concurrent_program_name = 'CSITXIMW'
147 AND FCR.phase_code IN ('I','P','R');
148
149 fnd_file.put_line(fnd_file.log, 'Number of Active Workers '||l_active_num);
150 --check number of workers currently running
151 if l_max_worker_number <= l_active_num then
152 debug('Requested '||l_max_worker_number||' workers, but there are already '||l_active_num||' active workers');
153 RETCODE := 'Success';
154 CONC_STATUS := FND_CONCURRENT.SET_COMPLETION_STATUS('NORMAL',Current_Error_Code);
155 l_num_workers := 0;
156 return;
157 else
158 l_num_workers := l_max_worker_number - l_active_num;
159 end if;
160
161 -- setting maximum num of workers that can be launched to 30
162 if (l_num_workers > 30) then
163 l_num_workers := 30;
164 end if;
165
166 debug('Number of Workers Available '||l_num_workers);
167 fnd_file.put_line(fnd_file.log, 'Number of Workers Available '||l_num_workers);
168 --batch rows
169 l_batch_lower_limit := group_rows(l_num_workers);
170 debug('Function Group_Rows returns with value '||l_batch_lower_limit);
171
172 if (l_batch_lower_limit = 0) then
173 debug(' No Rows to Process..Exiting CSI_TRANSACTION_IMPORT_PVT.process_transaction_rows ');
174 return;
175 end if;
176
177 for cur_batch in worker_batch_cur(l_num_workers,l_batch_lower_limit) loop
178
179 l_request_id := FND_REQUEST.submit_request(
180 'CSI',
181 'CSITXIMW',
182 NULL,
183 NULL,
184 FALSE,
185 cur_batch.batch_id
186 ,p_purge_option
187 );
188
189 debug('Request Id '|| l_request_id || ' for batch ID '|| cur_batch.batch_id);
190 --fnd_file.put_line(fnd_file.log,'Request Id '|| l_request_id || ' for batch ID '|| cur_batch.batch_id);
191
192 IF ((l_request_id = 0) OR (l_request_id IS NULL)) then
193
194 --FND_MESSAGE.RETREIVE;
195 --FND_MESSAGE.ERROR;
196 debug('Error: Submit Request failed by returning Request Id '||l_request_id);
197 RETCODE := 'Error';
198 CONC_STATUS := FND_CONCURRENT.SET_COMPLETION_STATUS('ERROR',Current_Error_Code);
199
200 -- unmark rows in csi_batch_txn_lines
201 update csi_batch_txn_lines
202 set batch_id = -1,last_update_date = sysdate,last_updated_by = fnd_global.user_id
203 where batch_id = cur_batch.batch_id;
204
205 RAISE FND_API.G_EXC_UNEXPECTED_ERROR;
206 end if;
207
208 -- set the processed flag to 1 so that the next run of this program
209 -- will not process the rows
210 update CSI_BATCH_TXN_LINES
211 set processed_flag = 1,last_update_date = sysdate,last_updated_by = fnd_global.user_id
212 where batch_id = cur_batch.batch_id
213 and processed_flag = 0;
214
215 end loop;
216
217 debug('END CSI_TRANSACTION_IMPORT_PVT.process_transaction_rows');
218 COMMIT;
219 RETCODE := 'Success';
220
221 CONC_STATUS := FND_CONCURRENT.SET_COMPLETION_STATUS('NORMAL',Current_Error_Code);
222
223 EXCEPTION
224 WHEN OTHERS THEN
225 IF fnd_log.level_unexpected >= fnd_log.g_current_runtime_level THEN
226 fnd_log.string(fnd_log.level_unexpected,'csi.plsql.csi_transaction_import_pvt.process_transaction_rows', 'WHEN OTHERS: ' ||SQLERRM);
227 END IF;
228
229 update csi_batch_txn_lines
230 set batch_id = -1,last_update_date = sysdate,last_updated_by = fnd_global.user_id
231 where processed_flag = 0;
232
233 commit;
234 RAISE;
235
236 END PROCESS_TRANSACTION_ROWS;
237
238 END CSI_TRANSACTION_IMPORT_PVT;
239