DBA Data[Home] [Help]

PACKAGE BODY: APPS.DDR_ETL_UTIL_PKG

Source


1 PACKAGE BODY ddr_etl_util_pkg AS
2 /* $Header: ddruetlb.pls 120.5.12010000.2 2008/09/13 05:54:34 vbhave ship $ */
3 
4   g_delimeter_char      VARCHAR2(1) := '	'; /* Tab as delimeter */
5   -- g_delimeter_char      VARCHAR2(1) := '~';
6   -- g_delimeter_char      VARCHAR2(1) := CHR(9); /* Tab as delimeter */
7   g_owner_name          VARCHAR2(30) := 'DDR';
8   g_max_col_value_size  INTEGER := 1000;
9   g_max_linesize        INTEGER := 32767;
10 
11   PROCEDURE Raise_Error (p_error_text IN VARCHAR2)
12   IS
13       l_error_text        VARCHAR2(240);
14   BEGIN
15       l_error_text := p_error_text;
16       Raise_Application_Error(-20001,l_error_text);
17   END Raise_Error;
18 
19   PROCEDURE Validate_Parameters (p_table_name IN VARCHAR2,p_file_name IN VARCHAR2)
20   IS
21       CURSOR cur_tab (p_table_name IN VARCHAR2) IS
22       SELECT 1
23       FROM   ALL_TABLES
24       WHERE  table_name = p_table_name
25       AND    owner = g_owner_name;
26 
27       l_dummy   INTEGER;
28   BEGIN
29       IF p_table_name IS NULL
30       THEN
31           Raise_Error('Table Name must be specified');
32       END IF;
33 
34       IF p_file_name IS NULL
35       THEN
36           Raise_Error('File Name must be specified');
37       END IF;
38 
39       /* Validate Table Name */
40       OPEN cur_tab (UPPER(p_table_name));
41       FETCH cur_tab INTO l_dummy;
42       IF cur_tab%NOTFOUND
43       THEN
44           Raise_Error('Invalid table name: ' || p_table_name);
45       END IF;
46       CLOSE cur_tab;
47   END Validate_Parameters;
48 
49   FUNCTION is_string (p_column_type IN VARCHAR2)
50   RETURN BOOLEAN
51   IS
52   BEGIN
53       RETURN (p_column_type IN ('CHAR', 'VARCHAR2'));
54   END;
55 
56   FUNCTION is_string (p_column_type IN string_tab,p_row_idx IN INTEGER)
57   RETURN BOOLEAN
58   IS
59   BEGIN
60       RETURN (p_column_type(p_row_idx) IN ('CHAR', 'VARCHAR2'));
61   END;
62 
63   FUNCTION is_number (p_column_type IN VARCHAR2)
64   RETURN BOOLEAN
65   IS
66   BEGIN
67       RETURN (p_column_type IN ('FLOAT', 'INTEGER', 'NUMBER'));
68   END;
69 
70   FUNCTION is_number (p_column_type IN string_tab,p_row_idx IN INTEGER)
71   RETURN BOOLEAN
72   IS
73   BEGIN
74       RETURN (p_column_type(p_row_idx) IN ('FLOAT', 'INTEGER', 'NUMBER'));
75   END;
76 
77   FUNCTION is_date (p_column_type IN VARCHAR2)
78   RETURN BOOLEAN
79   IS
80   BEGIN
81       RETURN (p_column_type = 'DATE');
82   END;
83 
84   FUNCTION is_date (p_column_type IN string_tab,p_row_idx IN INTEGER)
85   RETURN BOOLEAN
86   IS
87   BEGIN
88       RETURN (p_column_type(p_row_idx) = 'DATE');
89   END;
90 
91   FUNCTION Get_Directory_Name RETURN VARCHAR2
92   IS
93       l_dirname     VARCHAR2(100);
94   BEGIN
95       -- l_dirname := 'E:\Biswajit\BI-Projects\DSM\Praxis-Work\ETL\Scripts\Error\';
96       l_dirname := 'DDR_ERROR_DIR';
97       RETURN l_dirname;
98   END Get_Directory_Name;
99 
100   PROCEDURE Get_Column_Details (
101       p_table_name      IN VARCHAR2,
102       p_column_name     IN OUT NOCOPY string_tab,
103       p_column_type     IN OUT NOCOPY string_tab,
104       p_column_count    IN OUT NOCOPY NUMBER
105   )
106   IS
107       CURSOR cur_col(c_table_name IN VARCHAR2) IS
108       SELECT column_name, data_type, data_length, data_precision, data_scale
109       FROM   ALL_TAB_COLUMNS
110       WHERE  table_name = c_table_name
111       AND    owner = g_owner_name
112       ORDER BY column_id;
113   BEGIN
114       p_column_count := 0;
115       FOR rec_col IN cur_col (p_table_name)
116       LOOP
117          p_column_count := p_column_count + 1;
118          p_column_name(p_column_count) := rec_col.column_name;
119          p_column_type(p_column_count) := rec_col.data_type;
120       END LOOP;
121   END Get_Column_Details;
122 
123   PROCEDURE Get_File_Column_Details (
124       p_line                IN VARCHAR2,
125       p_file_column_name    IN OUT NOCOPY string_tab,
126       p_file_column_count   IN OUT NOCOPY NUMBER
127   )
128   IS
129       l_token     VARCHAR2(4000);
130   BEGIN
131       p_file_column_count := LENGTH(p_line) - LENGTH(REPLACE(p_line,g_delimeter_char,'')) + 1;
132 
133       FOR idx IN 0 .. (p_file_column_count-1)
134       LOOP
135           IF (idx=0)
136           THEN
137               IF INSTR(p_line,g_delimeter_char,1,1) <> 0
138               THEN
139                   l_token := SUBSTR(p_line,1,INSTR(p_line,g_delimeter_char,1,1)-1);
140               ELSE
141                   l_token := p_line;
142               END IF;
143           ELSE
144               IF INSTR(p_line,g_delimeter_char,1,idx+1) <> 0
145               THEN
146                   l_token := SUBSTR(p_line,INSTR(p_line,g_delimeter_char,1,idx)+1,
147                                                 INSTR(p_line,g_delimeter_char,1,idx+1)-INSTR(p_line,g_delimeter_char,1,idx)-1);
148               ELSE
149                   l_token  := SUBSTR(p_line,INSTR(p_line,g_delimeter_char,1,idx)+1);
150               END IF;
151           END IF;
152 
153           p_file_column_name(idx+1) := l_token;
154       END LOOP;
155 
156   END Get_File_Column_Details;
157 
158   FUNCTION Column_Exists (p_column_name_array IN string_tab,p_column_name IN VARCHAR2)
159   RETURN BOOLEAN
160   IS
161       l_return_value    BOOLEAN := FALSE;
162   BEGIN
163       FOR indx IN 1 .. p_column_name_array.COUNT
164       LOOP
165           IF p_column_name_array(indx) = p_column_name
166           THEN
167               l_return_value := TRUE;
168               EXIT;
169           END IF;
170       END LOOP;
171       RETURN l_return_value;
172   END Column_Exists;
173 
174   PROCEDURE Export_Error (
175         p_table_name          IN VARCHAR2,
176         p_load_id             IN NUMBER   DEFAULT NULL,
177         p_file_name           IN VARCHAR2 DEFAULT NULL
178   )
179   AS
180       l_where_clause        VARCHAR2(500);
181       l_file_name           VARCHAR2(100);
182   BEGIN
183       l_where_clause := 'WHERE ACTION_FLAG = ''N'' ';
184       IF p_load_id IS NOT NULL
185       THEN
186           l_where_clause := l_where_clause || ' AND LOAD_ID = ' || p_load_id;
187       END IF;
188 
189       l_file_name := p_file_name;
190       IF l_file_name IS NULL
191       THEN
192           IF p_load_id IS NOT NULL
193           THEN
194               l_file_name := p_table_name || '_' || TO_CHAR(p_load_id) || '.err';
195           ELSE
196               l_file_name := p_table_name || '.err';
197           END IF;
198       END IF;
199 
200       Export_Data(p_table_name,l_where_clause,NVL(p_file_name,l_file_name));
201   END Export_Error;
202 
203   PROCEDURE Export_Data (
204         p_table_name          IN VARCHAR2,
205         p_where_clause        IN VARCHAR2  DEFAULT NULL,
206         p_file_name           IN VARCHAR2  DEFAULT NULL
207   )
208   AS
209       l_dir_name            VARCHAR2(100);
210       l_table_name          VARCHAR2(30);
211       l_where_clause        VARCHAR2(500);
212       l_file                UTL_FILE.FILE_TYPE;
213       l_column_name         string_tab;
214       l_column_type         string_tab;
215       l_column_count        INTEGER;
216       l_column_list         VARCHAR2(10000);
217       l_SQL_stmt            VARCHAR2(10000);
218       cur_err               INTEGER;
219       l_string_value        VARCHAR2(1000);
220       l_number_value        NUMBER;
221       l_date_value          DATE;
222       l_return_value        INTEGER;
223       l_line                VARCHAR2(10000);
224       l_hdr_line            VARCHAR2(10000);
225   BEGIN
226       l_table_name := UPPER(p_table_name);
227       l_where_clause := LTRIM(UPPER(p_where_clause));
228 
229       Validate_Parameters(l_table_name,p_file_name);
230       Get_Column_Details(l_table_name,l_column_name,l_column_type,l_column_count);
231 
232       /* Build the SELECT statement to be executed */
233       FOR indx IN 1 .. l_column_count
234       LOOP
235           IF indx = 1
236           THEN
237               l_column_list := l_column_name(indx);
238               l_hdr_line := l_column_name(indx);
239           ELSE
240               l_column_list := l_column_list || ', ' || l_column_name(indx);
241               l_hdr_line := l_hdr_line || g_delimeter_char || l_column_name(indx);
242           END IF;
243       END LOOP;
244 
245       l_SQL_stmt := 'SELECT ' || l_column_list;
246       l_SQL_stmt := l_SQL_stmt || ' FROM ' || l_table_name;
247       IF l_where_clause IS NOT NULL
248       THEN
249           IF (l_where_clause NOT LIKE 'GROUP BY%' AND l_where_clause NOT LIKE 'ORDER BY%')
250           THEN
251               l_where_clause := ' WHERE ' || LTRIM (l_where_clause, 'WHERE');
252           END IF;
253       END IF;
254       l_SQL_stmt := l_SQL_stmt || l_where_clause;
255 
256       l_dir_name := Get_Directory_Name;
257       l_file := UTL_FILE.FOPEN(l_dir_name,p_file_name,'W',g_max_linesize);
258 
259       /* Retrieve the records */
260       cur_err := DBMS_SQL.OPEN_CURSOR;
261       DBMS_SQL.PARSE(cur_err,l_SQL_stmt,DBMS_SQL.NATIVE);
262       FOR col_indx IN 1 .. l_column_count
263       LOOP
264           IF is_string(l_column_type,col_indx)
265           THEN
266               DBMS_SQL.DEFINE_COLUMN (cur_err,col_indx,l_string_value,g_max_col_value_size);
267           ELSIF is_number(l_column_type,col_indx)
268           THEN
269               DBMS_SQL.DEFINE_COLUMN (cur_err,col_indx,l_number_value);
270           ELSIF is_date (l_column_type,col_indx)
271           THEN
272               DBMS_SQL.DEFINE_COLUMN (cur_err,col_indx,l_date_value);
273           END IF;
274       END LOOP;
275       l_return_value := DBMS_SQL.EXECUTE(cur_err);
276 
277       LOOP
278           l_return_value := DBMS_SQL.FETCH_ROWS(cur_err);
279           EXIT WHEN l_return_value = 0;
280 
281           IF DBMS_SQL.last_row_count = 1
282           THEN
283               /* Write the column header line in file */
284               UTL_FILE.PUT_LINE(l_file,l_hdr_line);
285           END IF;
286 
287           l_line := NULL;
288           FOR col_indx IN 1 .. l_column_count
289           LOOP
290               IF is_string(l_column_type,col_indx)
291               THEN
292                   DBMS_SQL.COLUMN_VALUE(cur_err,col_indx,l_string_value);
293               ELSIF is_number(l_column_type,col_indx)
294               THEN
295                   DBMS_SQL.COLUMN_VALUE(cur_err,col_indx,l_number_value);
296                   l_string_value := TO_CHAR (l_number_value);
297               ELSIF is_date(l_column_type,col_indx)
298               THEN
299                   DBMS_SQL.COLUMN_VALUE(cur_err,col_indx,l_date_value);
300                   l_string_value := TO_CHAR(l_date_value,'YYYY/MM/DD');
301               END IF;
302 
303               l_line := l_line || g_delimeter_char || l_string_value;
304           END LOOP;
305           l_line := SUBSTR(l_line,2);
306 
307           /* Write the line to the file */
308           UTL_FILE.PUT_LINE(l_file,l_line);
309       END LOOP;
310       DBMS_SQL.CLOSE_CURSOR(cur_err);
311 
312       UTL_FILE.FCLOSE(l_file);
313   EXCEPTION
314       WHEN OTHERS
315       THEN
316           IF UTL_FILE.IS_OPEN(l_file)
317           THEN
318               UTL_FILE.FCLOSE(l_file);
319           END IF;
320           IF DBMS_SQL.IS_OPEN(cur_err)
321           THEN
322               DBMS_SQL.CLOSE_CURSOR(cur_err);
323           END IF;
324           Raise_Error(SQLERRM);
325   END Export_Data;
326 
327   PROCEDURE Import_Error (
328         p_table_name          IN VARCHAR2,
329         p_file_name           IN VARCHAR2  DEFAULT NULL
330   )
331   AS
332       l_dir_name            VARCHAR2(100);
333       l_table_name          VARCHAR2(30);
334       l_file_name           VARCHAR2(100);
335       l_file                UTL_FILE.FILE_TYPE;
336       l_column_name         string_tab;
337       l_column_type         string_tab;
338       l_column_count        INTEGER;
339       l_update_stmt         VARCHAR2(10000);
340       l_insert_stmt         VARCHAR2(10000);
341       l_update_clause       VARCHAR2(10000);
342       l_insert_clause       VARCHAR2(10000);
343       l_values_clause       VARCHAR2(10000);
344       cur_err_upd           INTEGER;
345       cur_err_ins           INTEGER;
346       l_string_value        VARCHAR2(4000);
347       l_number_value        NUMBER;
348       l_date_value          DATE;
349       l_return_value        INTEGER;
350       l_line                VARCHAR2(10000);
351       l_col_type_by_name    string_index_by_char_tab;
352       l_file_column_name    string_tab;
353       l_file_column_count   INTEGER;
354       l_rec_count           INTEGER;
355       l_count               INTEGER;
356       l_file_column_value   string_tab;
357       l_update_count        INTEGER;
358       l_action_column_indx  INTEGER;
359   BEGIN
360       l_table_name := UPPER(p_table_name);
361       l_file_name := NVL(p_file_name,l_table_name || '.err');
362 
363       Validate_Parameters(l_table_name,l_file_name);
364       Get_Column_Details(l_table_name,l_column_name,l_column_type,l_column_count);
365 
366       FOR indx IN 1 .. l_column_count
367       LOOP
368           l_col_type_by_name(l_column_name(indx)) := l_column_type(indx);
369       END LOOP;
370 
371       l_dir_name := Get_Directory_Name;
372       l_file := UTL_FILE.FOPEN(l_dir_name,l_file_name,'R',g_max_linesize);
373 
374       /* Read and Process all records from the file */
375       cur_err_upd := DBMS_SQL.OPEN_CURSOR;
376       cur_err_ins := DBMS_SQL.OPEN_CURSOR;
377       l_rec_count := 0;
378       LOOP
379           BEGIN
380               UTL_FILE.GET_LINE(l_file,l_line,g_max_linesize);
381               l_rec_count := l_rec_count + 1;
382 
383               IF l_rec_count = 1  /* Assuming that the first line is the File Column Header Line */
384               THEN
385                   /* Build l_file_column_name array based on the read line */
386                   Get_File_Column_Details(l_line,l_file_column_name,l_file_column_count);
387 
388                   /* Build the Update and Insert Statements and Parse them */
389                   l_update_clause := 'UPDATE ' || l_table_name || ' SET ';
390                   l_insert_clause := 'INSERT INTO ' || l_table_name ||'(';
391                   l_values_clause := 'VALUES (';
392                   l_action_column_indx := 0;
393                   FOR indx IN 1 .. l_file_column_count
394                   LOOP
395                       IF l_file_column_name(indx) <> 'REC_ID'
396                       THEN
397                           l_update_clause := l_update_clause || ' ' || l_file_column_name(indx)
398                                                 || '=:' || l_file_column_name(indx) || ',';
399                       END IF;
400                       l_insert_clause := l_insert_clause || l_file_column_name(indx) || ',';
401                       l_values_clause := l_values_clause || ':' || l_file_column_name(indx) || ',';
402 
403                       IF l_file_column_name(indx) = 'ACTION_FLAG'
404                       THEN
405                           l_action_column_indx := indx;
406                       END IF;
407                   END LOOP;
408                   l_update_clause := SUBSTR(l_update_clause,1,LENGTH(l_update_clause)-1);
409                   l_insert_clause := SUBSTR(l_insert_clause,1,LENGTH(l_insert_clause)-1);
410                   l_values_clause := SUBSTR(l_values_clause,1,LENGTH(l_values_clause)-1);
411                   l_update_stmt := l_update_clause || ' WHERE REC_ID=:REC_ID';
412                   l_insert_stmt := l_insert_clause || ') ' || l_values_clause || ')';
416               ELSE
413 
414                   DBMS_SQL.PARSE(cur_err_upd,l_update_stmt,DBMS_SQL.NATIVE);
415                   DBMS_SQL.PARSE(cur_err_ins,l_insert_stmt,DBMS_SQL.NATIVE);
417                   Get_File_Column_Details(l_line,l_file_column_value,l_count);
418 
419                   IF l_file_column_value(l_action_column_indx) IN ('Y','D') /* i.e. ACTION_FLAG IN ('Y','D') */
420                   THEN
421                       /* Update the record into table */
422                       FOR indx IN 1 .. l_count
423                       LOOP
424                           IF is_string(l_col_type_by_name(l_file_column_name(indx)))
425                           THEN
426                               l_string_value := l_file_column_value(indx);
427                               DBMS_SQL.BIND_VARIABLE(cur_err_upd,l_file_column_name(indx),l_string_value);
428                           ELSIF is_number(l_col_type_by_name(l_file_column_name(indx)))
429                           THEN
430                               l_number_value := TO_NUMBER(l_file_column_value(indx));
431                               DBMS_SQL.BIND_VARIABLE(cur_err_upd,l_file_column_name(indx),l_number_value);
432                           ELSIF is_date(l_col_type_by_name(l_file_column_name(indx)))
433                           THEN
434                               l_date_value := TO_DATE(l_file_column_value(indx),'YYYY/MM/DD');
435                               DBMS_SQL.BIND_VARIABLE(cur_err_upd,l_file_column_name(indx),l_date_value);
436                           END IF;
437                       END LOOP;
438                       l_update_count := DBMS_SQL.EXECUTE(cur_err_upd);
439 
440                       /* Insert the record into table if Update fails */
441                       IF l_update_count = 0
442                       THEN
443                           FOR indx IN 1 .. l_count
444                           LOOP
445                               IF is_string(l_col_type_by_name(l_file_column_name(indx)))
446                               THEN
447                                   l_string_value := l_file_column_value(indx);
448                                   DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_string_value);
449                               ELSIF is_number(l_col_type_by_name(l_file_column_name(indx)))
450                               THEN
451                                   l_number_value := TO_NUMBER(l_file_column_value(indx));
452                                   DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_number_value);
453                               ELSIF is_date(l_col_type_by_name(l_file_column_name(indx)))
454                               THEN
455                                   l_date_value := TO_DATE(l_file_column_value(indx),'YYYY/MM/DD');
456                                   DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_date_value);
457                               END IF;
458                           END LOOP;
459                           l_update_count := DBMS_SQL.EXECUTE(cur_err_ins);
460                       END IF;
461                   END IF;
462 
463               END IF;
464 
465           EXCEPTION
466               WHEN NO_DATA_FOUND
467               THEN EXIT;
468           END;
469       END LOOP;
470 
471       DBMS_SQL.CLOSE_CURSOR(cur_err_upd);
472       DBMS_SQL.CLOSE_CURSOR(cur_err_ins);
473       UTL_FILE.FCLOSE(l_file);
474 
475       COMMIT;
476   EXCEPTION
477       WHEN OTHERS
478       THEN
479           IF UTL_FILE.IS_OPEN(l_file)
480           THEN
481               UTL_FILE.FCLOSE(l_file);
482           END IF;
483           IF DBMS_SQL.IS_OPEN(cur_err_upd)
484           THEN
485               DBMS_SQL.CLOSE_CURSOR(cur_err_upd);
486           END IF;
487           IF DBMS_SQL.IS_OPEN(cur_err_ins)
488           THEN
489               DBMS_SQL.CLOSE_CURSOR(cur_err_ins);
490           END IF;
491           Raise_Error(SQLERRM);
492   END Import_Error;
493 
494   PROCEDURE Import_Error (
495         p_table_name          IN VARCHAR2,
496         p_file_name           IN VARCHAR2  DEFAULT NULL,
497         p_err_table_name      IN VARCHAR2,
498         p_load_id             IN NUMBER    DEFAULT NULL,
499         p_tgt_table_type      IN VARCHAR2  DEFAULT 'I'
500   )
501   AS
502       l_err_table_name      VARCHAR2(30);
503   BEGIN
504       IF p_err_table_name IS NULL
505       THEN
506           l_err_table_name := 'DDR_E_' || SUBSTR(p_table_name,7);
507       ELSE
508           l_err_table_name := p_err_table_name;
509       END IF;
510       Import_Error(l_err_table_name,p_file_name);
511       Transfer_Data(l_err_table_name,p_table_name,p_load_id,p_tgt_table_type);
512   END Import_Error;
513 
514   PROCEDURE Import_Data (
515         p_table_name          IN VARCHAR2,
516         p_file_name           IN VARCHAR2  DEFAULT NULL
517   )
518   AS
519       l_dir_name            VARCHAR2(100);
520       l_table_name          VARCHAR2(30);
521       l_file_name           VARCHAR2(100);
522       l_file                UTL_FILE.FILE_TYPE;
523       l_column_name         string_tab;
524       l_column_type         string_tab;
525       l_column_count        INTEGER;
526       l_insert_stmt         VARCHAR2(10000);
527       l_insert_clause       VARCHAR2(10000);
528       l_values_clause       VARCHAR2(10000);
529       cur_err_ins           INTEGER;
530       l_string_value        VARCHAR2(4000);
534       l_line                VARCHAR2(10000);
531       l_number_value        NUMBER;
532       l_date_value          DATE;
533       l_return_value        INTEGER;
535       l_col_type_by_name    string_index_by_char_tab;
536       l_file_column_name    string_tab;
537       l_file_column_count   INTEGER;
538       l_rec_count           INTEGER;
539       l_count               INTEGER;
540       l_file_column_value   string_tab;
541       l_update_count        INTEGER;
542   BEGIN
543       l_table_name := UPPER(p_table_name);
544       l_file_name := NVL(p_file_name,l_table_name || '.txt');
545 
546       Validate_Parameters(l_table_name,l_file_name);
547       Get_Column_Details(l_table_name,l_column_name,l_column_type,l_column_count);
548 
549       FOR indx IN 1 .. l_column_count
550       LOOP
551           l_col_type_by_name(l_column_name(indx)) := l_column_type(indx);
552       END LOOP;
553 
554       l_dir_name := Get_Directory_Name;
555       l_file := UTL_FILE.FOPEN(l_dir_name,l_file_name,'R',g_max_linesize);
556 
557       /* Read and Process all records from the file */
558       cur_err_ins := DBMS_SQL.OPEN_CURSOR;
559       l_rec_count := 0;
560       LOOP
561           BEGIN
562               UTL_FILE.GET_LINE(l_file,l_line,g_max_linesize);
563               l_rec_count := l_rec_count + 1;
564 
565               IF l_rec_count = 1  /* Assuming that the first line is the File Column Header Line */
566               THEN
567                   /* Build l_file_column_name array based on the read line */
568                   Get_File_Column_Details(l_line,l_file_column_name,l_file_column_count);
569 
570                   /* Build the Insert Statement and Parse them */
571                   l_insert_clause := 'INSERT INTO ' || l_table_name ||'(';
572                   l_values_clause := 'VALUES (';
573                   FOR indx IN 1 .. l_file_column_count
574                   LOOP
575                       l_insert_clause := l_insert_clause || l_file_column_name(indx) || ',';
576                       l_values_clause := l_values_clause || ':' || l_file_column_name(indx) || ',';
577                   END LOOP;
578                   l_insert_clause := SUBSTR(l_insert_clause,1,LENGTH(l_insert_clause)-1);
579                   l_values_clause := SUBSTR(l_values_clause,1,LENGTH(l_values_clause)-1);
580                   l_insert_stmt := l_insert_clause || ') ' || l_values_clause || ')';
581 
582                   DBMS_SQL.PARSE(cur_err_ins,l_insert_stmt,DBMS_SQL.NATIVE);
583               ELSE
584                   Get_File_Column_Details(l_line,l_file_column_value,l_count);
585 
586                   /* Insert the record into table */
587                   FOR indx IN 1 .. l_count
588                   LOOP
589                       IF is_string(l_col_type_by_name(l_file_column_name(indx)))
590                       THEN
591                           l_string_value := l_file_column_value(indx);
592                           DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_string_value);
593                       ELSIF is_number(l_col_type_by_name(l_file_column_name(indx)))
594                       THEN
595                           l_number_value := TO_NUMBER(l_file_column_value(indx));
596                           DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_number_value);
597                       ELSIF is_date(l_col_type_by_name(l_file_column_name(indx)))
598                       THEN
599                           l_date_value := TO_DATE(l_file_column_value(indx),'YYYY/MM/DD');
600                           DBMS_SQL.BIND_VARIABLE(cur_err_ins,l_file_column_name(indx),l_date_value);
601                       END IF;
602                   END LOOP;
603                   l_update_count := DBMS_SQL.EXECUTE(cur_err_ins);
604 
605               END IF;
606 
607           EXCEPTION
608               WHEN NO_DATA_FOUND
609               THEN EXIT;
610           END;
611       END LOOP;
612 
613       DBMS_SQL.CLOSE_CURSOR(cur_err_ins);
614       UTL_FILE.FCLOSE(l_file);
615 
616       COMMIT;
617   EXCEPTION
618       WHEN OTHERS
619       THEN
620           IF UTL_FILE.IS_OPEN(l_file)
621           THEN
622               UTL_FILE.FCLOSE(l_file);
623           END IF;
624           IF DBMS_SQL.IS_OPEN(cur_err_ins)
625           THEN
626               DBMS_SQL.CLOSE_CURSOR(cur_err_ins);
627           END IF;
628           Raise_Error(SQLERRM);
629   END Import_Data;
630 
631   PROCEDURE Transfer_Data (
632         p_src_table_name      IN VARCHAR2,
633         p_tgt_table_name      IN VARCHAR2,
634         p_load_id             IN NUMBER    DEFAULT NULL,
635         p_tgt_table_type      IN VARCHAR2  DEFAULT 'I'
636   )
637   AS
638       l_src_table_name          VARCHAR2(30);
639       l_tgt_table_name          VARCHAR2(30);
640       l_tgt_table_type          VARCHAR2(30);
641       l_src_column_name         string_tab;
642       l_src_column_type         string_tab;
643       l_src_column_count        INTEGER;
644       l_src_column_list         VARCHAR2(10000);
645       l_select_stmt             VARCHAR2(10000);
646       l_where_clause            VARCHAR2(500);
647       l_tgt_column_name         string_tab;
648       l_tgt_column_type         string_tab;
649       l_tgt_column_count        INTEGER;
653       l_update_clause           VARCHAR2(10000);
650       l_tgt_col_type_by_name    string_index_by_char_tab;
651       l_update_stmt             VARCHAR2(10000);
652       l_insert_stmt             VARCHAR2(10000);
654       l_insert_clause           VARCHAR2(10000);
655       l_values_clause           VARCHAR2(10000);
656       l_delete_stmt             VARCHAR2(1000);
657       l_rest_where_clause       VARCHAR2(500);
658       cur_src                   INTEGER;
659       cur_tgt_upd               INTEGER;
660       cur_tgt_ins               INTEGER;
661       l_string_value            VARCHAR2(1000);
662       l_number_value            NUMBER;
663       l_date_value              DATE;
664       l_return_value            INTEGER;
665       l_update_count            INTEGER;
666   BEGIN
667       l_src_table_name := UPPER(p_src_table_name);
668       l_tgt_table_name := UPPER(p_tgt_table_name);
669       l_tgt_table_type := NVL(p_tgt_table_type,'I');
670 
671       Validate_Parameters(l_src_table_name,'DUMMY');
672       Validate_Parameters(l_tgt_table_name,'DUMMY');
673       IF p_tgt_table_type NOT IN ('I','S')
674       THEN
675           Raise_Error('Valid values for Target Table Type is ''I'' and ''S'' only');
676       END IF;
677 
678       /* Get Target Table Column details */
679       Get_Column_Details(l_tgt_table_name,l_tgt_column_name,l_tgt_column_type,l_tgt_column_count);
680       FOR indx IN 1 .. l_tgt_column_count
681       LOOP
682           l_tgt_col_type_by_name(l_tgt_column_name(indx)) := l_tgt_column_type(indx);
683       END LOOP;
684 
685       /* Get Source Table Column details */
686       Get_Column_Details(l_src_table_name,l_src_column_name,l_src_column_type,l_src_column_count);
687 
688       /* Build the SELECT statement to be executed */
689       FOR indx IN 1 .. l_src_column_count
690       LOOP
691           IF indx = 1
692           THEN
693               l_src_column_list := l_src_column_name(indx);
694           ELSE
695               l_src_column_list := l_src_column_list || ', ' || l_src_column_name(indx);
696           END IF;
697       END LOOP;
698 
699       l_select_stmt := 'SELECT ' || l_src_column_list;
700       l_select_stmt := l_select_stmt || ' FROM ' || l_src_table_name;
701       l_where_clause := ' WHERE ACTION_FLAG = ''Y'' ';
702       l_rest_where_clause := null;
703       IF p_load_id IS NOT NULL
704       THEN
705           l_where_clause := l_where_clause || ' AND LOAD_ID = ' || p_load_id;
706           l_rest_where_clause := l_rest_where_clause || ' AND LOAD_ID = ' || p_load_id;
707       END IF;
708       IF Column_Exists(l_src_column_name,'SRC_IDNT_FLAG')
709       THEN
710           l_where_clause := l_where_clause || ' AND SRC_IDNT_FLAG = ''' || l_tgt_table_type || '''';
711           l_rest_where_clause := l_rest_where_clause || ' AND SRC_IDNT_FLAG = ''' || l_tgt_table_type || '''';
712       END IF;
713       l_select_stmt := l_select_stmt || l_where_clause;
714 
715       cur_tgt_upd := DBMS_SQL.OPEN_CURSOR;
716       cur_tgt_ins := DBMS_SQL.OPEN_CURSOR;
717 
718       /* Build the Update and Insert Statements and Parse them */
719       l_update_clause := 'UPDATE ' || l_tgt_table_name || ' SET ';
720       l_insert_clause := 'INSERT INTO ' || l_tgt_table_name ||'(';
721       l_values_clause := 'VALUES (';
722       FOR indx IN 1 .. l_tgt_column_count
723       LOOP
724           /* Here check for the corresponding column existance in source table */
725           IF Column_Exists(l_src_column_name,l_tgt_column_name(indx))
726           THEN
727               IF l_tgt_column_name(indx) <> 'REC_ID'
728               THEN
729                   l_update_clause := l_update_clause || ' ' || l_tgt_column_name(indx)
730                                         || '=:' || l_tgt_column_name(indx) || ',';
731               END IF;
732               l_insert_clause := l_insert_clause || l_tgt_column_name(indx) || ',';
733               l_values_clause := l_values_clause || ':' || l_tgt_column_name(indx) || ',';
734           END IF;
735       END LOOP;
736       l_update_clause := SUBSTR(l_update_clause,1,LENGTH(l_update_clause)-1);
737       l_insert_clause := SUBSTR(l_insert_clause,1,LENGTH(l_insert_clause)-1);
738       l_values_clause := SUBSTR(l_values_clause,1,LENGTH(l_values_clause)-1);
739       l_update_stmt := l_update_clause || ' WHERE REC_ID=:REC_ID';
740       l_insert_stmt := l_insert_clause || ') ' || l_values_clause || ')';
741 
742       DBMS_SQL.PARSE(cur_tgt_upd,l_update_stmt,DBMS_SQL.NATIVE);
743       DBMS_SQL.PARSE(cur_tgt_ins,l_insert_stmt,DBMS_SQL.NATIVE);
744 
745       /* Retrieve the records from Source table */
746       cur_src := DBMS_SQL.OPEN_CURSOR;
747       DBMS_SQL.PARSE(cur_src,l_select_stmt,DBMS_SQL.NATIVE);
748       FOR col_indx IN 1 .. l_src_column_count
749       LOOP
750           IF is_string(l_src_column_type,col_indx)
751           THEN
752               DBMS_SQL.DEFINE_COLUMN (cur_src,col_indx,l_string_value,g_max_col_value_size);
753           ELSIF is_number(l_src_column_type,col_indx)
754           THEN
755               DBMS_SQL.DEFINE_COLUMN (cur_src,col_indx,l_number_value);
756           ELSIF is_date (l_src_column_type,col_indx)
757           THEN
758               DBMS_SQL.DEFINE_COLUMN (cur_src,col_indx,l_date_value);
759           END IF;
760       END LOOP;
761 
765           EXIT WHEN l_return_value = 0;
762       l_return_value := DBMS_SQL.EXECUTE(cur_src);
763       LOOP
764           l_return_value := DBMS_SQL.FETCH_ROWS(cur_src);
766 
767           FOR col_indx IN 1 .. l_src_column_count
768           LOOP
772                   THEN
769               IF Column_Exists(l_tgt_column_name,l_src_column_name(col_indx))
770               THEN
771                   IF is_string(l_src_column_type,col_indx)
773                       DBMS_SQL.COLUMN_VALUE(cur_src,col_indx,l_string_value);
774                       DBMS_SQL.BIND_VARIABLE(cur_tgt_upd,l_src_column_name(col_indx),l_string_value);
775                       DBMS_SQL.BIND_VARIABLE(cur_tgt_ins,l_src_column_name(col_indx),l_string_value);
776                   ELSIF is_number(l_src_column_type,col_indx)
777                   THEN
778                       DBMS_SQL.COLUMN_VALUE(cur_src,col_indx,l_number_value);
779                       DBMS_SQL.BIND_VARIABLE(cur_tgt_upd,l_src_column_name(col_indx),l_number_value);
780                       DBMS_SQL.BIND_VARIABLE(cur_tgt_ins,l_src_column_name(col_indx),l_number_value);
781                   ELSIF is_date(l_src_column_type,col_indx)
782                   THEN
783                       DBMS_SQL.COLUMN_VALUE(cur_src,col_indx,l_date_value);
784                       DBMS_SQL.BIND_VARIABLE(cur_tgt_upd,l_src_column_name(col_indx),l_date_value);
785                       DBMS_SQL.BIND_VARIABLE(cur_tgt_ins,l_src_column_name(col_indx),l_date_value);
786                   END IF;
787               END IF;
788           END LOOP;
789 
790           /* Execute the DML statement against the Target table */
791           l_update_count := DBMS_SQL.EXECUTE(cur_tgt_upd);
792 
793           /* Insert the record into table if Update fails */
794           IF l_update_count = 0
795           THEN
796               l_update_count := DBMS_SQL.EXECUTE(cur_tgt_ins);
797           END IF;
798       END LOOP;
799 
800       DBMS_SQL.CLOSE_CURSOR(cur_src);
801       DBMS_SQL.CLOSE_CURSOR(cur_tgt_upd);
802       DBMS_SQL.CLOSE_CURSOR(cur_tgt_ins);
803 
804       /* Delete Records from Target table for records marked with "ACTION_FLAG = 'D'" in Source Error table */
805       l_delete_stmt := 'DELETE FROM ' || l_tgt_table_name || ' WHERE REC_ID IN (SELECT REC_ID FROM ' || l_src_table_name;
806       l_delete_stmt := l_delete_stmt || ' WHERE ACTION_FLAG = ''D'' ' || l_rest_where_clause || ')';
807       EXECUTE IMMEDIATE l_delete_stmt;
808 
809       /* Delete Transfered Records (i.e. ACTION_FLAG = 'Y') as well as records marked with "ACTION_FLAG = 'D'"
810         from Source Error table */
811       l_delete_stmt := 'DELETE FROM ' || l_src_table_name || ' WHERE ACTION_FLAG IN (''Y'',''D'') ' || l_rest_where_clause;
812       EXECUTE IMMEDIATE l_delete_stmt;
813 
814       COMMIT;
815   EXCEPTION
816       WHEN OTHERS
817       THEN
818           IF DBMS_SQL.IS_OPEN(cur_src)
819           THEN
820               DBMS_SQL.CLOSE_CURSOR(cur_src);
821           END IF;
822           IF DBMS_SQL.IS_OPEN(cur_tgt_upd)
823           THEN
824               DBMS_SQL.CLOSE_CURSOR(cur_tgt_upd);
825           END IF;
826           IF DBMS_SQL.IS_OPEN(cur_tgt_ins)
827           THEN
828               DBMS_SQL.CLOSE_CURSOR(cur_tgt_ins);
829           END IF;
830           Raise_Error(SQLERRM);
831   END Transfer_Data;
832 
833   PROCEDURE refresh_mv (
834         p_list                 IN VARCHAR2,
835         p_method               IN VARCHAR2 DEFAULT NULL,
836         p_rollback_seg         IN VARCHAR2 DEFAULT NULL,
837         p_push_deferred_rpc    IN BOOLEAN  DEFAULT TRUE,
838         p_refresh_after_errors IN BOOLEAN  DEFAULT FALSE,
839         p_purge_option         IN BINARY_INTEGER DEFAULT 1,
840         p_parallelism          IN BINARY_INTEGER DEFAULT 0,
841         p_heap_size            IN BINARY_INTEGER DEFAULT 0,
842         p_atomic_refresh       IN BOOLEAN  DEFAULT TRUE,
843         p_job_id               IN VARCHAR2 DEFAULT NULL,
844         p_refreshed_by         IN VARCHAR2 DEFAULT NULL,
845         x_out                  OUT NOCOPY VARCHAR2,
846         x_message              OUT NOCOPY VARCHAR2
847   ) IS
848         v_seq NUMBER;
849   BEGIN
850 
851     SELECT ddr_u_mv_rfrsh_seq.nextval INTO v_seq FROM DUAL;
852     INSERT INTO ddr_u_mv_rfrsh_log(refresh_job_id
853                        ,refresh_sequence
857                        ,refreshed_by
854                        ,mv_name
855                        ,refresh_method
856                        ,error_message
858                        ,start_date
859                        ,end_date)
860                  VALUES(p_job_id
861                        ,v_seq
862                        ,p_list
863                        ,p_method
864                        ,p_refreshed_by
865                        ,NULL
866                        ,SYSDATE
867                        ,NULL);
868     COMMIT;
869 
870     DBMS_MVIEW.REFRESH(p_list, p_method, p_rollback_seg, p_push_deferred_rpc,
871                        p_refresh_after_errors, p_purge_option,
872                        p_parallelism, p_heap_size, p_atomic_refresh);
873 
874     UPDATE ddr_u_mv_rfrsh_log
875     SET    end_date = SYSDATE
876     WHERE  refresh_job_id = p_job_id
877     AND    refresh_sequence = v_seq
878     AND    mv_name = p_list;
879     COMMIT;
880 
881     x_out := 'S';
882     x_message := NULL;
883   EXCEPTION
884     WHEN OTHERS THEN
885       x_out := 'F';
886       x_message := SQLERRM;
887 
888       UPDATE ddr_u_mv_rfrsh_log
889       SET    end_date = SYSDATE,
890              error_message = x_message
891       WHERE  refresh_job_id = p_job_id
892       AND    refresh_sequence = v_seq
893       AND    mv_name = p_list;
894       COMMIT;
895 
896   END refresh_mv;
897 
898   FUNCTION get_mv_refresh_job_id RETURN VARCHAR2 IS
899   BEGIN
900     RETURN TO_CHAR(SYSDATE,'YYYYMMDDHH24MMSS');
901   END get_mv_refresh_job_id;
902 
903   PROCEDURE truncate_mv_log(
904         p_mv_log_name          IN VARCHAR2,
905         p_job_id               IN VARCHAR2 DEFAULT NULL,
906         p_refreshed_by         IN VARCHAR2 DEFAULT NULL,
907         x_out                  OUT NOCOPY VARCHAR2,
908         x_message              OUT NOCOPY VARCHAR2
909   ) IS
910         v_seq NUMBER;
911   BEGIN
912 
913     SELECT ddr_u_mv_rfrsh_seq.nextval INTO v_seq FROM DUAL;
914     INSERT INTO ddr_u_mv_rfrsh_log(refresh_job_id
915                        ,refresh_sequence
916                        ,mv_name
917                        ,refresh_method
918                        ,error_message
919                        ,refreshed_by
920                        ,start_date
921                        ,end_date)
922                  VALUES(p_job_id
923                        ,v_seq
924                        ,p_mv_log_name
925                        ,'TRUNCATE'
926                        ,p_refreshed_by
927                        ,NULL
928                        ,SYSDATE
929                        ,NULL);
930     COMMIT;
931 
932     EXECUTE IMMEDIATE 'TRUNCATE TABLE '||p_mv_log_name||' DROP STORAGE';
933 
934     UPDATE ddr_u_mv_rfrsh_log
935     SET    end_date = SYSDATE
936     WHERE  refresh_job_id = p_job_id
937     AND    refresh_sequence = v_seq
938     AND    mv_name = p_mv_log_name;
939     COMMIT;
940 
941     x_out := 'S';
942     x_message := NULL;
943   EXCEPTION
944     WHEN OTHERS THEN
945       x_out := 'F';
946       x_message := SQLERRM;
947       UPDATE ddr_u_mv_rfrsh_log
948       SET    end_date = SYSDATE,
949              error_message = x_message
950       WHERE  refresh_job_id = p_job_id
951       AND    refresh_sequence = v_seq
952       AND    mv_name = p_mv_log_name;
953       COMMIT;
954 
955   END truncate_mv_log;
956 
957 END ddr_etl_util_pkg;