1 package body hr_dm_download as
2 /* $Header: perdmdn.pkb 120.0 2005/05/30 21:16:58 appldev noship $ */
3
4 /*---------------------------- PRIVATE ROUTINES ------------------------------*/
5
6
7
8 /*---------------------------- PUBLIC ROUTINES ------------------------------*/
9
10 -- ------------------------- main ------------------------
11 -- Description: This is the download phase slave. It reads an item from the
12 -- hr_dm_migration_ranges table and calls the appropriate TDS package
13 -- download procedure to download the data into data pump interface table i.e.
14 -- hr_pump_batch_lines.
15 --
16 --
17 -- Input Parameters
18 -- p_migration_id - of current migration
19 --
20 -- p_concurrent_process - Y if program called from CM, otherwise
21 -- N prevents message logging
22 --
23 -- p_last_migration_date - date of last sucessful migration
24 --
25 -- p_process_number - process number given to slave process by
26 -- master process. The first process gets
27 -- number 1, second gets number 2 and so on
28 -- the maximum nuber being equal to the
29 -- number of threads.
30 --
31 --
32 -- Output Parameters
33 -- errbuf - buffer for output message (for CM manager)
34 --
35 -- retcode - program return code (for CM manager)
36 --
37 --
38 -- ------------------------------------------------------------------------
39 --
40 procedure main(errbuf OUT NOCOPY VARCHAR2,
41 retcode OUT NOCOPY NUMBER,
42 p_migration_id IN NUMBER,
43 p_concurrent_process IN VARCHAR2 DEFAULT 'Y',
44 p_last_migration_date IN DATE,
45 p_process_number IN NUMBER
46 ) is
47 --
48
49 l_current_phase_status VARCHAR2(30);
50 l_range_phase_id NUMBER;
51 l_download_phase_id NUMBER;
52 e_fatal_error EXCEPTION;
53 l_fatal_error_message VARCHAR2(200);
54 l_table_name VARCHAR2(30);
55 l_status VARCHAR2(30);
56 l_phase_item_id NUMBER;
57 l_business_group_id NUMBER;
58 l_migration_type VARCHAR2(30);
59 l_string VARCHAR2(500);
60 l_short_name VARCHAR2(30);
61 l_no_of_threads NUMBER;
62 l_cursor NUMBER;
63 l_return_value NUMBER;
64 l_chunk_size NUMBER;
65
66
67 -- cursor to get the table and range to be processed
68 cursor csr_table_range is
69 select mr.phase_item_id,
70 mr.range_id,
71 mr.starting_process_sequence,
72 mr.ending_process_sequence,
73 mr.status,
74 tbl.table_name,
75 tbl.short_name,
76 pi_dn.batch_id
77 from hr_dm_phase_items pi_dn,
78 hr_dm_tables tbl,
79 hr_dm_migration_ranges mr,
80 hr_dm_phase_items pi_rg
81 where pi_rg.phase_id = l_range_phase_id
82 and pi_rg.phase_item_id = mr.phase_item_id
83 and mr.status = 'NS'
84 and mod(mr.range_id,l_no_of_threads) + 1 = p_process_number
85 and pi_rg.table_name = tbl.table_name
86 and pi_dn.phase_id = l_download_phase_id
87 and pi_dn.group_id = pi_rg.group_id
88 and pi_dn.status in ('NS','S', 'E');
89
90 -- get the migration details
91 cursor csr_migration_info is
92 select business_group_id,
93 migration_type
94 from hr_dm_migrations
95 where migration_id = p_migration_id;
96
97 l_table_range_rec csr_table_range%rowtype;
98 l_no_of_rec_downloaded number;
99 l_range_id number;
100 --
101 begin
102 --
103
104 -- initialize messaging (only for concurrent processing)
105 if (p_concurrent_process = 'Y') then
106 hr_dm_utility.message_init;
107 end if;
108
109 hr_dm_utility.message('ROUT','entry:hr_dm_download.main', 5);
110 hr_dm_utility.message('PARA','(p_migration_id - ' || p_migration_id ||
111 ')(p_last_migration_date - ' || p_last_migration_date ||
112 ')', 10);
113 -- get the download phase_id
114 l_download_phase_id := hr_dm_utility.get_phase_id('DP', p_migration_id);
115
116 -- get the range phase_id
117 l_range_phase_id := hr_dm_utility.get_phase_id('R', p_migration_id);
118
119 -- get the business_group_id and migration_type
120 open csr_migration_info;
121 fetch csr_migration_info into l_business_group_id, l_migration_type;
122 if csr_migration_info%notfound then
123 close csr_migration_info;
124 l_fatal_error_message := 'hr_dm_download.main :- Migration Id ' ||
125 to_char(p_migration_id) || ' not found.';
126 raise e_fatal_error;
127 end if;
128 close csr_migration_info;
129
130 -- find the chunk size
131 l_chunk_size := hr_dm_utility.chunk_size(p_business_group_id => l_business_group_id);
132
133 if l_chunk_size is null then
134 l_fatal_error_message := 'hr_dm_download.main :- Chunk Size not ' ||
135 'defined for business group ' || l_business_group_id;
136 raise e_fatal_error;
137 end if;
138
139 -- find the number of threads to use
140 l_no_of_threads := hr_dm_utility.number_of_threads(l_business_group_id);
141
142
143 -- loop until either range phase is in error or all range phase items have
144 -- been processed
145
146 loop
147
148 --
149 -- get status of download phase. If phase has error status set by other slave
150 -- process then we need to stop the processing of this slave.
151 -- if null returned, then assume it is not started.
152 --
153 l_current_phase_status := nvl(hr_dm_utility.get_phase_status('DP',
154 p_migration_id),
155 'NS');
156
157 -- if status is error, then raise an exception
158 if (l_current_phase_status = 'E') then
159 l_fatal_error_message := 'error in download phase - slave exiting';
160 raise e_fatal_error;
161 end if;
162
163 l_no_of_rec_downloaded := 0;
164 open csr_table_range;
165
166 -- fetch a row from the phase items table
167 fetch csr_table_range into l_table_range_rec;
168 exit when csr_table_range%notfound;
169 close csr_table_range;
170 l_range_id := l_table_range_rec.range_id;
171
172 -- update status to started
173 hr_dm_utility.update_migration_ranges(p_new_status => 'S',
174 p_id => l_table_range_rec.range_id);
175
176 -- call download table range code in TDS package
177 -- passing business_group_id, r_migration_data.last_migration_date,
178 -- l_phase_item_id, l_number_of_threads
179
180
181 -- build parameter string
182 l_string := 'begin hrdmd_' || l_table_range_rec.short_name ||
183 '.download( ''' ||
184 l_migration_type || ''',' ||
185 l_business_group_id || ', ''' ||
186 p_last_migration_date || ''', ' ||
187 l_table_range_rec.starting_process_sequence || ',' ||
188 l_table_range_rec.ending_process_sequence || ',' ||
189 l_table_range_rec.batch_id || ', ' ||
190 l_chunk_size ||',' ||
191 ':l_no_of_rec_downloaded); end;';
192
193 hr_dm_utility.message('INFO','Call to TDS ' || l_string , 6);
194 -- bind variables
195
196 l_cursor := dbms_sql.open_cursor;
197 hr_dm_utility.message('INFO','Open dynamic cursor ' , 7);
198
199 dbms_sql.parse(l_cursor, l_string, dbms_sql.native);
200
201 hr_dm_utility.message('INFO','Bind dynamic var ' , 8);
202 -- bind the out variable of the download procedure.
203 dbms_sql.bind_variable(l_cursor,':l_no_of_rec_downloaded',20);
204
205 hr_dm_utility.message('INFO','Get return value ' , 9);
206 l_return_value := dbms_sql.execute(l_cursor);
207
208 hr_dm_utility.message('INFO','Get variable value ' , 10);
209 -- get the value of the download procedure.
210 dbms_sql.variable_value(l_cursor,
211 ':l_no_of_rec_downloaded',
212 l_no_of_rec_downloaded);
213
214 hr_dm_utility.message('INFO','Close dynamic variable ' , 10);
215 -- close the cursor.
216 dbms_sql.close_cursor(l_cursor);
217
218 -- update the no of records downloaded for the range.
219 update hr_dm_migration_ranges
220 set row_count = l_no_of_rec_downloaded
221 where range_id = l_table_range_rec.range_id;
222
223 -- update status to completed
224 hr_dm_utility.update_migration_ranges(p_new_status => 'C',
225 p_id => l_table_range_rec.range_id);
226 commit;
227 end loop;
228 if csr_table_range%isopen then
229 close csr_table_range;
230 end if;
231
232 -- set up return values to concurrent manager
233 retcode := 0;
234 errbuf := 'No errors - examine logfiles for detailed reports.';
235
236
237 hr_dm_utility.message('INFO','Download - main controller', 15);
238 hr_dm_utility.message('SUMM','Download - main controller', 20);
239 hr_dm_utility.message('ROUT','exit:hr_dm_download.main', 25);
240 hr_dm_utility.message('PARA','(retcode - ' || retcode ||
241 ')(errbuf - ' || errbuf || ')', 30);
242
243 -- error handling
244 exception
245 when e_fatal_error then
246 if csr_table_range%isopen then
247 close csr_table_range;
248 end if;
249 retcode := 0;
250 errbuf := 'An error occurred during the migration - examine logfiles for detailed reports.';
251 hr_dm_utility.error(SQLCODE,'hr_dm_download.main',l_fatal_error_message,'R');
252 hr_dm_utility.error(SQLCODE,'hr_dm_download.main','(none)','R');
253 when others then
254 if csr_table_range%isopen then
255 close csr_table_range;
256 end if;
257 retcode := 2;
258 errbuf := 'An error occurred during the migration - examine logfiles for detailed reports.';
259 -- update status to error
260 hr_dm_utility.update_migration_ranges(p_new_status => 'E',
261 p_id => l_range_id);
262 hr_dm_utility.error(SQLCODE,'hr_dm_download.main','(none)','R');
263 --
264 end main;
265 --
266 end hr_dm_download;