[Home] [Help]
PACKAGE BODY: APPS.DDR_ETL_UTIL_PKG
Source
1 PACKAGE BODY ddr_etl_util_pkg AS
2 /* $Header: ddruetlb.pls 120.5.12010000.2 2008/09/13 05:54:34 vbhave ship $ */
3
4 g_delimeter_char VARCHAR2(1) := ' '; /* Tab as delimeter */
5 -- g_delimeter_char VARCHAR2(1) := '~';
6 -- g_delimeter_char VARCHAR2(1) := CHR(9); /* Tab as delimeter */
7 g_owner_name VARCHAR2(30) := 'DDR';
8 g_max_col_value_size INTEGER := 1000;
9 g_max_linesize INTEGER := 32767;
10
11 PROCEDURE Raise_Error (p_error_text IN VARCHAR2)
12 IS
13 l_error_text VARCHAR2(240);
14 BEGIN
15 l_error_text := p_error_text;
16 Raise_Application_Error(-20001,l_error_text);
17 END Raise_Error;
18
19 PROCEDURE Validate_Parameters (p_table_name IN VARCHAR2,p_file_name IN VARCHAR2)
20 IS
21 CURSOR cur_tab (p_table_name IN VARCHAR2) IS
22 SELECT 1
23 FROM ALL_TABLES
24 WHERE table_name = p_table_name
25 AND owner = g_owner_name;
26
27 l_dummy INTEGER;
28 BEGIN
29 IF p_table_name IS NULL
30 THEN
31 Raise_Error('Table Name must be specified');
32 END IF;
33
34 IF p_file_name IS NULL
35 THEN
36 Raise_Error('File Name must be specified');
37 END IF;
38
39 /* Validate Table Name */
40 OPEN cur_tab (UPPER(p_table_name));
41 FETCH cur_tab INTO l_dummy;
42 IF cur_tab%NOTFOUND
43 THEN
44 Raise_Error('Invalid table name: ' || p_table_name);
45 END IF;
46 CLOSE cur_tab;
47 END Validate_Parameters;
48
49 FUNCTION is_string (p_column_type IN VARCHAR2)
50 RETURN BOOLEAN
51 IS
52 BEGIN
53 RETURN (p_column_type IN ('CHAR', 'VARCHAR2'));
54 END;
55
56 FUNCTION is_string (p_column_type IN string_tab,p_row_idx IN INTEGER)
57 RETURN BOOLEAN
58 IS
59 BEGIN
60 RETURN (p_column_type(p_row_idx) IN ('CHAR', 'VARCHAR2'));
61 END;
62
63 FUNCTION is_number (p_column_type IN VARCHAR2)
64 RETURN BOOLEAN
65 IS
66 BEGIN
67 RETURN (p_column_type IN ('FLOAT', 'INTEGER', 'NUMBER'));
68 END;
69
70 FUNCTION is_number (p_column_type IN string_tab,p_row_idx IN INTEGER)
71 RETURN BOOLEAN
72 IS
73 BEGIN
74 RETURN (p_column_type(p_row_idx) IN ('FLOAT', 'INTEGER', 'NUMBER'));
75 END;
76
77 FUNCTION is_date (p_column_type IN VARCHAR2)
78 RETURN BOOLEAN
79 IS
80 BEGIN
81 RETURN (p_column_type = 'DATE');
82 END;
83
84 FUNCTION is_date (p_column_type IN string_tab,p_row_idx IN INTEGER)
85 RETURN BOOLEAN
86 IS
87 BEGIN
88 RETURN (p_column_type(p_row_idx) = 'DATE');
89 END;
90
91 FUNCTION Get_Directory_Name RETURN VARCHAR2
92 IS
93 l_dirname VARCHAR2(100);
94 BEGIN
95 -- l_dirname := 'E:\Biswajit\BI-Projects\DSM\Praxis-Work\ETL\Scripts\Error\';
96 l_dirname := 'DDR_ERROR_DIR';
97 RETURN l_dirname;
98 END Get_Directory_Name;
99
100 PROCEDURE Get_Column_Details (
101 p_table_name IN VARCHAR2,
102 p_column_name IN OUT NOCOPY string_tab,
103 p_column_type IN OUT NOCOPY string_tab,
104 p_column_count IN OUT NOCOPY NUMBER
105 )
106 IS
107 CURSOR cur_col(c_table_name IN VARCHAR2) IS
108 SELECT column_name, data_type, data_length, data_precision, data_scale
109 FROM ALL_TAB_COLUMNS
110 WHERE table_name = c_table_name
111 AND owner = g_owner_name
112 ORDER BY column_id;
113 BEGIN
114 p_column_count := 0;
115 FOR rec_col IN cur_col (p_table_name)
116 LOOP
117 p_column_count := p_column_count + 1;
118 p_column_name(p_column_count) := rec_col.column_name;
119 p_column_type(p_column_count) := rec_col.data_type;
120 END LOOP;
121 END Get_Column_Details;
122
123 PROCEDURE Get_File_Column_Details (
124 p_line IN VARCHAR2,
125 p_file_column_name IN OUT NOCOPY string_tab,
126 p_file_column_count IN OUT NOCOPY NUMBER
127 )
128 IS
129 l_token VARCHAR2(4000);
130 BEGIN
131 p_file_column_count := LENGTH(p_line) - LENGTH(REPLACE(p_line,g_delimeter_char,'')) + 1;
132
133 FOR idx IN 0 .. (p_file_column_count-1)
134 LOOP
135 IF (idx=0)
136 THEN
137 IF INSTR(p_line,g_delimeter_char,1,1) <> 0
138 THEN
139 l_token := SUBSTR(p_line,1,INSTR(p_line,g_delimeter_char,1,1)-1);
140 ELSE
141 l_token := p_line;
142 END IF;
143 ELSE
144 IF INSTR(p_line,g_delimeter_char,1,idx+1) <> 0
145 THEN
146 l_token := SUBSTR(p_line,INSTR(p_line,g_delimeter_char,1,idx)+1,
147 INSTR(p_line,g_delimeter_char,1,idx+1)-INSTR(p_line,g_delimeter_char,1,idx)-1);
148 ELSE
149 l_token := SUBSTR(p_line,INSTR(p_line,g_delimeter_char,1,idx)+1);
150 END IF;
151 END IF;
152
153 p_file_column_name(idx+1) := l_token;
154 END LOOP;
155
156 END Get_File_Column_Details;
157
158 FUNCTION Column_Exists (p_column_name_array IN string_tab,p_column_name IN VARCHAR2)
159 RETURN BOOLEAN
160 IS
161 l_return_value BOOLEAN := FALSE;
162 BEGIN
163 FOR indx IN 1 .. p_column_name_array.COUNT
164 LOOP
165 IF p_column_name_array(indx) = p_column_name
166 THEN
167 l_return_value := TRUE;
168 EXIT;
169 END IF;
170 END LOOP;
171 RETURN l_return_value;
172 END Column_Exists;
173
174 PROCEDURE Export_Error (
175 p_table_name IN VARCHAR2,
176 p_load_id IN NUMBER DEFAULT NULL,
177 p_file_name IN VARCHAR2 DEFAULT NULL
178 )
179 AS
180 l_where_clause VARCHAR2(500);
181 l_file_name VARCHAR2(100);
182 BEGIN
183 l_where_clause := 'WHERE ACTION_FLAG = ''N'' ';
184 IF p_load_id IS NOT NULL
185 THEN
186 l_where_clause := l_where_clause || ' AND LOAD_ID = ' || p_load_id;
187 END IF;
188
189 l_file_name := p_file_name;
190 IF l_file_name IS NULL
191 THEN
192 IF p_load_id IS NOT NULL
193 THEN
194 l_file_name := p_table_name || '_' || TO_CHAR(p_load_id) || '.err';
195 ELSE
196 l_file_name := p_table_name || '.err';
197 END IF;
198 END IF;
199
200 Export_Data(p_table_name,l_where_clause,NVL(p_file_name,l_file_name));
201 END Export_Error;
202
203 PROCEDURE Export_Data (
204 p_table_name IN VARCHAR2,
205 p_where_clause IN VARCHAR2 DEFAULT NULL,
206 p_file_name IN VARCHAR2 DEFAULT NULL
207 )
208 AS
209 l_dir_name VARCHAR2(100);
210 l_table_name VARCHAR2(30);
211 l_where_clause VARCHAR2(500);
212 l_file UTL_FILE.FILE_TYPE;
213 l_column_name string_tab;
214 l_column_type string_tab;
215 l_column_count INTEGER;
216 l_column_list VARCHAR2(10000);
217 l_SQL_stmt VARCHAR2(10000);
218 cur_err INTEGER;
219 l_string_value VARCHAR2(1000);
220 l_number_value NUMBER;
221 l_date_value DATE;
222 l_return_value INTEGER;
223 l_line VARCHAR2(10000);
224 l_hdr_line VARCHAR2(10000);
225 BEGIN
226 l_table_name := UPPER(p_table_name);
227 l_where_clause := LTRIM(UPPER(p_where_clause));
228
229 Validate_Parameters(l_table_name,p_file_name);
230 Get_Column_Details(l_table_name,l_column_name,l_column_type,l_column_count);
231
232 /* Build the SELECT statement to be executed */
233 FOR indx IN 1 .. l_column_count
234 LOOP
235 IF indx = 1
236 THEN
237 l_column_list := l_column_name(indx);
238 l_hdr_line := l_column_name(indx);
239 ELSE
240 l_column_list := l_column_list || ', ' || l_column_name(indx);
241 l_hdr_line := l_hdr_line || g_delimeter_char || l_column_name(indx);
242 END IF;
243 END LOOP;
244
245 l_SQL_stmt := 'SELECT ' || l_column_list;
246 l_SQL_stmt := l_SQL_stmt || ' FROM ' || l_table_name;
247 IF l_where_clause IS NOT NULL
248 THEN
249 IF (l_where_clause NOT LIKE 'GROUP BY%' AND l_where_clause NOT LIKE 'ORDER BY%')
250 THEN
251 l_where_clause := ' WHERE ' || LTRIM (l_where_clause, 'WHERE');
252 END IF;
253 END IF;
254 l_SQL_stmt := l_SQL_stmt || l_where_clause;
255
256 l_dir_name := Get_Directory_Name;
257 l_file := UTL_FILE.FOPEN(l_dir_name,p_file_name,'W',g_max_linesize);
258
259 /* Retrieve the records */
260 cur_err := DBMS_SQL.OPEN_CURSOR;
261 DBMS_SQL.PARSE(cur_err,l_SQL_stmt,DBMS_SQL.NATIVE);
262 FOR col_indx IN 1 .. l_column_count
263 LOOP
264 IF is_string(l_column_type,col_indx)
265 THEN
266 DBMS_SQL.DEFINE_COLUMN (cur_err,col_indx,l_string_value,g_max_col_value_size);
267 ELSIF is_number(l_column_type,col_indx)
268 THEN
269 DBMS_SQL.DEFINE_COLUMN (cur_err,col_indx,l_number_value);
270 ELSIF is_date (l_column_type,col_indx)
271 THEN
272 DBMS_SQL.DEFINE_COLUMN (cur_err,col_indx,l_date_value);
273 END IF;
274 END LOOP;
275 l_return_value := DBMS_SQL.EXECUTE(cur_err);
276
277 LOOP
278 l_return_value := DBMS_SQL.FETCH_ROWS(cur_err);
279 EXIT WHEN l_return_value = 0;
280
281 IF DBMS_SQL.last_row_count = 1
282 THEN
283 /* Write the column header line in file */
284 UTL_FILE.PUT_LINE(l_file,l_hdr_line);
285 END IF;
286
287 l_line := NULL;
288 FOR col_indx IN 1 .. l_column_count
289 LOOP
290 IF is_string(l_column_type,col_indx)
291 THEN
292 DBMS_SQL.COLUMN_VALUE(cur_err,col_indx,l_string_value);
293 ELSIF is_number(l_column_type,col_indx)
294 THEN
295 DBMS_SQL.COLUMN_VALUE(cur_err,col_indx,l_number_value);
296 l_string_value := TO_CHAR (l_number_value);
297 ELSIF is_date(l_column_type,col_indx)
298 THEN
299 DBMS_SQL.COLUMN_VALUE(cur_err,col_indx,l_date_value);
300 l_string_value := TO_CHAR(l_date_value,'YYYY/MM/DD');
301 END IF;
302
303 l_line := l_line || g_delimeter_char || l_string_value;
304 END LOOP;
305 l_line := SUBSTR(l_line,2);
306
307 /* Write the line to the file */
308 UTL_FILE.PUT_LINE(l_file,l_line);
309 END LOOP;
310 DBMS_SQL.CLOSE_CURSOR(cur_err);
311
312 UTL_FILE.FCLOSE(l_file);
313 EXCEPTION
314 WHEN OTHERS
315 THEN
316 IF UTL_FILE.IS_OPEN(l_file)
317 THEN
318 UTL_FILE.FCLOSE(l_file);
319 END IF;
320 IF DBMS_SQL.IS_OPEN(cur_err)
321 THEN
322 DBMS_SQL.CLOSE_CURSOR(cur_err);
323 END IF;
324 Raise_Error(SQLERRM);
325 END Export_Data;
326
327 PROCEDURE Import_Error (
328 p_table_name IN VARCHAR2,
329 p_file_name IN VARCHAR2 DEFAULT NULL
330 )
331 AS
332 l_dir_name VARCHAR2(100);
333 l_table_name VARCHAR2(30);
334 l_file_name VARCHAR2(100);
335 l_file UTL_FILE.FILE_TYPE;
336 l_column_name string_tab;
337 l_column_type string_tab;
338 l_column_count INTEGER;
339 l_update_stmt VARCHAR2(10000);
340 l_insert_stmt VARCHAR2(10000);
341 l_update_clause VARCHAR2(10000);
342 l_insert_clause VARCHAR2(10000);
343 l_values_clause VARCHAR2(10000);
344 cur_err_upd INTEGER;
345 cur_err_ins INTEGER;
346 l_string_value VARCHAR2(4000);
347 l_number_value NUMBER;
348 l_date_value DATE;
349 l_return_value INTEGER;
350 l_line VARCHAR2(10000);
351 l_col_type_by_name string_index_by_char_tab;
352 l_file_column_name string_tab;
353 l_file_column_count INTEGER;
354 l_rec_count INTEGER;
355 l_count INTEGER;
356 l_file_column_value string_tab;
357 l_update_count INTEGER;
358 l_action_column_indx INTEGER;
359 BEGIN
360 l_table_name := UPPER(p_table_name);
361 l_file_name := NVL(p_file_name,l_table_name || '.err');
362
363 Validate_Parameters(l_table_name,l_file_name);
364 Get_Column_Details(l_table_name,l_column_name,l_column_type,l_column_count);
365
366 FOR indx IN 1 .. l_column_count
367 LOOP
368 l_col_type_by_name(l_column_name(indx)) := l_column_type(indx);
369 END LOOP;
370
371 l_dir_name := Get_Directory_Name;
372 l_file := UTL_FILE.FOPEN(l_dir_name,l_file_name,'R',g_max_linesize);
373
374 /* Read and Process all records from the file */
375 cur_err_upd := DBMS_SQL.OPEN_CURSOR;
376 cur_err_ins := DBMS_SQL.OPEN_CURSOR;
377 l_rec_count := 0;
378 LOOP
379 BEGIN
380 UTL_FILE.GET_LINE(l_file,l_line,g_max_linesize);
381 l_rec_count := l_rec_count + 1;
382
383 IF l_rec_count = 1 /* Assuming that the first line is the File Column Header Line */
384 THEN
385 /* Build l_file_column_name array based on the read line */
386 Get_File_Column_Details(l_line,l_file_column_name,l_file_column_count);
387
388 /* Build the Update and Insert Statements and Parse them */
389 l_update_clause := 'UPDATE ' || l_table_name || ' SET ';
390 l_insert_clause := 'INSERT INTO ' || l_table_name ||'(';
391 l_values_clause := 'VALUES (';
392 l_action_column_indx := 0;
393 FOR indx IN 1 .. l_file_column_count
394 LOOP
395 IF l_file_column_name(indx) <> 'REC_ID'
396 THEN
397 l_update_clause := l_update_clause || ' ' || l_file_column_name(indx)
398 || '=:' || l_file_column_name(indx) || ',';
399 END IF;
400 l_insert_clause := l_insert_clause || l_file_column_name(indx) || ',';
401 l_values_clause := l_values_clause || ':' || l_file_column_name(indx) || ',';
402
403 IF l_file_column_name(indx) = 'ACTION_FLAG'
404 THEN
405 l_action_column_indx := indx;
406 END IF;
407 END LOOP;
408 l_update_clause := SUBSTR(l_update_clause,1,LENGTH(l_update_clause)-1);
409 l_insert_clause := SUBSTR(l_insert_clause,1,LENGTH(l_insert_clause)-1);
410 l_values_clause := SUBSTR(l_values_clause,1,LENGTH(l_values_clause)-1);
411 l_update_stmt := l_update_clause || ' WHERE REC_ID=:REC_ID';
412 l_insert_stmt := l_insert_clause || ') ' || l_values_clause || ')';
416 ELSE
413
414 DBMS_SQL.PARSE(cur_err_upd,l_update_stmt,DBMS_SQL.NATIVE);
415 DBMS_SQL.PARSE(cur_err_ins,l_insert_stmt,DBMS_SQL.NATIVE);
417 Get_File_Column_Details(l_line,l_file_column_value,l_count);
418
419 IF l_file_column_value(l_action_column_indx) IN ('Y','D') /* i.e. ACTION_FLAG IN ('Y','D') */
420 THEN
421 /* Update the record into table */
422 FOR indx IN 1 .. l_count
423 LOOP
424 IF is_string(l_col_type_by_name(l_file_column_name(indx)))
425 THEN
426 l_string_value := l_file_column_value(indx);
427 DBMS_SQL.BIND_VARIABLE(cur_err_upd,l_file_column_name(indx),l_string_value);
428 ELSIF is_number(l_col_type_by_name(l_file_column_name(indx)))
429 THEN
430 l_number_value := TO_NUMBER(l_file_column_value(indx));
431 DBMS_SQL.BIND_VARIABLE(cur_err_upd,l_file_column_name(indx),l_number_value);
432 ELSIF is_date(l_col_type_by_name(l_file_column_name(indx)))
433 THEN
434 l_date_value := TO_DATE(l_file_column_value(indx),'YYYY/MM/DD');
435 DBMS_SQL.BIND_VARIABLE(cur_err_upd,l_file_column_name(indx),l_date_value);
436 END IF;
437 END LOOP;
438 l_update_count := DBMS_SQL.EXECUTE(cur_err_upd);
439
440 /* Insert the record into table if Update fails */
441 IF l_update_count = 0
442 THEN
443 FOR indx IN 1 .. l_count
444 LOOP
445 IF is_string(l_col_type_by_name(l_file_column_name(indx)))
446 THEN
447 l_string_value := l_file_column_value(indx);
448 DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_string_value);
449 ELSIF is_number(l_col_type_by_name(l_file_column_name(indx)))
450 THEN
451 l_number_value := TO_NUMBER(l_file_column_value(indx));
452 DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_number_value);
453 ELSIF is_date(l_col_type_by_name(l_file_column_name(indx)))
454 THEN
455 l_date_value := TO_DATE(l_file_column_value(indx),'YYYY/MM/DD');
456 DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_date_value);
457 END IF;
458 END LOOP;
459 l_update_count := DBMS_SQL.EXECUTE(cur_err_ins);
460 END IF;
461 END IF;
462
463 END IF;
464
465 EXCEPTION
466 WHEN NO_DATA_FOUND
467 THEN EXIT;
468 END;
469 END LOOP;
470
471 DBMS_SQL.CLOSE_CURSOR(cur_err_upd);
472 DBMS_SQL.CLOSE_CURSOR(cur_err_ins);
473 UTL_FILE.FCLOSE(l_file);
474
475 COMMIT;
476 EXCEPTION
477 WHEN OTHERS
478 THEN
479 IF UTL_FILE.IS_OPEN(l_file)
480 THEN
481 UTL_FILE.FCLOSE(l_file);
482 END IF;
483 IF DBMS_SQL.IS_OPEN(cur_err_upd)
484 THEN
485 DBMS_SQL.CLOSE_CURSOR(cur_err_upd);
486 END IF;
487 IF DBMS_SQL.IS_OPEN(cur_err_ins)
488 THEN
489 DBMS_SQL.CLOSE_CURSOR(cur_err_ins);
490 END IF;
491 Raise_Error(SQLERRM);
492 END Import_Error;
493
494 PROCEDURE Import_Error (
495 p_table_name IN VARCHAR2,
496 p_file_name IN VARCHAR2 DEFAULT NULL,
497 p_err_table_name IN VARCHAR2,
498 p_load_id IN NUMBER DEFAULT NULL,
499 p_tgt_table_type IN VARCHAR2 DEFAULT 'I'
500 )
501 AS
502 l_err_table_name VARCHAR2(30);
503 BEGIN
504 IF p_err_table_name IS NULL
505 THEN
506 l_err_table_name := 'DDR_E_' || SUBSTR(p_table_name,7);
507 ELSE
508 l_err_table_name := p_err_table_name;
509 END IF;
510 Import_Error(l_err_table_name,p_file_name);
511 Transfer_Data(l_err_table_name,p_table_name,p_load_id,p_tgt_table_type);
512 END Import_Error;
513
514 PROCEDURE Import_Data (
515 p_table_name IN VARCHAR2,
516 p_file_name IN VARCHAR2 DEFAULT NULL
517 )
518 AS
519 l_dir_name VARCHAR2(100);
520 l_table_name VARCHAR2(30);
521 l_file_name VARCHAR2(100);
522 l_file UTL_FILE.FILE_TYPE;
523 l_column_name string_tab;
524 l_column_type string_tab;
525 l_column_count INTEGER;
526 l_insert_stmt VARCHAR2(10000);
527 l_insert_clause VARCHAR2(10000);
528 l_values_clause VARCHAR2(10000);
529 cur_err_ins INTEGER;
530 l_string_value VARCHAR2(4000);
534 l_line VARCHAR2(10000);
531 l_number_value NUMBER;
532 l_date_value DATE;
533 l_return_value INTEGER;
535 l_col_type_by_name string_index_by_char_tab;
536 l_file_column_name string_tab;
537 l_file_column_count INTEGER;
538 l_rec_count INTEGER;
539 l_count INTEGER;
540 l_file_column_value string_tab;
541 l_update_count INTEGER;
542 BEGIN
543 l_table_name := UPPER(p_table_name);
544 l_file_name := NVL(p_file_name,l_table_name || '.txt');
545
546 Validate_Parameters(l_table_name,l_file_name);
547 Get_Column_Details(l_table_name,l_column_name,l_column_type,l_column_count);
548
549 FOR indx IN 1 .. l_column_count
550 LOOP
551 l_col_type_by_name(l_column_name(indx)) := l_column_type(indx);
552 END LOOP;
553
554 l_dir_name := Get_Directory_Name;
555 l_file := UTL_FILE.FOPEN(l_dir_name,l_file_name,'R',g_max_linesize);
556
557 /* Read and Process all records from the file */
558 cur_err_ins := DBMS_SQL.OPEN_CURSOR;
559 l_rec_count := 0;
560 LOOP
561 BEGIN
562 UTL_FILE.GET_LINE(l_file,l_line,g_max_linesize);
563 l_rec_count := l_rec_count + 1;
564
565 IF l_rec_count = 1 /* Assuming that the first line is the File Column Header Line */
566 THEN
567 /* Build l_file_column_name array based on the read line */
568 Get_File_Column_Details(l_line,l_file_column_name,l_file_column_count);
569
570 /* Build the Insert Statement and Parse them */
571 l_insert_clause := 'INSERT INTO ' || l_table_name ||'(';
572 l_values_clause := 'VALUES (';
573 FOR indx IN 1 .. l_file_column_count
574 LOOP
575 l_insert_clause := l_insert_clause || l_file_column_name(indx) || ',';
576 l_values_clause := l_values_clause || ':' || l_file_column_name(indx) || ',';
577 END LOOP;
578 l_insert_clause := SUBSTR(l_insert_clause,1,LENGTH(l_insert_clause)-1);
579 l_values_clause := SUBSTR(l_values_clause,1,LENGTH(l_values_clause)-1);
580 l_insert_stmt := l_insert_clause || ') ' || l_values_clause || ')';
581
582 DBMS_SQL.PARSE(cur_err_ins,l_insert_stmt,DBMS_SQL.NATIVE);
583 ELSE
584 Get_File_Column_Details(l_line,l_file_column_value,l_count);
585
586 /* Insert the record into table */
587 FOR indx IN 1 .. l_count
588 LOOP
589 IF is_string(l_col_type_by_name(l_file_column_name(indx)))
590 THEN
591 l_string_value := l_file_column_value(indx);
592 DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_string_value);
593 ELSIF is_number(l_col_type_by_name(l_file_column_name(indx)))
594 THEN
595 l_number_value := TO_NUMBER(l_file_column_value(indx));
596 DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_number_value);
597 ELSIF is_date(l_col_type_by_name(l_file_column_name(indx)))
598 THEN
599 l_date_value := TO_DATE(l_file_column_value(indx),'YYYY/MM/DD');
600 DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_date_value);
601 END IF;
602 END LOOP;
603 l_update_count := DBMS_SQL.EXECUTE(cur_err_ins);
604
605 END IF;
606
607 EXCEPTION
608 WHEN NO_DATA_FOUND
609 THEN EXIT;
610 END;
611 END LOOP;
612
613 DBMS_SQL.CLOSE_CURSOR(cur_err_ins);
614 UTL_FILE.FCLOSE(l_file);
615
616 COMMIT;
617 EXCEPTION
618 WHEN OTHERS
619 THEN
620 IF UTL_FILE.IS_OPEN(l_file)
621 THEN
622 UTL_FILE.FCLOSE(l_file);
623 END IF;
624 IF DBMS_SQL.IS_OPEN(cur_err_ins)
625 THEN
626 DBMS_SQL.CLOSE_CURSOR(cur_err_ins);
627 END IF;
628 Raise_Error(SQLERRM);
629 END Import_Data;
630
631 PROCEDURE Transfer_Data (
632 p_src_table_name IN VARCHAR2,
633 p_tgt_table_name IN VARCHAR2,
634 p_load_id IN NUMBER DEFAULT NULL,
635 p_tgt_table_type IN VARCHAR2 DEFAULT 'I'
636 )
637 AS
638 l_src_table_name VARCHAR2(30);
639 l_tgt_table_name VARCHAR2(30);
640 l_tgt_table_type VARCHAR2(30);
641 l_src_column_name string_tab;
642 l_src_column_type string_tab;
643 l_src_column_count INTEGER;
644 l_src_column_list VARCHAR2(10000);
645 l_select_stmt VARCHAR2(10000);
646 l_where_clause VARCHAR2(500);
647 l_tgt_column_name string_tab;
648 l_tgt_column_type string_tab;
649 l_tgt_column_count INTEGER;
653 l_update_clause VARCHAR2(10000);
650 l_tgt_col_type_by_name string_index_by_char_tab;
651 l_update_stmt VARCHAR2(10000);
652 l_insert_stmt VARCHAR2(10000);
654 l_insert_clause VARCHAR2(10000);
655 l_values_clause VARCHAR2(10000);
656 l_delete_stmt VARCHAR2(1000);
657 l_rest_where_clause VARCHAR2(500);
658 cur_src INTEGER;
659 cur_tgt_upd INTEGER;
660 cur_tgt_ins INTEGER;
661 l_string_value VARCHAR2(1000);
662 l_number_value NUMBER;
663 l_date_value DATE;
664 l_return_value INTEGER;
665 l_update_count INTEGER;
666 BEGIN
667 l_src_table_name := UPPER(p_src_table_name);
668 l_tgt_table_name := UPPER(p_tgt_table_name);
669 l_tgt_table_type := NVL(p_tgt_table_type,'I');
670
671 Validate_Parameters(l_src_table_name,'DUMMY');
672 Validate_Parameters(l_tgt_table_name,'DUMMY');
673 IF p_tgt_table_type NOT IN ('I','S')
674 THEN
675 Raise_Error('Valid values for Target Table Type is ''I'' and ''S'' only');
676 END IF;
677
678 /* Get Target Table Column details */
679 Get_Column_Details(l_tgt_table_name,l_tgt_column_name,l_tgt_column_type,l_tgt_column_count);
680 FOR indx IN 1 .. l_tgt_column_count
681 LOOP
682 l_tgt_col_type_by_name(l_tgt_column_name(indx)) := l_tgt_column_type(indx);
683 END LOOP;
684
685 /* Get Source Table Column details */
686 Get_Column_Details(l_src_table_name,l_src_column_name,l_src_column_type,l_src_column_count);
687
688 /* Build the SELECT statement to be executed */
689 FOR indx IN 1 .. l_src_column_count
690 LOOP
691 IF indx = 1
692 THEN
693 l_src_column_list := l_src_column_name(indx);
694 ELSE
695 l_src_column_list := l_src_column_list || ', ' || l_src_column_name(indx);
696 END IF;
697 END LOOP;
698
699 l_select_stmt := 'SELECT ' || l_src_column_list;
700 l_select_stmt := l_select_stmt || ' FROM ' || l_src_table_name;
701 l_where_clause := ' WHERE ACTION_FLAG = ''Y'' ';
702 l_rest_where_clause := null;
703 IF p_load_id IS NOT NULL
704 THEN
705 l_where_clause := l_where_clause || ' AND LOAD_ID = ' || p_load_id;
706 l_rest_where_clause := l_rest_where_clause || ' AND LOAD_ID = ' || p_load_id;
707 END IF;
708 IF Column_Exists(l_src_column_name,'SRC_IDNT_FLAG')
709 THEN
710 l_where_clause := l_where_clause || ' AND SRC_IDNT_FLAG = ''' || l_tgt_table_type || '''';
711 l_rest_where_clause := l_rest_where_clause || ' AND SRC_IDNT_FLAG = ''' || l_tgt_table_type || '''';
712 END IF;
713 l_select_stmt := l_select_stmt || l_where_clause;
714
715 cur_tgt_upd := DBMS_SQL.OPEN_CURSOR;
716 cur_tgt_ins := DBMS_SQL.OPEN_CURSOR;
717
718 /* Build the Update and Insert Statements and Parse them */
719 l_update_clause := 'UPDATE ' || l_tgt_table_name || ' SET ';
720 l_insert_clause := 'INSERT INTO ' || l_tgt_table_name ||'(';
721 l_values_clause := 'VALUES (';
722 FOR indx IN 1 .. l_tgt_column_count
723 LOOP
724 /* Here check for the corresponding column existance in source table */
725 IF Column_Exists(l_src_column_name,l_tgt_column_name(indx))
726 THEN
727 IF l_tgt_column_name(indx) <> 'REC_ID'
728 THEN
729 l_update_clause := l_update_clause || ' ' || l_tgt_column_name(indx)
730 || '=:' || l_tgt_column_name(indx) || ',';
731 END IF;
732 l_insert_clause := l_insert_clause || l_tgt_column_name(indx) || ',';
733 l_values_clause := l_values_clause || ':' || l_tgt_column_name(indx) || ',';
734 END IF;
735 END LOOP;
736 l_update_clause := SUBSTR(l_update_clause,1,LENGTH(l_update_clause)-1);
737 l_insert_clause := SUBSTR(l_insert_clause,1,LENGTH(l_insert_clause)-1);
738 l_values_clause := SUBSTR(l_values_clause,1,LENGTH(l_values_clause)-1);
739 l_update_stmt := l_update_clause || ' WHERE REC_ID=:REC_ID';
740 l_insert_stmt := l_insert_clause || ') ' || l_values_clause || ')';
741
742 DBMS_SQL.PARSE(cur_tgt_upd,l_update_stmt,DBMS_SQL.NATIVE);
743 DBMS_SQL.PARSE(cur_tgt_ins,l_insert_stmt,DBMS_SQL.NATIVE);
744
745 /* Retrieve the records from Source table */
746 cur_src := DBMS_SQL.OPEN_CURSOR;
747 DBMS_SQL.PARSE(cur_src,l_select_stmt,DBMS_SQL.NATIVE);
748 FOR col_indx IN 1 .. l_src_column_count
749 LOOP
750 IF is_string(l_src_column_type,col_indx)
751 THEN
752 DBMS_SQL.DEFINE_COLUMN (cur_src,col_indx,l_string_value,g_max_col_value_size);
753 ELSIF is_number(l_src_column_type,col_indx)
754 THEN
755 DBMS_SQL.DEFINE_COLUMN (cur_src,col_indx,l_number_value);
756 ELSIF is_date (l_src_column_type,col_indx)
757 THEN
758 DBMS_SQL.DEFINE_COLUMN (cur_src,col_indx,l_date_value);
759 END IF;
760 END LOOP;
761
765 EXIT WHEN l_return_value = 0;
762 l_return_value := DBMS_SQL.EXECUTE(cur_src);
763 LOOP
764 l_return_value := DBMS_SQL.FETCH_ROWS(cur_src);
766
767 FOR col_indx IN 1 .. l_src_column_count
768 LOOP
772 THEN
769 IF Column_Exists(l_tgt_column_name,l_src_column_name(col_indx))
770 THEN
771 IF is_string(l_src_column_type,col_indx)
773 DBMS_SQL.COLUMN_VALUE(cur_src,col_indx,l_string_value);
774 DBMS_SQL.BIND_VARIABLE(cur_tgt_upd,l_src_column_name(col_indx),l_string_value);
775 DBMS_SQL.BIND_VARIABLE(cur_tgt_ins,l_src_column_name(col_indx),l_string_value);
776 ELSIF is_number(l_src_column_type,col_indx)
777 THEN
778 DBMS_SQL.COLUMN_VALUE(cur_src,col_indx,l_number_value);
779 DBMS_SQL.BIND_VARIABLE(cur_tgt_upd,l_src_column_name(col_indx),l_number_value);
780 DBMS_SQL.BIND_VARIABLE(cur_tgt_ins,l_src_column_name(col_indx),l_number_value);
781 ELSIF is_date(l_src_column_type,col_indx)
782 THEN
783 DBMS_SQL.COLUMN_VALUE(cur_src,col_indx,l_date_value);
784 DBMS_SQL.BIND_VARIABLE(cur_tgt_upd,l_src_column_name(col_indx),l_date_value);
785 DBMS_SQL.BIND_VARIABLE(cur_tgt_ins,l_src_column_name(col_indx),l_date_value);
786 END IF;
787 END IF;
788 END LOOP;
789
790 /* Execute the DML statement against the Target table */
791 l_update_count := DBMS_SQL.EXECUTE(cur_tgt_upd);
792
793 /* Insert the record into table if Update fails */
794 IF l_update_count = 0
795 THEN
796 l_update_count := DBMS_SQL.EXECUTE(cur_tgt_ins);
797 END IF;
798 END LOOP;
799
800 DBMS_SQL.CLOSE_CURSOR(cur_src);
801 DBMS_SQL.CLOSE_CURSOR(cur_tgt_upd);
802 DBMS_SQL.CLOSE_CURSOR(cur_tgt_ins);
803
804 /* Delete Records from Target table for records marked with "ACTION_FLAG = 'D'" in Source Error table */
805 l_delete_stmt := 'DELETE FROM ' || l_tgt_table_name || ' WHERE REC_ID IN (SELECT REC_ID FROM ' || l_src_table_name;
806 l_delete_stmt := l_delete_stmt || ' WHERE ACTION_FLAG = ''D'' ' || l_rest_where_clause || ')';
807 EXECUTE IMMEDIATE l_delete_stmt;
808
809 /* Delete Transfered Records (i.e. ACTION_FLAG = 'Y') as well as records marked with "ACTION_FLAG = 'D'"
810 from Source Error table */
811 l_delete_stmt := 'DELETE FROM ' || l_src_table_name || ' WHERE ACTION_FLAG IN (''Y'',''D'') ' || l_rest_where_clause;
812 EXECUTE IMMEDIATE l_delete_stmt;
813
814 COMMIT;
815 EXCEPTION
816 WHEN OTHERS
817 THEN
818 IF DBMS_SQL.IS_OPEN(cur_src)
819 THEN
820 DBMS_SQL.CLOSE_CURSOR(cur_src);
821 END IF;
822 IF DBMS_SQL.IS_OPEN(cur_tgt_upd)
823 THEN
824 DBMS_SQL.CLOSE_CURSOR(cur_tgt_upd);
825 END IF;
826 IF DBMS_SQL.IS_OPEN(cur_tgt_ins)
827 THEN
828 DBMS_SQL.CLOSE_CURSOR(cur_tgt_ins);
829 END IF;
830 Raise_Error(SQLERRM);
831 END Transfer_Data;
832
833 PROCEDURE refresh_mv (
834 p_list IN VARCHAR2,
835 p_method IN VARCHAR2 DEFAULT NULL,
836 p_rollback_seg IN VARCHAR2 DEFAULT NULL,
837 p_push_deferred_rpc IN BOOLEAN DEFAULT TRUE,
838 p_refresh_after_errors IN BOOLEAN DEFAULT FALSE,
839 p_purge_option IN BINARY_INTEGER DEFAULT 1,
840 p_parallelism IN BINARY_INTEGER DEFAULT 0,
841 p_heap_size IN BINARY_INTEGER DEFAULT 0,
842 p_atomic_refresh IN BOOLEAN DEFAULT TRUE,
843 p_job_id IN VARCHAR2 DEFAULT NULL,
844 p_refreshed_by IN VARCHAR2 DEFAULT NULL,
845 x_out OUT NOCOPY VARCHAR2,
846 x_message OUT NOCOPY VARCHAR2
847 ) IS
848 v_seq NUMBER;
849 BEGIN
850
851 SELECT ddr_u_mv_rfrsh_seq.nextval INTO v_seq FROM DUAL;
852 INSERT INTO ddr_u_mv_rfrsh_log(refresh_job_id
853 ,refresh_sequence
857 ,refreshed_by
854 ,mv_name
855 ,refresh_method
856 ,error_message
858 ,start_date
859 ,end_date)
860 VALUES(p_job_id
861 ,v_seq
862 ,p_list
863 ,p_method
864 ,p_refreshed_by
865 ,NULL
866 ,SYSDATE
867 ,NULL);
868 COMMIT;
869
870 DBMS_MVIEW.REFRESH(p_list, p_method, p_rollback_seg, p_push_deferred_rpc,
871 p_refresh_after_errors, p_purge_option,
872 p_parallelism, p_heap_size, p_atomic_refresh);
873
874 UPDATE ddr_u_mv_rfrsh_log
875 SET end_date = SYSDATE
876 WHERE refresh_job_id = p_job_id
877 AND refresh_sequence = v_seq
878 AND mv_name = p_list;
879 COMMIT;
880
881 x_out := 'S';
882 x_message := NULL;
883 EXCEPTION
884 WHEN OTHERS THEN
885 x_out := 'F';
886 x_message := SQLERRM;
887
888 UPDATE ddr_u_mv_rfrsh_log
889 SET end_date = SYSDATE,
890 error_message = x_message
891 WHERE refresh_job_id = p_job_id
892 AND refresh_sequence = v_seq
893 AND mv_name = p_list;
894 COMMIT;
895
896 END refresh_mv;
897
898 FUNCTION get_mv_refresh_job_id RETURN VARCHAR2 IS
899 BEGIN
900 RETURN TO_CHAR(SYSDATE,'YYYYMMDDHH24MMSS');
901 END get_mv_refresh_job_id;
902
903 PROCEDURE truncate_mv_log(
904 p_mv_log_name IN VARCHAR2,
905 p_job_id IN VARCHAR2 DEFAULT NULL,
906 p_refreshed_by IN VARCHAR2 DEFAULT NULL,
907 x_out OUT NOCOPY VARCHAR2,
908 x_message OUT NOCOPY VARCHAR2
909 ) IS
910 v_seq NUMBER;
911 BEGIN
912
913 SELECT ddr_u_mv_rfrsh_seq.nextval INTO v_seq FROM DUAL;
914 INSERT INTO ddr_u_mv_rfrsh_log(refresh_job_id
915 ,refresh_sequence
916 ,mv_name
917 ,refresh_method
918 ,error_message
919 ,refreshed_by
920 ,start_date
921 ,end_date)
922 VALUES(p_job_id
923 ,v_seq
924 ,p_mv_log_name
925 ,'TRUNCATE'
926 ,p_refreshed_by
927 ,NULL
928 ,SYSDATE
929 ,NULL);
930 COMMIT;
931
932 EXECUTE IMMEDIATE 'TRUNCATE TABLE '||p_mv_log_name||' DROP STORAGE';
933
934 UPDATE ddr_u_mv_rfrsh_log
935 SET end_date = SYSDATE
936 WHERE refresh_job_id = p_job_id
937 AND refresh_sequence = v_seq
938 AND mv_name = p_mv_log_name;
939 COMMIT;
940
941 x_out := 'S';
942 x_message := NULL;
943 EXCEPTION
944 WHEN OTHERS THEN
945 x_out := 'F';
946 x_message := SQLERRM;
947 UPDATE ddr_u_mv_rfrsh_log
948 SET end_date = SYSDATE,
949 error_message = x_message
950 WHERE refresh_job_id = p_job_id
951 AND refresh_sequence = v_seq
952 AND mv_name = p_mv_log_name;
953 COMMIT;
954
955 END truncate_mv_log;
956
957 END ddr_etl_util_pkg;