DBA Data[Home] [Help]

PACKAGE BODY: APPS.MSD_CS_COLLECTION

Source


1 package body msd_cs_collection as
2 /* $Header: msdcsclb.pls 120.2.12010000.1 2008/05/15 07:09:28 lannapra ship $ */
/* ------------------------------------------------------------------
   Package-private constants and state.
   ------------------------------------------------------------------ */

    /* Row status values written to / matched against process_status. */
    C_LOG_PROCESSED         Constant varchar2(30) := 'PROCESSED';
    C_LOG_ERROR             Constant varchar2(30) := 'ERROR';

    /* Stream name used when a definition is not flagged as multi-stream. */
    C_DEFAULT_STREAM_NAME   Constant varchar2(30) := 'SINGLE_STREAM';

    /* Accepted values of p_collection_type: Collect / Pull. */
    C_COLLECT               Constant varchar2(10) := 'C';
    C_PULL                  Constant varchar2(10) := 'P';

    /* Debug switch */
    C_DEBUG                 Constant varchar2(1)  := 'N';

    /* Bug# 4349618 - rows per commit when committing in batches */
    C_BATCH_SIZE            Constant NUMBER       := 30000;

    /* Process types: which source feeds which target. */
    C_SOURCE_TO_STAGE       Constant NUMBER       := 1;
    C_SOURCE_TO_FACT        Constant NUMBER       := 2;
    C_STAGE_TO_FACT         Constant NUMBER       := 3;

    /* Sentinel string; presumably returned when a level PK lookup fails
       -- TODO confirm against get_level_pk. */
    g_level_pk_not_found    varchar2(30) := '%%^^)(::Error::%%^^)(';

    /* Error status of the program; copied into the OUT parameters of
       Custom_Stream_Collection just before it commits. */
    g_retcode               varchar2(30);
    g_errbuf                varchar2(255);
/* ------------------------------------------------------------------
   Forward declarations of the package-private functions/procedures,
   so the bodies below may reference each other in any order.
   ------------------------------------------------------------------ */

    Procedure insert_update_Into_Headers (
        p_cs_definition_id  in  number,
        p_cs_name           in  varchar2,
        p_instance_id       in  number,
        p_refresh_num       in  number);

    Procedure Insert_Data_Into_Headers (
        p_cs_definition_id  in  number,
        p_cs_name           in  varchar2,
        p_instance_id       in  number,
        p_refresh_num       in  number);

    /* Called by Process_1 with the fully built SELECT statement. */
    Procedure Process_1_Sub (
        p_cs_rec            in out NOCOPY  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_source_view       in  varchar2,
        p_target_table      in  varchar2,
        p_instance_id       in  number,
        p_sql_stmt          in  varchar2,
        p_new_refresh_num   in  NUMBER);

    /* --- Dynamic SQL builders ------------------------------------- */

    Function Build_SQL_Source (
        p_cs_definition_id  in  number,
        p_process_type      in  number,
        p_instance_id       in  varchar2,
        p_cs_name           in  varchar2) return varchar2;

    Function Build_SQL_FOR_COLLECT_AND_VAL (
        p_cs_definition_id  in  number,
        p_process_type      in  number,
        p_source_view       in  varchar2,
        p_db_link           in  varchar2,
        p_cs_name           in  varchar2) return varchar2;

    Function Build_SQL_INS_AS_SELECT (
        p_cs_definition_id  in  number,
        p_instance_id       in  varchar2,
        p_cs_name           in  varchar2,
        p_source_view       in  varchar2,
        p_db_link           in  varchar2) return varchar2;

    Function Build_Where_Clause (
        p_tokenized_where   in  varchar2,
        p_default_where     in  varchar2,
        p_parameter1        in  varchar2,
        p_parameter2        in  varchar2,
        p_parameter3        in  varchar2,
        p_parameter4        in  varchar2,
        p_parameter5        in  varchar2,
        p_parameter6        in  varchar2,
        p_parameter7        in  varchar2,
        p_parameter8        in  varchar2,
        p_parameter9        in  varchar2,
        p_parameter10       in  varchar2,
        p_request_id        in  number) return varchar2;

    /* --- Row logging ---------------------------------------------- */

    Procedure log_processed (
        crec_data           in  msd_cs_dfn_utl.g_typ_source_stream,
        p_cs_rec            in  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_instance_id       in  varchar2,
        p_source_view       in  varchar2,
        p_target_table      in  varchar2);

    Procedure log_error (
        crec_data           in  msd_cs_dfn_utl.g_typ_source_stream,
        p_cs_rec            in  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_instance_id       in  varchar2,
        p_error_message     in  varchar2,
        p_source_view       in  varchar2,
        p_target_table      in  varchar2);

    Procedure upd_stage_error (
        p_pk_id             in  number,
        p_process_status    in  varchar2,
        p_error_mesg        in  varchar2);

    /* --- Target maintenance and row inserts ----------------------- */

    Procedure Refresh_Target (
        p_process_type      in  varchar2,
        p_cs_definition_id  in  number,
        p_cs_name           in  varchar2,
        p_comp_refresh      in  varchar2,
        p_instance_id       in  number,
        p_new_refresh_num   in  NUMBER);

    Procedure ins_row_fact (
        crec_data           in  msd_cs_dfn_utl.g_typ_source_stream,
        p_cs_rec            in  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_instance_id       in  varchar2,
        p_new_refresh_num   IN  NUMBER);

    Procedure ins_row_staging (
        crec_data           in  msd_cs_dfn_utl.g_typ_source_stream,
        p_cs_rec            in  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_instance_id       in  varchar2,
        p_process_status    in  varchar2,
        p_error_message     in  varchar2);

    /* Runs after data has been collected from source into staging. */
    Procedure cs_collect_post_process (
        p_cs_Rec            in  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_instance_id       in  varchar2 );

    /* --- Processing drivers --------------------------------------- */

    Procedure Process_1 (
        p_cs_rec            in out NOCOPY  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_db_link           in  varchar2,
        p_source_view       in  varchar2,
        p_target_table      in  varchar2,
        p_process_type      in  number,
        p_default_where     in  varchar2,
        p_tokenized_where   in  varchar2,
        p_comp_refresh      in  varchar2,
        p_instance_id       in  number,
        p_parameter1        in  varchar2,
        p_parameter2        in  varchar2,
        p_parameter3        in  varchar2,
        p_parameter4        in  varchar2,
        p_parameter5        in  varchar2,
        p_parameter6        in  varchar2,
        p_parameter7        in  varchar2,
        p_parameter8        in  varchar2,
        p_parameter9        in  varchar2,
        p_parameter10       in  varchar2,
        p_new_refresh_num   IN  NUMBER,
        p_request_id        in  number);

    /* Same parameter list as Process_1 minus p_new_refresh_num. */
    Procedure Process_2 (
        p_cs_rec            in out NOCOPY  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_db_link           in  varchar2,
        p_source_view       in  varchar2,
        p_target_table      in  varchar2,
        p_process_type      in  number,
        p_default_where     in  varchar2,
        p_tokenized_where   in  varchar2,
        p_comp_refresh      in  varchar2,
        p_instance_id       in  number,
        p_parameter1        in  varchar2,
        p_parameter2        in  varchar2,
        p_parameter3        in  varchar2,
        p_parameter4        in  varchar2,
        p_parameter5        in  varchar2,
        p_parameter6        in  varchar2,
        p_parameter7        in  varchar2,
        p_parameter8        in  varchar2,
        p_parameter9        in  varchar2,
        p_parameter10       in  varchar2,
        p_request_id        in  number);

    /* --- Validation and utilities --------------------------------- */

    Function get_level_pk (
        p_instance          in  varchar2,
        p_level_id          in  number,
        p_sr_level_value_pk in OUT NOCOPY varchar2,
        p_level_value       in OUT NOCOPY varchar2,
        p_level_value_pk    in OUT NOCOPY varchar2) return varchar2;

    Procedure show_line (p_sql in varchar2);

    Procedure debug_line (p_sql in varchar2);

    Function validate_record (
        crec_data           in out NOCOPY  msd_cs_dfn_utl.g_typ_source_stream,
        p_cs_rec            in out NOCOPY  msd_Cs_definitions_v1%rowtype,
        p_instance_id       in      varchar2,
        p_err_mesg          out    NOCOPY  varchar2) return boolean;

    Function Build_Designator_Where_Clause (
        p_cs_rec            in  msd_cs_definitions_v1%rowtype,
        p_process_type      in  varchar2,
        p_cs_name           in  varchar2) return varchar2;
200 
201 /* Logic Starts here */
Procedure Custom_Stream_Collection (
                  errbuf           OUT NOCOPY  varchar2,
                  retcode          OUT NOCOPY  varchar2,
                  p_collection_type in  varchar2,
                  p_validate_data   in  varchar2,
                  p_definition_id   in  number,
                  p_cs_name         in  varchar2,
                  p_comp_refresh    in  varchar2,
                  p_instance_id     in  number,
                  p_parameter1      in  varchar2,
                  p_parameter2      in  varchar2,
                  p_parameter3      in  varchar2,
                  p_parameter4      in  varchar2,
                  p_parameter5      in  varchar2,
                  p_parameter6      in  varchar2,
                  p_parameter7      in  varchar2,
                  p_parameter8      in  varchar2,
                  p_parameter9      in  varchar2,
                  p_parameter10     in  varchar2,
                  p_request_id      in  number default 0) is

    /*
       Entry point for collecting or pulling a custom stream.

       errbuf / retcode   Completion message and status. Early validation
                          failures and unexpected exceptions set retcode
                          to 2; on the normal path both are finally taken
                          from g_errbuf / g_retcode.
       p_collection_type  C_COLLECT ('C') or C_PULL ('P'). Any other value
                          runs neither leg and falls through to the final
                          status copy and commit.
       p_validate_data    'Y' or 'N'. Pull requires 'Y'; one-step
                          collection requires 'Y'.
       p_definition_id    Stream definition; must be a row of
                          msd_cs_definitions_v1 with valid_flag = 'Y'.
       p_cs_name          Stream name; replaced by C_DEFAULT_STREAM_NAME
                          unless the definition has multiple_stream_flag
                          = 'Y' (the designator where-clause is still
                          built from p_cs_name as passed).
       p_comp_refresh     Refresh mode handed to Refresh_Target and the
                          Process_* routines.
       p_instance_id      Source instance; mandatory for collection.
       p_parameter1..10   Passed through to the Process_* routines.
       p_request_id       Not referenced here; the request id forwarded
                          downstream is fnd_global.conc_request_id.

       Commits on normal completion; any exception is turned into
       retcode 2 / errbuf = sqlerrm followed by a rollback.
    */

    l_single_step_collection    varchar2(30):='Y';

    ll_name varchar2(80) := null;

    cursor c_get_cs is
        select * from msd_cs_definitions_v1
        where
            cs_definition_id = p_definition_id and
            nvl(valid_flag, 'N') = 'Y';

    l_cs_rec           msd_cs_definitions_v1%rowtype;
    l_target           varchar2(60);
    l_source           varchar2(60);
    l_dblink           varchar2(60);
    l_default_where    varchar2(200);
    l_cs_name          varchar2(255);
    l_retcode          VARCHAR2(30);
    l_process_type     number;
    l_conc_request_id  number;
    l_new_refresh_num  NUMBER;

    /* Source -> staging leg shared by every collection mode.
       p_validate     TRUE  => row-by-row path (Process_1),
                      FALSE => Process_2.
       p_refresh_mode refresh flag for Refresh_Target / Process_*.
       Always finishes with the post-collection hook. */
    Procedure collect_into_staging (p_validate      in boolean,
                                    p_refresh_mode  in varchar2) is
    Begin
        l_target        := 'MSD_ST_CS_DATA';
        l_process_type  := C_SOURCE_TO_STAGE;
        l_source        := l_cs_rec.source_view_name;
        l_default_where := Build_Designator_Where_Clause( l_cs_rec,
                                                          l_process_type,
                                                          p_cs_name);

        /* l_new_refresh_num has not been assigned yet on this leg, so a
           NULL refresh number is passed, exactly as before. */
        Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                       l_cs_name, p_refresh_mode, p_instance_id,
                       l_new_refresh_num);

        if p_validate then
            Process_1(
                     p_cs_rec            => l_cs_rec,
                     p_cs_name           => l_cs_name,
                     p_db_link           => l_dblink,
                     p_source_view       => l_source,
                     p_target_table      => l_target,
                     p_process_type      => l_process_type,
                     p_default_where     => l_default_where,
                     p_tokenized_where   => l_cs_rec.collect_addtl_where_clause,
                     p_comp_refresh      => p_refresh_mode,
                     p_instance_id       => p_instance_id,
                     p_parameter1        => p_parameter1,
                     p_parameter2        => p_parameter2,
                     p_parameter3        => p_parameter3,
                     p_parameter4        => p_parameter4,
                     p_parameter5        => p_parameter5,
                     p_parameter6        => p_parameter6,
                     p_parameter7        => p_parameter7,
                     p_parameter8        => p_parameter8,
                     p_parameter9        => p_parameter9,
                     p_parameter10       => p_parameter10,
                     p_new_refresh_num   => l_new_refresh_num,
                     p_request_id        => l_conc_request_id);
        else
            Process_2(
                     p_cs_rec            => l_cs_rec,
                     p_cs_name           => l_cs_name,
                     p_db_link           => l_dblink,
                     p_source_view       => l_source,
                     p_target_table      => l_target,
                     p_process_type      => l_process_type,
                     p_default_where     => l_default_where,
                     p_tokenized_where   => l_cs_rec.collect_addtl_where_clause,
                     p_comp_refresh      => p_refresh_mode,
                     p_instance_id       => p_instance_id,
                     p_parameter1        => p_parameter1,
                     p_parameter2        => p_parameter2,
                     p_parameter3        => p_parameter3,
                     p_parameter4        => p_parameter4,
                     p_parameter5        => p_parameter5,
                     p_parameter6        => p_parameter6,
                     p_parameter7        => p_parameter7,
                     p_parameter8        => p_parameter8,
                     p_parameter9        => p_parameter9,
                     p_parameter10       => p_parameter10,
                     p_request_id        => l_conc_request_id);
        end if;

        /* Custom Stream Collection Post Process
           After data has been collected from source to staging */
        cs_collect_post_process(l_cs_rec,
                                l_cs_name,
                                p_instance_id);
    End collect_into_staging;

    /* Staging -> fact leg (the PULL), always validated via Process_1. */
    Procedure pull_into_fact is
    Begin
        l_target        := 'MSD_CS_DATA';
        l_source        := 'MSD_ST_CS_DATA';
        l_process_type  := C_STAGE_TO_FACT;
        l_default_where := Build_Designator_Where_Clause( l_cs_rec,
                                                          l_process_type,
                                                          p_cs_name);

        /* Get a new seq number for the pull part */
        SELECT msd.msd_last_refresh_number_s.nextval into
               l_new_refresh_num from dual;

        /* Refresh Target */
        Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                       l_cs_name, p_comp_refresh, p_instance_id,
                       l_new_refresh_num);

        Process_1(
                 p_cs_rec            => l_cs_rec,
                 p_cs_name           => l_cs_name,
                 p_db_link           => l_dblink,
                 p_source_view       => l_source,
                 p_process_type      => C_STAGE_TO_FACT,
                 p_target_table      => l_target,
                 p_default_where     => l_default_where,
                 p_tokenized_where   => NULL,
                 p_comp_refresh      => p_comp_refresh,
                 p_instance_id       => p_instance_id,
                 p_parameter1        => p_parameter1,
                 p_parameter2        => p_parameter2,
                 p_parameter3        => p_parameter3,
                 p_parameter4        => p_parameter4,
                 p_parameter5        => p_parameter5,
                 p_parameter6        => p_parameter6,
                 p_parameter7        => p_parameter7,
                 p_parameter8        => p_parameter8,
                 p_parameter9        => p_parameter9,
                 p_parameter10       => p_parameter10,
                 p_new_refresh_num   => l_new_refresh_num,
                 p_request_id        => l_conc_request_id);
    End pull_into_fact;

Begin

  /* The description is only used for the log line below. An unknown
     definition id must not abort here with ORA-01403; it is reported by
     the definition check further down with a meaningful message. */
  Begin
      select DESCRIPTION
      into ll_name
      from msd_cs_definitions
      where cs_definition_id = p_definition_id;
  Exception
      When no_data_found then
          ll_name := null;
  End;

  show_line('Stream Name   : ' || ll_name);

  debug_line('In Custom Stream Collection');
  /* Initialize */
  errbuf := 'Program Completed with Success';
  retcode := '0';

  /*  Get profile */
  l_single_step_collection := nvl(fnd_profile.value('MSD_ONE_STEP_COLLECTION'), 'N');

  l_conc_request_id := fnd_global.conc_request_id;
  debug_line('Conc Reuquest ID : ' || l_conc_request_id);

  /* Validate definition id */
  open c_get_cs;
  fetch c_get_cs into l_cs_rec;
  if c_get_cs%notfound then
        close c_get_cs;
        retcode := '2';
        errbuf := 'Custom Definition Not Found : ' || p_definition_id;
        return;
  end if;
  close c_get_cs;

  if p_collection_type = C_COLLECT  and l_cs_rec.source_view_name is null then
        retcode := '2';
        errbuf := 'Can not preform Collection - Source View is not specified.';
        return;
  end if;

  if nvl(l_cs_rec.multiple_stream_flag, 'N') <> 'Y' then
      l_cs_name := C_DEFAULT_STREAM_NAME;
  else
      l_cs_name := p_cs_name;
  end if;

  /* Instance must be specified for collection */
  if p_collection_type = C_COLLECT  and p_instance_id is null then
        retcode := '2';
        errbuf := 'Instance must be specified';
        return;
  end if;

  /*
     Fetch database link only if stream source type is 'SOURCE'
  */
  if l_cs_rec.cs_type in ('SOURCE') and p_collection_type = C_COLLECT then
       msd_common_utilities.get_db_link(p_instance_id, l_dblink, l_retcode);
       if (l_retcode = -1) then
            retcode := '2';
            errbuf := 'Error while getting db_link';
            return;
       end if;
  end if;

  /*--------------  For Collection ----------------------------*/
  IF ( p_collection_type = C_COLLECT )  THEN

      /* Check and push setup parameters if it is not done so previously */
      MSD_PUSH_SETUP_DATA.chk_push_setup(   errbuf,
                                            retcode,
                                            p_instance_id);
      IF (nvl(retcode, 0) <> 0) THEN
          return;
      END IF;

      IF (l_single_step_collection = 'Y' and p_validate_data = 'Y') THEN
          /* One Step Collection is internally transformed into a Two Step
             Collection and Pull. The collection part goes into staging
             without validation and always performs a complete refresh;
             validation is done by the pull part. */
          collect_into_staging(p_validate => FALSE, p_refresh_mode => 'Y');
          pull_into_fact;

      ELSIF (l_single_step_collection = 'N' and p_validate_data = 'Y') THEN
          /* Collect into staging with validation */
          collect_into_staging(p_validate => TRUE, p_refresh_mode => p_comp_refresh);

      ELSIF (l_single_step_collection = 'Y' and p_validate_data = 'N') THEN
          /* Invalid Option. Raise Error */
          retcode := '2';
          errbuf := 'Invalid option - Single Step Collection must perform Validation';
          return;

      ELSIF (l_single_step_collection = 'N' and p_validate_data = 'N') THEN
          /* Collect into staging without Validation */
          collect_into_staging(p_validate => FALSE, p_refresh_mode => p_comp_refresh);

      END IF;

  /*---------------------  For Pull ----------------------------*/
  ELSIF (p_collection_type = C_PULL) THEN
     IF p_validate_data = 'Y' THEN
         pull_into_fact;
     ELSE
         /* Invalid Option. Raise Error */
         retcode := '2';
         errbuf := 'Invalid option - Pull must perform Validation';
         return;
     END IF;
  END IF;  /* End of Collect or Pull */


  IF (l_target = 'MSD_CS_DATA') THEN
     /* Delete cs fact rows that are not used by any demand plans */
     MSD_TRANSLATE_FACT_DATA.clean_fact_data( errbuf,
                                              retcode,
                                              l_target);
  END IF;

  /* NOTE(review): the package globals overwrite both the success default
     set above and whatever clean_fact_data returned. Nothing visible in
     this procedure resets them first -- confirm they are maintained by
     the Process_* / logging routines. */
  retcode := g_retcode;
  errbuf  := g_errbuf;

  commit;

Exception
    When others then
        retcode := '2';
        errbuf := substr( sqlerrm, 1, 255);
        rollback;
End;
584 
Procedure Process_1 (
    p_cs_rec        in out NOCOPY  msd_cs_definitions_v1%rowtype,
    p_cs_name           in  varchar2,
    p_db_link           in  varchar2,
    p_source_view       in  varchar2,
    p_target_table      in  varchar2,
    p_process_type      in  number,
    p_default_where     in  varchar2,
    p_tokenized_where   in  varchar2,
    p_comp_refresh      in  varchar2,
    p_instance_id       in  number,
    p_parameter1        in  varchar2,
    p_parameter2        in  varchar2,
    p_parameter3        in  varchar2,
    p_parameter4        in  varchar2,
    p_parameter5        in  varchar2,
    p_parameter6        in  varchar2,
    p_parameter7        in  varchar2,
    p_parameter8        in  varchar2,
    p_parameter9        in  varchar2,
    p_parameter10       in  varchar2,
    p_new_refresh_num   IN  NUMBER,
    p_request_id        in  number) is

    /*
       Validating load path. Builds the SELECT over the source (source
       view or staging table), appends the where clause, and hands the
       statement to Process_1_Sub. As described by the original author,
       the overall process
         1. fetches data using a cursor (source/staging),
         2. validates it,
         3. marks/saves erroneous rows in staging, or saves good rows in
            the target (fact/staging) and marks them processed.
       For C_STAGE_TO_FACT the processed staging rows are deleted
       afterwards.

       p_comp_refresh is accepted for signature parity but is not
       referenced in this body. Any exception is written with show_line
       and re-raised to the caller.
    */

    /* Sized to the PL/SQL varchar2 maximum so that a long generated
       statement cannot fail with ORA-06502 on a smaller fixed buffer. */
    l_sql_stmt  varchar2(32767);
    l_where     varchar2(32767);

Begin
    debug_line('In Process_1');

    l_sql_stmt := Build_SQL_FOR_COLLECT_AND_VAL
        (p_cs_rec.cs_definition_id, p_process_type,
         p_source_view, p_db_link, p_cs_name);

    /* Neutral predicate so every later condition can be added with AND */
    l_sql_stmt :=  l_sql_stmt || ' WHERE 1 = 1';

    l_where := build_where_clause(
                                   p_tokenized_where   ,
                                   p_default_where     ,
                                   p_parameter1        ,
                                   p_parameter2        ,
                                   p_parameter3        ,
                                   p_parameter4        ,
                                   p_parameter5        ,
                                   p_parameter6        ,
                                   p_parameter7        ,
                                   p_parameter8        ,
                                   p_parameter9        ,
                                   p_parameter10       ,
                                   p_request_id      );

    if l_where is not null then
        l_sql_stmt := l_sql_stmt || ' AND ' || l_where;
    end if;

    /* DWK.  Do not include instance = 0 into fact table when we PULL data.
       NOTE(review): this filter compares attribute_1 with the number 0,
       while the DELETE below compares it with the string '0'. If
       attribute_1 is a character column the numeric form forces an
       implicit TO_NUMBER on every row -- confirm which is intended. */
    IF (p_process_type =  C_STAGE_TO_FACT) THEN
        l_sql_stmt := l_sql_stmt || ' AND attribute_1 <> 0';
    END IF;

    debug_line('length for l_sql_stmt :' || length(l_sql_stmt));
    debug_line('length for l_where :' || length(l_where));
    debug_line('before debug line');
    debug_line(l_sql_stmt);
    debug_line('after debug line');

    /* Use Dynamic SQL to fetch and process rows */
    Process_1_Sub (
                     p_cs_rec        ,
                     p_cs_name       ,
                     p_source_view   ,
                     p_target_table  ,
                     p_instance_id   ,
                     l_sql_stmt,
                     p_new_refresh_num);

    /* Delete successfully processed staging rows if the process was
       Staging to Fact.
       DWK  Don't delete any row with instance = 0.
       The cs_name = p_cs_name condition was deliberately removed from
       this WHERE clause, so all stream names of the definition are
       cleaned. */
    IF p_process_type = C_STAGE_TO_FACT THEN
        delete from MSD_ST_CS_DATA
        where
            cs_definition_id = p_cs_rec.cs_definition_id and
            process_Status = C_LOG_PROCESSED and
            attribute_1 <> '0';
    END IF;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
697 
698 
Procedure cs_collect_post_process (
        p_cs_rec        in msd_cs_definitions_v1%rowtype,
        p_cs_name       in varchar2,
        p_instance_id   in varchar2 ) is

    /*
       Hook run after data has been collected from source into staging.

       - Sales forecast streams: when the staging data of this
         definition / stream / instance contains at least one row with
         attribute_49 = '1', the rows with attribute_49 = '2' are deleted.
       - MSD_ONHAND_INVENTORY: rows for the instance are copied from
         msd_curr_onhand_inventory_v into staging.

       Any exception is written with show_line and re-raised.
    */

    /* Probe for at least one attribute_49 = '1' row. */
    cursor c1 is
        select 'Y'
        from msd_st_cs_data
        where cs_definition_id = p_cs_rec.cs_definition_id
        and cs_name = p_cs_name
        and attribute_1 = p_instance_id
        and attribute_49 = '1'
        and rownum < 2;

    l_exists varchar2(10) := 'N';

Begin

    /* Is this a Sales Forecast Stream */
    if p_cs_rec.name in (
        'MSD_SALES_FCST_BESTCASE', 'MSD_SALES_FCST_PIPELINE',
        'MSD_SALES_FCST_REALISTIC', 'MSD_SALES_FCST_WGTPLINE',
        'MSD_SALES_FCST_WORSTCASE' ) then

        /* l_exists keeps its 'N' default when the fetch finds no row */
        open c1;
        fetch c1 into l_exists;
        close c1;

        if l_exists = 'Y' then
           delete from msd_st_cs_data
           where cs_definition_id = p_cs_rec.cs_definition_id
           and cs_name = p_cs_name
           and attribute_1 = p_instance_id
           and attribute_49 = '2';
        end if;

    end if;

    /* Collect Current On-Hand Inventory data from ODS table for SOP data stream */
    if p_cs_rec.name = 'MSD_ONHAND_INVENTORY' then

        insert into msd_st_cs_data (
           CS_ST_DATA_ID,
           CS_DEFINITION_ID,
           CS_NAME,
           ATTRIBUTE_1,
           ATTRIBUTE_2,
           ATTRIBUTE_3,
           ATTRIBUTE_6,
           ATTRIBUTE_7,
           ATTRIBUTE_10,
           ATTRIBUTE_11,
           ATTRIBUTE_34,
           ATTRIBUTE_41,
           ATTRIBUTE_43,
           ATTRIBUTE_50,
           ATTRIBUTE_51,
           CREATION_DATE,
           CREATED_BY,
           LAST_UPDATE_DATE,
           LAST_UPDATED_BY,
           LAST_UPDATE_LOGIN
           )
         select msd_st_cs_data_s.nextval,
                to_char(p_cs_rec.cs_definition_id),
                /* always the default stream name ('SINGLE_STREAM'),
                   regardless of p_cs_name */
                C_DEFAULT_STREAM_NAME,
                to_char(inv.sr_instance_id),
                inv.prd_level_id,
                inv.prd_sr_level_pk,
                inv.geo_level_id,
                inv.geo_sr_level_pk,
                inv.org_level_id,
                inv.org_sr_level_pk,
                inv.time_level_id,
                to_char(inv.quantity),
                to_char(sysdate, 'YYYY/MM/DD'),
                inv.dcs_level_id,
                inv.dcs_sr_level_pk,
                /* WHO columns receive native values. Passing
                   to_char(sysdate) would round-trip the date through the
                   session NLS_DATE_FORMAT, dropping the time of day and
                   depending on session settings. */
                sysdate,
                fnd_global.user_id,
                sysdate,
                fnd_global.user_id,
                fnd_global.login_id
         from msd_curr_onhand_inventory_v inv
         where inv.sr_instance_id = p_instance_id;

    end if;

Exception
When others then
    show_line(sqlerrm);
    raise;
End;
794 
795 
/*
   Records a row that failed processing.  Where the error is recorded depends
   on the source / target combination:

     - target is the staging table, or target is the fact table and the
       source is NOT the staging table (direct collection):
         the failing row is inserted into the staging table with status
         C_LOG_ERROR and the error message.
     - otherwise (data pulled from staging into fact):
         the existing staging row identified by crec_data.pk_id is updated
         with status C_LOG_ERROR and the error message.

   Parameters
     crec_data       - the source row being processed
     p_cs_rec        - stream definition row
     p_cs_name       - stream name (passed through to ins_row_staging)
     p_instance_id   - instance; when null, crec_data.instance is used
     p_error_message - text stored with the row
     p_source_view   - name of the source view / table
     p_target_table  - name of the target table

   Any error is written to the log through show_line and re-raised.
*/
Procedure log_error (
    crec_data       in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec        in msd_cs_definitions_v1%rowtype,
    p_cs_name       in varchar2,
    p_instance_id   in varchar2,
    p_error_message in varchar2,
    p_source_view   in varchar2,
    p_target_table  in varchar2) is

    /* TRUE when the failing row has to be written as a new staging row */
    l_insert_in_staging  boolean;

Begin
    debug_line('In Log Error');

    l_insert_in_staging :=
           p_target_table = 'MSD_ST_CS_DATA'
        or (p_target_table = 'MSD_CS_DATA' and p_source_view <> 'MSD_ST_CS_DATA');

    if l_insert_in_staging then
        /* Direct collection (source to fact) or collection into staging:
           keep the rejected row in staging, flagged as an error */
        ins_row_staging(
            crec_data,
            p_cs_rec,
            p_cs_name,
            nvl(p_instance_id, crec_data.instance),
            C_LOG_ERROR,
            p_error_message);
    else
        /* Pull from staging to fact: the row already exists in staging,
           so only flag it */
        upd_stage_error(
            crec_data.pk_id,
            C_LOG_ERROR,
            p_error_message);
    end if;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
833 
/*
   Sets PROCESS_STATUS and ERROR_DESC on one staging row.

   Parameters
     p_pk_id          - CS_ST_DATA_ID of the staging row to update
     p_process_status - new value for PROCESS_STATUS
     p_error_mesg     - new value for ERROR_DESC (may be null)

   Any error is written to the log through show_line and re-raised.
*/
Procedure upd_stage_error (p_pk_id in number, p_process_status in varchar2, p_error_mesg in varchar2) is
Begin
    debug_line('In upd_stage_error');

    update msd_st_cs_data stg
       set stg.process_status = p_process_status,
           stg.error_desc     = p_error_mesg
     where stg.cs_st_data_id  = p_pk_id;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
849 
/*
   Marks a successfully processed row.

     - target is the staging table, or target is the fact table and the
       source is NOT the staging table: nothing is logged.
     - otherwise (data pulled from staging into fact): the staging row
       identified by crec_data.pk_id gets status C_LOG_PROCESSED and its
       error description is cleared.

   Parameters
     crec_data      - the source row that was processed (pk_id is read)
     p_cs_rec       - stream definition row (not referenced here)
     p_cs_name      - stream name (not referenced here)
     p_instance_id  - instance (not referenced here)
     p_source_view  - name of the source view / table
     p_target_table - name of the target table

   Any error is written to the log through show_line and re-raised.
*/
Procedure log_processed (
    crec_data       in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec        in msd_cs_definitions_v1%rowtype,
    p_cs_name       in varchar2,
    p_instance_id   in varchar2,
    p_source_view   in varchar2,
    p_target_table  in varchar2) is

    /* TRUE when there is no staging row whose status could be updated */
    l_nothing_to_log  boolean;

Begin
    debug_line('In log_processed');

    l_nothing_to_log :=
           p_target_table = 'MSD_ST_CS_DATA'
        or (p_target_table = 'MSD_CS_DATA' and p_source_view <> 'MSD_ST_CS_DATA');

    /* Direct collection or collection into staging: processing cannot be
       logged, or has not happened yet */
    if l_nothing_to_log then
        return;
    end if;

    /* Pull from staging to fact: flag the staging row as PROCESSED */
    upd_stage_error(crec_data.pk_id, C_LOG_PROCESSED, null);

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
887 
/*
   Inserts one row into the staging table MSD_ST_CS_DATA.

   Parameters
     crec_data        - source row supplying the dimension and attribute values
     p_cs_rec         - stream definition row; CS_DEFINITION_ID is read
     p_cs_name        - not referenced: CS_NAME is taken from
                        crec_data.designator
     p_instance_id    - stored in ATTRIBUTE_1
     p_process_status - stored in PROCESS_STATUS
     p_error_message  - stored in ERROR_DESC

   Column layout written by this procedure
     ATTRIBUTE_1        instance
     ATTRIBUTE_2  - 5   prd         (level id, sr level pk, level value, level pk)
     ATTRIBUTE_6  - 9   geo
     ATTRIBUTE_10 - 13  org
     ATTRIBUTE_14 - 17  prd parent
     ATTRIBUTE_18 - 21  rep
     ATTRIBUTE_22 - 25  chn
     ATTRIBUTE_26 - 29  ud1
     ATTRIBUTE_30 - 33  ud2
     ATTRIBUTE_34       tim level id
     ATTRIBUTE_35 - 49  crec_data.attribute_35 .. attribute_49
     ATTRIBUTE_50 - 53  dcs
     ATTRIBUTE_54 - 60  crec_data.attribute_54 .. attribute_60

   Any error is written to the log through show_line and re-raised.
*/
Procedure ins_row_staging (
    crec_data in  msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec        in msd_cs_definitions_v1%rowtype,
    p_cs_name       in varchar2,
    p_instance_id   in varchar2,
    p_process_status in varchar2,
    p_error_message in varchar2) is
Begin
    insert into msd_st_cs_data (
        cs_st_data_id, cs_definition_id, cs_name,
        /* instance */
        attribute_1,
        /* prd */
        attribute_2,  attribute_3,  attribute_4,  attribute_5,
        /* geo */
        attribute_6,  attribute_7,  attribute_8,  attribute_9,
        /* org */
        attribute_10, attribute_11, attribute_12, attribute_13,
        /* prd parent */
        attribute_14, attribute_15, attribute_16, attribute_17,
        /* rep */
        attribute_18, attribute_19, attribute_20, attribute_21,
        /* chn */
        attribute_22, attribute_23, attribute_24, attribute_25,
        /* ud1 */
        attribute_26, attribute_27, attribute_28, attribute_29,
        /* ud2 */
        attribute_30, attribute_31, attribute_32, attribute_33,
        /* tim level id */
        attribute_34,
        /* pass-through attributes 35 .. 49 */
        attribute_35, attribute_36, attribute_37, attribute_38, attribute_39,
        attribute_40, attribute_41, attribute_42, attribute_43, attribute_44,
        attribute_45, attribute_46, attribute_47, attribute_48, attribute_49,
        /* dcs */
        attribute_50, attribute_51, attribute_52, attribute_53,
        /* pass-through attributes 54 .. 60 */
        attribute_54, attribute_55, attribute_56, attribute_57,
        attribute_58, attribute_59, attribute_60,
        /* processing result */
        process_status, error_desc,
        /* WHO columns */
        created_by, creation_date, last_update_date, last_updated_by, last_update_login
    )
    values (
        /* CS_NAME comes from crec_data.designator, not from p_cs_name */
        msd_st_cs_data_s.nextval, p_cs_rec.cs_definition_id, crec_data.designator,
        p_instance_id,
        crec_data.prd_level_id,
        crec_data.prd_sr_level_value_pk,
        crec_data.prd_level_value,
        crec_data.prd_level_value_pk,
        crec_data.geo_level_id,
        crec_data.geo_sr_level_value_pk,
        crec_data.geo_level_value,
        crec_data.geo_level_value_pk,
        crec_data.org_level_id,
        crec_data.org_sr_level_value_pk,
        crec_data.org_level_value,
        crec_data.org_level_value_pk,
        crec_data.prd_parent_level_id,
        crec_data.prd_parent_sr_level_value_pk,
        crec_data.prd_parent_level_value,
        crec_data.prd_parent_level_value_pk,
        crec_data.rep_level_id,
        crec_data.rep_sr_level_value_pk,
        crec_data.rep_level_value,
        crec_data.rep_level_value_pk,
        crec_data.chn_level_id,
        crec_data.chn_sr_level_value_pk,
        crec_data.chn_level_value,
        crec_data.chn_level_value_pk,
        crec_data.ud1_level_id,
        crec_data.ud1_sr_level_value_pk,
        crec_data.ud1_level_value,
        crec_data.ud1_level_value_pk,
        crec_data.ud2_level_id,
        crec_data.ud2_sr_level_value_pk,
        crec_data.ud2_level_value,
        crec_data.ud2_level_value_pk,
        crec_data.tim_level_id,
        crec_data.attribute_35, crec_data.attribute_36, crec_data.attribute_37,
        crec_data.attribute_38, crec_data.attribute_39, crec_data.attribute_40,
        crec_data.attribute_41, crec_data.attribute_42, crec_data.attribute_43,
        crec_data.attribute_44, crec_data.attribute_45, crec_data.attribute_46,
        crec_data.attribute_47, crec_data.attribute_48, crec_data.attribute_49,
        crec_data.dcs_level_id,
        crec_data.dcs_sr_level_value_pk,
        crec_data.dcs_level_value,
        crec_data.dcs_level_value_pk,
        crec_data.attribute_54, crec_data.attribute_55, crec_data.attribute_56,
        crec_data.attribute_57, crec_data.attribute_58, crec_data.attribute_59,
        crec_data.attribute_60,
        p_process_status, p_error_message,
        fnd_global.user_id, sysdate, sysdate, fnd_global.user_id, fnd_global.login_id
    );

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
945 
/*
   Inserts one row into the fact table MSD_CS_DATA with ACTION_CODE 'I'.

   Parameters
     crec_data         - source row supplying the dimension and attribute values
     p_cs_rec          - stream definition row; CS_DEFINITION_ID is read
     p_cs_name         - not referenced: CS_NAME is taken from
                         crec_data.designator
     p_instance_id     - stored in ATTRIBUTE_1
     p_new_refresh_num - stored in both CREATED_BY_REFRESH_NUM and
                         LAST_REFRESH_NUM

   Column layout written by this procedure
     ATTRIBUTE_1        instance
     ATTRIBUTE_2  - 5   prd         (level id, sr level pk, level value, level pk)
     ATTRIBUTE_6  - 9   geo
     ATTRIBUTE_10 - 13  org
     ATTRIBUTE_14 - 17  prd parent
     ATTRIBUTE_18 - 21  rep
     ATTRIBUTE_22 - 25  chn
     ATTRIBUTE_26 - 29  ud1
     ATTRIBUTE_30 - 33  ud2
     ATTRIBUTE_34       tim level id
     ATTRIBUTE_35 - 49  crec_data.attribute_35 .. attribute_49
     ATTRIBUTE_50 - 53  dcs
     ATTRIBUTE_54 - 60  crec_data.attribute_54 .. attribute_60

   Any error is written to the log through show_line and re-raised.
*/
Procedure ins_row_fact(
                       crec_data                in msd_cs_dfn_utl.g_typ_source_stream,
                       p_cs_rec                 in msd_cs_definitions_v1%rowtype,
                       p_cs_name                in varchar2,
                       p_instance_id            in varchar2,
                       p_new_refresh_num        in NUMBER) is
Begin
    debug_line('In ins_row_fact');

    insert into msd_cs_data (
        cs_data_id, cs_definition_id, cs_name,
        /* instance */
        attribute_1,
        /* prd */
        attribute_2,  attribute_3,  attribute_4,  attribute_5,
        /* geo */
        attribute_6,  attribute_7,  attribute_8,  attribute_9,
        /* org */
        attribute_10, attribute_11, attribute_12, attribute_13,
        /* prd parent */
        attribute_14, attribute_15, attribute_16, attribute_17,
        /* rep */
        attribute_18, attribute_19, attribute_20, attribute_21,
        /* chn */
        attribute_22, attribute_23, attribute_24, attribute_25,
        /* ud1 */
        attribute_26, attribute_27, attribute_28, attribute_29,
        /* ud2 */
        attribute_30, attribute_31, attribute_32, attribute_33,
        /* tim level id */
        attribute_34,
        /* pass-through attributes 35 .. 49 */
        attribute_35, attribute_36, attribute_37, attribute_38, attribute_39,
        attribute_40, attribute_41, attribute_42, attribute_43, attribute_44,
        attribute_45, attribute_46, attribute_47, attribute_48, attribute_49,
        /* dcs */
        attribute_50, attribute_51, attribute_52, attribute_53,
        /* pass-through attributes 54 .. 60 */
        attribute_54, attribute_55, attribute_56, attribute_57,
        attribute_58, attribute_59, attribute_60,
        /* WHO columns */
        created_by, creation_date, last_update_date, last_updated_by, last_update_login,
        /* refresh bookkeeping */
        created_by_refresh_num, last_refresh_num, action_code
    )
    values (
        /* CS_NAME comes from crec_data.designator, not from p_cs_name */
        msd_cs_data_s.nextval, p_cs_rec.cs_definition_id, crec_data.designator,
        p_instance_id,
        crec_data.prd_level_id,
        crec_data.prd_sr_level_value_pk,
        crec_data.prd_level_value,
        crec_data.prd_level_value_pk,
        crec_data.geo_level_id,
        crec_data.geo_sr_level_value_pk,
        crec_data.geo_level_value,
        crec_data.geo_level_value_pk,
        crec_data.org_level_id,
        crec_data.org_sr_level_value_pk,
        crec_data.org_level_value,
        crec_data.org_level_value_pk,
        crec_data.prd_parent_level_id,
        crec_data.prd_parent_sr_level_value_pk,
        crec_data.prd_parent_level_value,
        crec_data.prd_parent_level_value_pk,
        crec_data.rep_level_id,
        crec_data.rep_sr_level_value_pk,
        crec_data.rep_level_value,
        crec_data.rep_level_value_pk,
        crec_data.chn_level_id,
        crec_data.chn_sr_level_value_pk,
        crec_data.chn_level_value,
        crec_data.chn_level_value_pk,
        crec_data.ud1_level_id,
        crec_data.ud1_sr_level_value_pk,
        crec_data.ud1_level_value,
        crec_data.ud1_level_value_pk,
        crec_data.ud2_level_id,
        crec_data.ud2_sr_level_value_pk,
        crec_data.ud2_level_value,
        crec_data.ud2_level_value_pk,
        crec_data.tim_level_id,
        crec_data.attribute_35, crec_data.attribute_36, crec_data.attribute_37,
        crec_data.attribute_38, crec_data.attribute_39, crec_data.attribute_40,
        crec_data.attribute_41, crec_data.attribute_42, crec_data.attribute_43,
        crec_data.attribute_44, crec_data.attribute_45, crec_data.attribute_46,
        crec_data.attribute_47, crec_data.attribute_48, crec_data.attribute_49,
        crec_data.dcs_level_id,
        crec_data.dcs_sr_level_value_pk,
        crec_data.dcs_level_value,
        crec_data.dcs_level_value_pk,
        crec_data.attribute_54, crec_data.attribute_55, crec_data.attribute_56,
        crec_data.attribute_57, crec_data.attribute_58, crec_data.attribute_59,
        crec_data.attribute_60,
        fnd_global.user_id, sysdate, sysdate, fnd_global.user_id, fnd_global.login_id,
        p_new_refresh_num, p_new_refresh_num, 'I'
    );

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1001 
/*
   Copies data from the source view into the staging table with a single
   INSERT ... SELECT, without performing any validation.

   The statement text comes from Build_SQL_INS_AS_SELECT; the optional WHERE
   clause comes from build_where_clause.  The resulting statement is executed
   with EXECUTE IMMEDIATE.

   Parameters
     p_cs_rec            - stream definition row; CS_DEFINITION_ID is read
     p_cs_name           - stream name handed to Build_SQL_INS_AS_SELECT
     p_db_link           - appended to the source view name by
                           Build_SQL_INS_AS_SELECT
     p_source_view       - source view name
     p_instance_id       - instance handed to Build_SQL_INS_AS_SELECT
     p_default_where,
     p_tokenized_where,
     p_parameter1 .. 10,
     p_request_id        - handed unchanged to build_where_clause
     p_target_table,
     p_process_type,
     p_comp_refresh      - not referenced in this procedure

   Any error is written to the log through show_line and re-raised.
*/
Procedure Process_2 (
    p_cs_rec        in out NOCOPY  msd_cs_definitions_v1%rowtype,
    p_cs_name           in  varchar2,
    p_db_link           in  varchar2,
    p_source_view       in  varchar2,
    p_target_table      in  varchar2,
    p_process_type      in  number,
    p_default_where     in  varchar2,
    p_tokenized_where   in  varchar2,
    p_comp_refresh      in  varchar2,
    p_instance_id       in  number,
    p_parameter1        in  varchar2,
    p_parameter2        in  varchar2,
    p_parameter3        in  varchar2,
    p_parameter4        in  varchar2,
    p_parameter5        in  varchar2,
    p_parameter6        in  varchar2,
    p_parameter7        in  varchar2,
    p_parameter8        in  varchar2,
    p_parameter9        in  varchar2,
    p_parameter10       in  varchar2,
    p_request_id          in  number) is

    l_ins_stmt  varchar2(32767);
    /* Sized like l_ins_stmt.  The previous 500-character buffer raised
       ORA-06502 as soon as the where clause, after parameter substitution,
       grew beyond 500 characters. */
    l_where     varchar2(32767);


Begin
    debug_line('In Process_2');

    /* INSERT INTO staging ... SELECT ... FROM <source view><db link> */
    l_ins_stmt := Build_SQL_INS_AS_SELECT(
        p_cs_definition_id => p_cs_rec.cs_definition_id,
        p_instance_id      => p_instance_id,
        p_cs_name          => p_cs_name,
        p_source_view      => p_source_view,
        p_db_link          => p_db_link);


    l_where := build_where_clause(
        p_tokenized_where   ,
        p_default_where     ,
        p_parameter1        ,
        p_parameter2        ,
        p_parameter3        ,
        p_parameter4        ,
        p_parameter5        ,
        p_parameter6        ,
        p_parameter7        ,
        p_parameter8        ,
        p_parameter9        ,
        p_parameter10       ,
        p_request_id );


    /* A null where clause means: take every row of the source view */
    if l_where is not null then
        l_ins_stmt := l_ins_stmt || ' where ' || l_where;
    end if;


    /* Execute SQL */
    debug_line(l_ins_stmt);

    Execute immediate l_ins_stmt;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1078 
/*
   Builds the SELECT statement used when rows are fetched one by one and
   validated:

      Source  ------ Staging (Process - Collect (Single Step = 'N'). Validation - 'Y')
      Source  ------ Fact    (Process - Collect (Single Step = 'Y'). Validation - 'Y')
      Staging ------ Fact    (Process - Pull    (Single Step = N/A). Validation - 'Y')

   The select list comes from Build_SQL_Source (called with a null instance).
   A leading PK_ID column is added in front of it:
     - CS_ST_DATA_ID when the source is MSD_ST_CS_DATA
     - NULL otherwise; in that case p_db_link is appended to the view name.

   Parameters
     p_cs_definition_id - stream definition id
     p_process_type     - handed to Build_SQL_Source
     p_source_view      - view / table to select from
     p_db_link          - appended to p_source_view for non-staging sources
     p_cs_name          - handed to Build_SQL_Source

   Returns the statement text (no WHERE clause).
   Any error is written to the log through show_line and re-raised.
*/
Function Build_SQL_FOR_COLLECT_AND_VAL(
                                        p_cs_definition_id in number,
                                        p_process_type     in number,
                                        p_source_view      in varchar2,
                                        p_db_link          in varchar2,
                                        p_cs_name          in varchar2)
                                        return varchar2 is

    l_select_list  varchar2(32767);
    l_statement    varchar2(32767);
Begin
    debug_line('In Build_SQL_FOR_COLLECT_AND_VAL');

    l_select_list := Build_SQL_Source(p_cs_definition_id, p_process_type, NULL, p_cs_name);

    /* Pulling from staging: expose the staging primary key so the row can be
       flagged afterwards.  Any other source has no such key. */
    if p_source_view = 'MSD_ST_CS_DATA' then
        l_statement := 'Select  cs_st_data_id PK_ID, ' || l_select_list
                       || ' from ' || p_source_view ;
        return l_statement;
    end if;

    l_statement := 'Select null pk_id, ' || l_select_list
                   || ' from ' || p_source_view || p_db_link;
    return l_statement;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1115 
/*
   Builds the INSERT ... SELECT statement that loads the staging table
   straight from the source view:

      Source  ------ Staging (Process - Collect (Single Step = 'N'). Validation - 'N')

   The select list comes from Build_SQL_Source for process type
   C_SOURCE_TO_STAGE.  CS_NAME is the LAST column of the insert list because
   the list returned by Build_SQL_Source ends with the CS_NAME expression.

   Parameters
     p_cs_definition_id - stream definition id; concatenated into the statement
     p_instance_id      - handed to Build_SQL_Source
     p_cs_name          - handed to Build_SQL_Source
     p_source_view      - view to select from
     p_db_link          - appended to p_source_view

   Returns the statement text (no WHERE clause).
   Any error is written to the log through show_line and re-raised.
*/
Function Build_SQL_INS_AS_SELECT(
    p_cs_definition_id in number,
    p_instance_id      in varchar2,
    p_cs_name          in varchar2,
    p_source_view      in varchar2,
    p_db_link          in varchar2) return varchar2 is

    l_select_list    varchar2(32767);
    l_insert_clause  varchar2(32767);
    l_statement      varchar2(32767);
Begin
    debug_line('In Build_SQL_INS_AS_SELECT');

    l_select_list := Build_SQL_Source(p_cs_definition_id, C_SOURCE_TO_STAGE,
                                      p_instance_id, p_cs_name);

    /* Target column list: the two leading keys, attribute_1 .. attribute_60,
       then cs_name */
    l_insert_clause :=
        'Insert into MSD_ST_CS_DATA (cs_st_data_id , cs_definition_id, ' ||
        'attribute_1, attribute_2, attribute_3, attribute_4, attribute_5, '      ||
        'attribute_6, attribute_7, attribute_8, attribute_9, attribute_10,'      ||
        'attribute_11, attribute_12, attribute_13, attribute_14, attribute_15, ' ||
        'attribute_16, attribute_17, attribute_18, attribute_19, attribute_20, ' ||
        'attribute_21, attribute_22, attribute_23, attribute_24, attribute_25, ' ||
        'attribute_26, attribute_27, attribute_28, attribute_29, attribute_30,'  ||
        'attribute_31, attribute_32, attribute_33, attribute_34, attribute_35, ' ||
        'attribute_36, attribute_37, attribute_38, attribute_39, attribute_40,'  ||
        'attribute_41, attribute_42, attribute_43, attribute_44, attribute_45, ' ||
        'attribute_46, attribute_47, attribute_48, attribute_49, attribute_50, ' ||
        'attribute_51, attribute_52, attribute_53, attribute_54, attribute_55, ' ||
        'attribute_56, attribute_57, attribute_58, attribute_59, attribute_60,'  ||
        'cs_name ) ';

    /* Matching select: sequence value, definition id, mapped source columns */
    l_statement := l_insert_clause ||
        ' select ' || 'msd_st_cs_Data_s.nextval, ' || p_cs_definition_id ||
        ', ' || l_select_list ||
        ' from ' || p_source_view || p_db_link;

    return l_statement;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1160 
/*
   Builds the comma separated select list (61 items: ATTRIBUTE_1 ..
   ATTRIBUTE_60 followed by CS_NAME) used by the collection statements.

   Parameters
     p_cs_definition_id - stream definition whose column mappings are read
                          from MSD_CS_DEFN_COLUMN_DTLS_V
     p_process_type     - C_SOURCE_TO_STAGE / C_SOURCE_TO_FACT: the list holds
                          the mapped SOURCE expressions;
                          any other value (staging to fact): the list holds
                          the staging TABLE column names
     p_instance_id      - when not null, emitted as a quoted literal for the
                          column mapped with identifier type 'INSTANCE'
     p_cs_name          - stream name; used as a quoted literal for CS_NAME
                          of a multiple stream that has no CS_NAME mapping

   Source expression chosen for each table column
     identifier type 'INSTANCE' : '<p_instance_id>' (left as NULL when
                                  p_instance_id is null)
     identifier type 'DATE'     : to_char(<source column>, 'YYYY/MM/DD')
     anything else              : <source column>
     no mapping / no source col : NULL
     CS_NAME                    : multiple stream  -> mapped column, else
                                                      '<p_cs_name>'
                                  single stream    -> '<C_DEFAULT_STREAM_NAME>'

   Returns the select list text.
   Any error is written to the log through show_line and re-raised.
*/
Function Build_SQL_Source(
    p_cs_definition_id in number,
    p_process_type     in number,
    p_instance_id      in varchar2,
    p_cs_name          in varchar2) return varchar2 is

    /* Number of ATTRIBUTE_n columns and position of CS_NAME in the list */
    C_ATTRIBUTE_COUNT constant pls_integer := 60;
    C_CS_NAME_POS     constant pls_integer := 61;

    /* srccol_name is sized generously: it holds an expression (quoted
       literal, to_char(...) wrapper), not just an identifier. */
    Type l_type_sql_struct is RECORD (
        tabcol_name    varchar2(60),
        srccol_name    varchar2(4000));
    Type l_type_sql_struct_array is TABLE of l_type_sql_struct;

    /* Only the three columns this function reads */
    CURSOR c1 IS
    select table_column,
           identifier_type,
           source_view_column_name
    from msd_cs_defn_column_dtls_v
    where cs_definition_id = p_cs_definition_id;

    CURSOR c_multi_stream IS
    SELECT multiple_stream_flag FROM msd_cs_definitions
    WHERE cs_definition_id = p_cs_definition_id;

    l_struct l_type_sql_struct_array;
    l_sql_stmt       varchar2(32767);
    l_multi_stream   VARCHAR2(30);
    l_use_source     boolean;
    l_item           varchar2(4000);

Begin
    debug_line('In Build_SQL_Source');

    /* Allocate exactly 61 entries: ATTRIBUTE_1..60 default to the literal
       text NULL, CS_NAME starts without a source expression. */
    l_struct := l_type_sql_struct_array();
    l_struct.extend(C_CS_NAME_POS);

    for i in 1..C_ATTRIBUTE_COUNT loop
       l_struct(i).tabcol_name := 'ATTRIBUTE_' || i;
       l_struct(i).srccol_name := 'NULL';
    end loop;
    l_struct(C_CS_NAME_POS).tabcol_name := 'CS_NAME';

    /* Fetch source column name from the mappings table and update the array */
    for c1_rec in c1 loop

        for i IN 1..C_CS_NAME_POS loop
            if l_struct(i).tabcol_name = c1_rec.table_column then
                if c1_rec.identifier_type = 'INSTANCE' then
                    if p_instance_id is not null then
                        /* Quoted literal; embedded quotes are doubled, the
                           same way p_cs_name is handled below */
                        l_struct(i).srccol_name :=
                            '''' || replace(p_instance_id, '''', '''''') || '''';
                    end if;
                elsif c1_rec.identifier_type = 'DATE' then
                    if c1_rec.source_view_column_name  is not null then
                        l_struct(i).srccol_name := 'to_char(' ||c1_rec.source_view_column_name || ', ''YYYY/MM/DD'')';
                    end if;
                else
                    if c1_rec.source_view_column_name  is not null then
                        l_struct(i).srccol_name := c1_rec.source_view_column_name ;
                    end if;
                end if;
                /* table column names are unique in the array */
                exit;
            end if;
        end loop;
    end loop;

    /* DWK If this stream is multiple stream and there is no column mapping
       for CS_NAME then, assume user will populate the CS_NAME
       from Collection */

    OPEN  c_multi_stream;
    FETCH c_multi_stream INTO l_multi_stream;
    CLOSE c_multi_stream;

    IF nvl(l_multi_stream, 'N') = 'Y' THEN
       /* After column mapping, if source column for CS_NAME is still null
          then  assume user will populate the CS_NAME  from Collection */

       IF ( l_struct(C_CS_NAME_POS).srccol_name IS NULL ) THEN
          l_struct(C_CS_NAME_POS).srccol_name := '''' || replace(p_cs_name, '''', '''''') || '''';
       END IF;

    ELSE   /* Single stream */
       l_struct(C_CS_NAME_POS).srccol_name := '''' || C_DEFAULT_STREAM_NAME || '''' ;
    END IF;

    /* Builds SQL stmt.
       Collecting from a source view: use the mapped source expressions.
       Staging to fact (always with validation): use the staging table
       column names. */
    l_use_source := p_process_type in (C_SOURCE_TO_FACT, C_SOURCE_TO_STAGE);

    for i in 1..C_CS_NAME_POS loop

        if l_use_source then
            l_item := l_struct(i).srccol_name;
        else
            l_item := l_struct(i).tabcol_name;
        end if;

        if l_sql_stmt is null then
            l_sql_stmt := l_item;
        else
            l_sql_stmt := l_sql_stmt || ', ' || l_item;
        end if;

    end loop;

    debug_line('l_sql_stmt : ' || l_sql_stmt);
    return l_sql_stmt;

Exception
When others then
    /* Do not leave the flag cursor open when the failure hit between
       OPEN and CLOSE */
    if c_multi_stream%isopen then
        close c_multi_stream;
    end if;
    show_line(sqlerrm);
    raise;

End;
1294 
/*
   Writes a text to the concurrent request log file (fnd_file.log), split
   into consecutive pieces of at most 255 characters, one piece per line.
   A null text writes nothing.

   Parameters
     p_sql - text to write
*/
Procedure show_line(p_sql in    varchar2) is
    C_CHUNK_SIZE constant number := 255;
    l_total      number;
    l_offset     number;
Begin
    /* length(null) is null; treating it as 0 skips the loop, as before */
    l_total  := nvl(length(p_sql), 0);
    l_offset := 1;

    while l_offset <= l_total
    loop
        fnd_file.put_line(fnd_file.log, substr(p_sql, l_offset, C_CHUNK_SIZE));
        l_offset := l_offset + C_CHUNK_SIZE;
    end loop;
End;
1306 
1307 Function validate_record (
1308     crec_data       in out NOCOPY  msd_cs_dfn_utl.g_typ_source_stream,
1309     p_cs_rec        in out NOCOPY  msd_Cs_definitions_v1%rowtype,
1310     p_instance_id   in     varchar2,
1311     p_err_mesg      out    NOCOPY  varchar2) return boolean is
1312 
1313     l_comments1         varchar2(1000);
1314     l_comments2         varchar2(1000);
1315     l_first_record      boolean:=TRUE;
1316     l_dummy_date        date;
1317     l_dummy_number      number;
1318 
1319     l_prd_found        varchar2(30);
1320     l_prd_parent_found varchar2(30);
1321     l_geo_found        varchar2(30);
1322     l_org_found        varchar2(30);
1323     l_chn_found        varchar2(30);
1324     l_rep_found        varchar2(30);
1325     l_ud1_found        varchar2(30);
1326     l_ud2_found        varchar2(30);
1327     l_tim_found        varchar2(30);
1328     l_dcs_found        varchar2(30);
1329 
1330     l_count            number(2) := 0;
1331 
1332 Begin
1333 
1334    /*  crec_data  -> actual record that you want to validate(ex, rows in staging table)
1335        p_cs_rec   -> information in custom stream definition
1336    */
1337 
1338 --    debug_line('In validate_record');
1339     /* Get Product LEVEL_PK */
1340     if nvl(p_cs_rec.prd_level_collect_flag, 'N') = 'Y' then
1341         l_prd_found := get_level_pk(p_instance_id, crec_data.prd_level_id,
1342                        crec_data.prd_sr_level_value_pk, crec_data.prd_level_value, crec_data.prd_level_value_pk);
1343 
1344         IF ( crec_data.prd_parent_sr_level_value_pk IS NOT NULL or
1345              crec_data.prd_parent_level_value IS NOT NULL or
1346              crec_data.prd_parent_level_value_pk IS NOT NULL ) THEN
1347             /* DWK Get Product Dimension's Parent LEVEL_PK */
1348             l_prd_parent_found := get_level_pk(p_instance_id, crec_data.prd_parent_level_id,
1349                                       crec_data.prd_parent_sr_level_value_pk,
1350                                       crec_data.prd_parent_level_value,
1351                                       crec_data.prd_parent_level_value_pk);
1352 
1353         ELSE  /* IF there is no parent item then make it null */
1354             crec_data.prd_parent_level_id := NULL;
1355         END IF;
1356     end if;
1357     /*  Get ORG LEVEL_PK */
1358     if nvl(p_cs_rec.org_level_collect_flag, 'N') = 'Y' then
1359         l_org_found := get_level_pk(p_instance_id, crec_data.org_level_id,
1360                        crec_data.org_sr_level_value_pk, crec_data.org_level_value, crec_data.org_level_value_pk);
1361     end if;
1362     /* Get Geo LEVEL_PK */
1363     if nvl(p_cs_rec.geo_level_collect_flag, 'N') = 'Y' then
1364         l_geo_found := get_level_pk(p_instance_id, crec_data.geo_level_id,
1365                        crec_data.geo_sr_level_value_pk, crec_data.geo_level_value, crec_data.geo_level_value_pk);
1366     end if;
1367 
1368     /* Get CHN LEVEL_PK */
1369     if nvl(p_cs_rec.chn_level_collect_flag, 'N') = 'Y' then
1370         l_chn_found := get_level_pk(p_instance_id, crec_data.chn_level_id,
1371                        crec_data.chn_sr_level_value_pk, crec_data.chn_level_value, crec_data.chn_level_value_pk);
1372     end if;
1373     /* Get REP LEVEL_PK */
1374     if nvl(p_cs_rec.rep_level_collect_flag, 'N') = 'Y' then
1375         l_rep_found := get_level_pk(p_instance_id, crec_data.rep_level_id,
1376                        crec_data.rep_sr_level_value_pk, crec_data.rep_level_value, crec_data.rep_level_value_pk);
1377     end if;
1378     /* Get UD1 LEVEL_PK */
1379     if nvl(p_cs_rec.ud1_level_collect_flag, 'N') = 'Y' then
1380         l_ud1_found := get_level_pk(p_instance_id, crec_data.ud1_level_id,
1381                        crec_data.ud1_sr_level_value_pk, crec_data.ud1_level_value, crec_data.ud1_level_value_pk);
1382     end if;
1383     /* Get UD2 LEVEL_PK */
1384     if nvl(p_cs_rec.ud2_level_collect_flag, 'N') = 'Y' then
1385         l_ud2_found := get_level_pk(p_instance_id, crec_data.ud2_level_id,
1386                        crec_data.ud2_sr_level_value_pk, crec_data.ud2_level_value, crec_data.ud2_level_value_pk);
1387     end if;
1388 
1389     /* Get Demand Class LEVEL_PK */
1390     if nvl(p_cs_rec.dcs_level_collect_flag, 'N') = 'Y' then
1391         l_dcs_found := get_level_pk(p_instance_id, crec_data.dcs_level_id,
1392                        crec_data.dcs_sr_level_value_pk, crec_data.dcs_level_value, crec_data.dcs_level_value_pk);
1393     end if;
1394 
1395     select
1396         decode(l_prd_found, g_level_pk_not_found, 'PRD ', null) ||
1397         /* DWK Check level pk of parent item for dependent demand data */
1398         decode(l_prd_parent_found, g_level_pk_not_found, 'PRD_PARENT ', null) ||
1399         decode(l_org_found, g_level_pk_not_found, 'ORG ', null) ||
1400         decode(l_geo_found, g_level_pk_not_found, 'GEO ', null) ||
1401         decode(l_chn_found, g_level_pk_not_found, 'CHN ', null) ||
1402         decode(l_rep_found, g_level_pk_not_found, 'REP ', null) ||
1403         decode(l_ud1_found, g_level_pk_not_found, 'UD1 ', null) ||
1404         decode(l_ud2_found, g_level_pk_not_found, 'UD2 ', null) ||
1405         decode(l_dcs_found, g_level_pk_not_found, 'DCS ', null)
1406     into
1407         l_comments2
1408     from
1409         dual;
1410 
1411     /* Level validation */
1412 
1413     if nvl(p_cs_rec.strict_flag, 'N') = 'Y' then
1414         /* if level_id is not defined at the definition level then the level_id
1415            of first record fetched will be used for validation
1416         */
1417         if l_first_record then
1418 
1419             l_first_record := FALSE;
1420 /* New */
1421             if p_cs_rec.prd_level_id is null and nvl(p_cs_rec.prd_level_collect_flag, 'N') = 'Y' then
1422                 p_cs_rec.prd_level_id := crec_data.prd_level_id;
1423             end if;
1424 
1425             if p_cs_rec.org_level_id is null and nvl(p_cs_rec.org_level_collect_flag, 'N') = 'Y' then
1426                 p_cs_rec.org_level_id := crec_data.org_level_id;
1427             end if;
1428 
1429             if p_cs_rec.geo_level_id is null and nvl(p_cs_rec.geo_level_collect_flag, 'N') = 'Y' then
1430                 p_cs_rec.geo_level_id := crec_data.geo_level_id;
1431             end if;
1432 
1433             if p_cs_rec.chn_level_id is null  and nvl(p_cs_rec.chn_level_collect_flag, 'N') = 'Y' then
1434                 p_cs_rec.chn_level_id := crec_data.chn_level_id;
1435             end if;
1436 
1437             if p_cs_rec.rep_level_id is null and nvl(p_cs_rec.rep_level_collect_flag, 'N') = 'Y' then
1438                 p_cs_rec.rep_level_id := crec_data.rep_level_id;
1439             end if;
1440 
1441             if p_cs_rec.ud1_level_id is null and nvl(p_cs_rec.ud1_level_collect_flag, 'N') = 'Y' then
1442                 p_cs_rec.ud1_level_id := crec_data.ud1_level_id;
1443             end if;
1444 
1445             if p_cs_rec.ud2_level_id is null and nvl(p_cs_rec.ud2_level_collect_flag, 'N') = 'Y' then
1446                 p_cs_rec.ud2_level_id := crec_data.ud2_level_id;
1447             end if;
1448 
1449             if p_cs_rec.tim_level_id is null and nvl(p_cs_rec.tim_level_collect_flag, 'N') = 'Y' then
1450                 /* Attribute_34 is tim_level_id */
1451                 p_cs_rec.tim_level_id := crec_data.tim_level_id;
1452             end if;
1453 
1454             if p_cs_rec.dcs_level_id is null and nvl(p_cs_rec.dcs_level_collect_flag, 'N') = 'Y' then
1455                 p_cs_rec.dcs_level_id := crec_data.dcs_level_id;
1456             end if;
1457 
1458         end if;
1459 
1460         Select
1461             decode(crec_data.prd_level_id,
1462                    null, decode(p_cs_rec.prd_level_collect_flag,
1463                                 'Y', 'PRD ',
1464                                  null),
1465                    p_cs_rec.prd_level_id, null,
1466                    'PRD ')  ||
1467             /* DWK  IF dependent demand data are collected, its parents level id should be 1 */
1468             decode(nvl(crec_data.prd_parent_level_id, '1'), '1', null, 'PRD_PARENT ') ||
1469             decode(crec_data.org_level_id,
1470                    null, decode(p_cs_rec.org_level_collect_flag,
1471                                 'Y', 'ORG ',
1472                                  null),
1473                    p_cs_rec.org_level_id, null,
1474                    'ORG ')  ||
1475             decode(crec_data.geo_level_id,
1476                    null, decode(p_cs_rec.geo_level_collect_flag,
1477                                 'Y', 'GEO ',
1478                                  null),
1479                    p_cs_rec.geo_level_id, null,
1480                    'GEO ')  ||
1481             decode(crec_data.rep_level_id,
1482                    null, decode(p_cs_rec.rep_level_collect_flag,
1483                                 'Y', 'REP ',
1484                                  null),
1485                    p_cs_rec.rep_level_id, null,
1486                    'REP ')  ||
1487             decode(crec_data.chn_level_id,
1488                    null, decode(p_cs_rec.chn_level_collect_flag,
1489                                 'Y', 'CHN ',
1490                                  null),
1491                    p_cs_rec.chn_level_id, null,
1492                    'CHN ')  ||
1493             decode(crec_data.ud1_level_id,
1494                    null, decode(p_cs_rec.ud1_level_collect_flag,
1495                                 'Y', 'UD1 ',
1496                                  null),
1497                    p_cs_rec.ud1_level_id, null,
1498                    'UD1 ')  ||
1499             decode(crec_data.ud2_level_id,
1500                    null, decode(p_cs_rec.ud2_level_collect_flag,
1501                                 'Y', 'UD2 ',
1502                                  null),
1503                    p_cs_rec.ud2_level_id, null,
1504                    'UD2 ')  ||
1505             decode(crec_data.tim_level_id,
1506                    null, decode(p_cs_rec.tim_level_collect_flag,
1507                                 'Y', 'TIM ',
1508                                  null),
1509                    p_cs_rec.tim_level_id, null,
1510                    'TIM ')  ||
1511             decode(crec_data.dcs_level_id,
1512                    null, decode(p_cs_rec.dcs_level_collect_flag,
1513                                 'Y', 'DCS ',
1514                                  null),
1515                    p_cs_rec.dcs_level_id, null,
1516                    'DCS ' )
1517         into
1518             l_comments1
1519         from  dual;
1520     ELSE  /* p_cs_rec.strict_flag = 'N' */
1521 
1522        /* Check whether that time level id exists in fnd lookup or not */
1523 
1524        IF ( nvl(p_cs_rec.tim_level_collect_flag, 'N') = 'Y') THEN
1525           select count(*) into l_count
1526           from fnd_lookup_values
1527           where lookup_type = 'MSD_PERIOD_TYPE' and
1528           nvl(crec_data.tim_level_id, '999.99') = lookup_code and
1529           rownum <= 1;
1530 
1531           IF ( l_count < 1 ) THEN
1532              select 'TIM' into l_comments1 from dual;
1533           END IF;
1534        END IF;
1535 
1536     END IF;
1537 
1538 
1539 
1540     /* MSD_CS_DATALOAD_INVALID_LVLID - Invalid Level ID for Dimensions    */
1541     /* MSD_CS_DATALOAD_INVALID_DIM     - Invalid Dimensions */
1542     select decode(l_comments2, null, null, 'MSD_CS_DATALOAD_INVALID_DIM : ' || l_comments2) ||
1543            decode(l_comments1, null, null, 'MSD_CS_DATALOAD_INVALID_LVLID : ' || l_comments1)
1544     into p_err_mesg
1545     from dual;
1546 
1547     /* Validate Date Format */
1548     Begin
1549         select to_date(crec_data.attribute_43, 'YYYY/MM/DD')
1550             into l_dummy_date
1551         from dual;
1552     Exception
1553     When others then
1554         p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_DATE_FORMAT : ATTRIBUTE_43';
1555     End;
1556 
1557     /* Validate Amount Number Format */
1558     Begin
1559       -- Check Amount
1560       if (p_cs_rec.measurement_type in (1,3,4)) then
1561 	  l_dummy_number := crec_data.attribute_42;
1562       end if;
1563     Exception
1564     When others then
1565        p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_42';
1566     End;
1567 
1568     /* Validate Quantity Number Format */
1569     Begin
1570       -- Check Quantity
1571       if (p_cs_rec.measurement_type in (2,4,5)) then
1572 	l_dummy_number := crec_data.attribute_41;
1573       end if;
1574     Exception
1575     When others then
1576        p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_41';
1577     End;
1578 
1579     /* Validate Price Number Format */
1580     Begin
1581       -- Check Price
1582       if (p_cs_rec.measurement_type in (3,5)) then
1583 	l_dummy_number := crec_data.attribute_44;
1584       end if;
1585     Exception
1586     When others then
1587        p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_44';
1588     End;
1589 
1590     if p_err_mesg is null then
1591         return TRUE;
1592     else
1593         return FALSE;
1594     end if;
1595 
1596 Exception
1597 When others then
1598     show_line(sqlerrm);
1599     raise;
1600 
1601 End;
1602 
Function get_level_pk (
    p_instance          in varchar2,
    p_level_id          in number,
    p_sr_level_value_pk in OUT NOCOPY varchar2,
    p_level_value       in OUT NOCOPY varchar2,
    p_level_value_pk    in OUT NOCOPY varchar2) return varchar2 is

    /*
       Look up a row of MSD_LEVEL_VALUES for (p_instance, p_level_id) using the
       first identifier that is supplied, in this order of precedence:
         1. p_sr_level_value_pk  (matched against SR_LEVEL_PK)
         2. p_level_value        (matched against LEVEL_VALUE)
         3. p_level_value_pk     (matched against LEVEL_PK)

       Returns
         null                  - p_level_id is null, p_instance is null, or
                                 none of the three identifiers was supplied.
         g_level_pk_not_found  - the lookup found no row.
         LEVEL_PK              - otherwise; p_level_value_pk is set to it too.

       Out values filled on a successful lookup:
         by SR_LEVEL_PK : p_level_value
         by LEVEL_VALUE : (none besides p_level_value_pk)
         by LEVEL_PK    : p_sr_level_value_pk and p_level_value
    */

    Cursor c_by_sr_pk is
        select level_pk, level_value
          from msd_level_values
         where instance    = p_instance
           and level_id    = p_level_id
           and sr_level_pk = p_sr_level_value_pk;

    Cursor c_by_value is
        select level_pk
          from msd_level_values
         where instance    = p_instance
           and level_id    = p_level_id
           and level_value = p_level_value;

    Cursor c_by_pk is
        select sr_level_pk, level_value
          from msd_level_values
         where instance = p_instance
           and level_id = p_level_id
           and level_pk = p_level_value_pk;

    l_result_pk  varchar2(255) := g_level_pk_not_found;

Begin

    /* No level id means no data is collected for this dimension. */
    if p_level_id is null then
        return null;
    end if;

    /* Insufficient parameters: nothing to search with. */
    if p_instance is null
       or coalesce(p_sr_level_value_pk, p_level_value, p_level_value_pk) is null then
        return null;
    end if;

    if p_sr_level_value_pk is not null then
        /* Search by source primary key */
        open c_by_sr_pk;
        fetch c_by_sr_pk into l_result_pk, p_level_value;
        if c_by_sr_pk%notfound then
            l_result_pk := g_level_pk_not_found;
        end if;
        close c_by_sr_pk;

    elsif p_level_value is not null then
        /* Search by level value */
        open c_by_value;
        fetch c_by_value into l_result_pk;
        if c_by_value%notfound then
            l_result_pk := g_level_pk_not_found;
        end if;
        close c_by_value;

    else
        /* Only p_level_value_pk is available: confirm it exists */
        open c_by_pk;
        fetch c_by_pk into p_sr_level_value_pk, p_level_value;
        if c_by_pk%found then
            l_result_pk := p_level_value_pk;
        else
            l_result_pk := g_level_pk_not_found;
        end if;
        close c_by_pk;
    end if;

    if l_result_pk <> g_level_pk_not_found then
        p_level_value_pk := l_result_pk;
    else
        /* Trace the failed lookup (only printed when debugging is on) */
        debug_line(' p_instance ' || p_instance || ' p_level_id ' || p_level_id ||
               ' p_sr_level_value_pk ' || p_sr_level_value_pk || ' p_level_value ' || p_level_value ||
               ' p_level_value_pk  ' || p_level_value_pk);
    end if;

    return l_result_pk;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1696 
Function Build_Where_Clause (
    p_tokenized_where   in  varchar2,
    p_default_where     in  varchar2,
    p_parameter1        in  varchar2,
    p_parameter2        in  varchar2,
    p_parameter3        in  varchar2,
    p_parameter4        in  varchar2,
    p_parameter5        in  varchar2,
    p_parameter6        in  varchar2,
    p_parameter7        in  varchar2,
    p_parameter8        in  varchar2,
    p_parameter9        in  varchar2,
    p_parameter10       in  varchar2,
    p_request_id        in  number) return varchar2 is

    /*
       Build the final WHERE text for a collection.

       p_tokenized_where may contain up to ten quoted tokens of the form
       '&&CHAR:...', '&&NUMBER:...' or '&&DATE:...'.  Tokens are replaced
       left to right: the first '&&' token found takes p_parameter1, the
       next one p_parameter2, and so on.  The substituted text (if any) is
       appended to p_default_where with ' and '.

       Returns p_default_where alone when p_tokenized_where is null, and
       null when both are null.
    */

    Type param_list_type is varray(10) of varchar2(255);

    l_para_list param_list_type;
    l_where     varchar2(3000);

    /*
       Replace the first '&&' token of p_where_cond with the value of
       parameter number p_para_num.  Does nothing when no token is left.
    */
    Procedure find_and_subst_param ( p_where_cond  in out NOCOPY varchar2,
                                     p_val             in varchar2,
                                     p_request_id      in number,
                                     p_para_num        in number) is

        start_pos number;
        end_pos   number;
        para_type varchar2(10);

        l_default_col            varchar2(300) := NULL;
        l_count                  number := 0;
        l_multi_flag             varchar2(30) := 'N';

--   'CHAR:Prompt_Name:ValueSet_Name:Remote_Yes_No:Multi_Yes_NO:Default_Column_Name_For_Multi'

    Begin
        debug_line('In find_and_subst_param');
        start_pos := instr(p_where_cond, '&&', 1);

        /* Only substitute when a token is actually present.  Without this
           guard start_pos = 0 made the code read characters 2..8 of the
           clause as a type tag and splice text at meaningless offsets. */
        if start_pos > 0 then

            para_type := substr(p_where_cond, start_pos + 2, 7);
            /* The token is expected to end at the next single quote */
            end_pos   := instr(p_where_cond, '''', start_pos);

            if substr(upper(para_type), 1, 5) = 'CHAR:' then /* Character type */
               /* Property 4 is read as the multi-value flag and property 5
                  as the default column name (see token layout above) */
               l_multi_flag :=
                   nvl(upper(msd_cs_defn_utl2.get_char_property(p_where_cond, start_pos, end_pos, 4)), 'N');

               l_default_col := msd_cs_defn_utl2.get_char_property(p_where_cond, start_pos, end_pos, 5);

               IF l_multi_flag = 'Y' THEN
                  /* Multi input parameter: check whether the user entered
                     any values for it in this request */
                  select count(1) into l_count from msd_cs_coll_parameters
                                        where conc_request_id = p_request_id and
                                        parameter_number = p_para_num;

                  IF (l_count = 0 AND l_default_col IS NOT NULL) THEN
                     /* No values entered: fall back to the default column.
                        The surrounding quotes of the token are dropped. */
                     p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                                     l_default_col ||
                                     substr(p_where_cond, end_pos + 1);
                  ELSE
                     /* Replace the quoted token by a sub-query on the
                        values stored for this request and parameter */
                     p_where_cond :=  substr(p_where_cond, 1, start_pos - 2) ||
                                   ' (SELECT parameter_code FROM msd_cs_coll_parameters ' ||
                                   ' WHERE conc_request_id = ' || p_request_id ||
                                   ' AND parameter_number = ' || p_para_num || ' ) ' ||
                                   substr(p_where_cond, end_pos + 1);
                  END IF;
               ELSE
                  /* Single value: keep the token's own quotes and escape
                     any quote inside the value */
                  p_where_cond := substr(p_where_cond, 1, start_pos - 1) ||
                                  replace(p_val, '''', '''''') ||
                                  substr(p_where_cond, end_pos);
               END IF;
            elsif substr(upper(para_type), 1, 7) = 'NUMBER:' then /* Number type*/
                /* NOTE(review): the value is spliced in verbatim and unquoted;
                   callers must make sure it is numeric text. */
                p_where_cond := substr(p_where_cond, 1, start_pos - 2) || p_val ||
                                substr(p_where_cond, end_pos + 1);
            elsif substr(upper(para_type), 1, 5) = 'DATE:' then /* Date type */
                /* Value goes inside a quoted literal, so escape embedded
                   quotes exactly as the CHAR branch does */
                p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                                'to_date(''' || replace(p_val, '''', '''''') || ''', ''YYYYMMDD'')' ||
                                substr(p_where_cond, end_pos + 1);
            end if;

        end if;

        debug_line(p_where_cond);

    End;


    /*
       Apply every entry of p_param_list, in order, to the next token
       found in p_where_cond.
    */
    Procedure substitute_parameter (
        p_where_cond    in out NOCOPY varchar2,
        p_param_list    in     param_list_type,
        p_request_id    in     number) is
    Begin
        debug_line('In substitute_parameter');
       /* DP-CRM Code changes by easwaran */
        for i in 1 .. p_param_list.count
        loop
            find_and_subst_param( p_where_cond, p_param_list(i), p_request_id, i);
        end loop;

    End;

    /*
       Pack the ten scalar parameters into a varray so they can be
       addressed by position.
    */
    Procedure make_para_list(
        p_parameter1    in     varchar2,
        p_parameter2    in     varchar2,
        p_parameter3    in     varchar2,
        p_parameter4    in     varchar2,
        p_parameter5    in     varchar2,
        p_parameter6    in     varchar2,
        p_parameter7    in     varchar2,
        p_parameter8    in     varchar2,
        p_parameter9    in     varchar2,
        p_parameter10   in     varchar2,
        p_para_list     in out NOCOPY param_list_type) is
    Begin
        debug_line('In make_para_list');
        p_para_list := param_list_type (p_parameter1, p_parameter2, p_parameter3, p_parameter4, p_parameter5,
                                        p_parameter6, p_parameter7, p_parameter8, p_parameter9, p_parameter10);

    End;


Begin
    debug_line('In Build_Where_Clause');
    if p_tokenized_where is not null then
       /*
         convert parameters into an array.
       */
        make_para_list( p_parameter1, p_parameter2, p_parameter3, p_parameter4, p_parameter5,
                        p_parameter6, p_parameter7, p_parameter8, p_parameter9, p_parameter10,
                        l_para_list);

        /* Build additional Where */
        l_where := p_tokenized_where;

        substitute_parameter ( l_where, l_para_list, p_request_id);

    end if;

    /* Combine with the default where clause, if any */
    if l_where is not null then
        if p_default_where is not null then
            l_where := p_default_where || ' and ' || l_where;
        end if;
    else
        l_where := p_default_where;
    end if;

    return l_where;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1854 
Procedure Refresh_Target(
                          p_process_type      in varchar2,
                          p_cs_definition_id  in number,
                          p_cs_name           in varchar2,
                          p_comp_refresh      in varchar2,
                          p_instance_id       in number,
                          p_new_refresh_num   in NUMBER) is

    /*
       Clear the target of a collection/pull before new rows are loaded.

       Source -> Stage : matching rows of MSD_ST_CS_DATA are deleted, whether
                         or not complete refresh was requested.  For the
                         non-complete case this avoids double counting and
                         keeps custom stream collection in line with the
                         other collections (Booking/Shipment).
       Stage -> Fact   : only when p_comp_refresh = 'Y'.  Rows of MSD_CS_DATA
                         with action_code 'I' are flagged 'D' and stamped with
                         p_new_refresh_num, for every instance (and, for
                         multiple-stream definitions, every cs_name) that is
                         present in the staging table.
       Any other combination does nothing.
    */

    cursor c_stream_kind is
        select nvl(multiple_stream_flag,'N')
          from msd_cs_definitions
         where cs_definition_id = p_cs_definition_id;

    l_is_multi_stream  VARCHAR2(10);

Begin
    debug_line('In refresh_target');

    IF p_process_type = C_SOURCE_TO_STAGE THEN

        delete from msd_st_cs_data
         where cs_definition_id = p_cs_definition_id
           and cs_name          = nvl(p_cs_name, cs_name)
           and attribute_1      = nvl(p_instance_id, attribute_1);

    ELSIF p_comp_refresh = 'Y' AND p_process_type = C_STAGE_TO_FACT THEN

        /* DWK  For single stream, ignore the CS_NAME column for refresh */
        open c_stream_kind;
        fetch c_stream_kind into l_is_multi_stream;
        close c_stream_kind;

        IF l_is_multi_stream = 'Y' THEN
            /* One pass per (instance, cs_name) pair found in staging */
            FOR l_stg IN ( select distinct attribute_1 instance, cs_name
                             from msd_st_cs_data
                            where cs_definition_id = p_cs_definition_id
                              and cs_name = nvl(p_cs_name, cs_name) )
            LOOP
                UPDATE msd_cs_data
                   SET action_code      = 'D',
                       last_refresh_num = p_new_refresh_num
                 WHERE cs_definition_id = p_cs_definition_id
                   AND cs_name          = l_stg.cs_name
                   AND attribute_1      = l_stg.instance
                   AND action_code      = 'I';
            END LOOP;
        ELSE
            /* Single stream: one pass per instance found in staging,
               cs_name is not part of the criteria */
            FOR l_stg IN ( select distinct attribute_1 instance
                             from msd_st_cs_data
                            where cs_definition_id = p_cs_definition_id )
            LOOP
                UPDATE msd_cs_data
                   SET action_code      = 'D',
                       last_refresh_num = p_new_refresh_num
                 WHERE cs_definition_id = p_cs_definition_id
                   AND attribute_1      = l_stg.instance
                   AND action_code      = 'I';
            END LOOP;
        END IF;

    END IF;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1946 
Procedure Process_1_Sub (
        p_cs_rec        in out NOCOPY  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_source_view       in  varchar2,
        p_target_table      in  varchar2,
        p_instance_id       in  number,
        p_sql_stmt          in  varchar2,
        p_new_refresh_num   IN  NUMBER) is

    /*
       Run p_sql_stmt, validate every fetched row and load the valid ones.

       Valid rows go to the fact table (and the headers table is maintained)
       when p_target_table = 'MSD_CS_DATA', otherwise to the staging table;
       they are then logged as processed.  Invalid rows are logged as errors
       with the message returned by validate_record.

       Work is committed every C_BATCH_SIZE rows and once more after the
       loop (Bug# 4349618).

       Side effects on package state: g_retcode is set to '1' and g_errbuf
       to an explanatory text when any row was invalid or when no row was
       fetched at all.

       Any exception is printed together with p_sql_stmt and re-raised.
    */

    TYPE cur_type is REF CURSOR;
    l_cur       cur_type;
    l_rec       msd_cs_dfn_utl.G_TYP_SOURCE_STREAM;

    l_valid     boolean;
    l_err_msg   varchar2(1000);

    l_success_rows  number:=0;
    l_error_rows    number:=0;

    /* Bug# 4349618  To commit in Batches */
    l_counter	    number:=0;

/* DWK */
   /* Last (designator, instance) pair for which the headers table was maintained */
   l_temp_designator         VARCHAR2(40) := NULL;
   l_temp_instance_id        NUMBER := NULL;

Begin
    debug_line('In Process_1_Sub');

    open l_cur for p_sql_stmt;
    LOOP
        fetch l_cur into l_rec;
        exit when l_cur%notfound;

        l_valid := null;
        l_err_msg := null;

        debug_line('Validating ' || l_rec.pk_id);


        l_valid := validate_record (l_rec, p_cs_rec, nvl(l_rec.instance,p_instance_id), l_err_msg);

        IF l_valid THEN
            /* IMP : Instance is p_instance in case of Collect
                     and l_rec.instance in case of PULL i.e. from the staging
		     table */
            IF  (p_target_table = 'MSD_CS_DATA') THEN
                ins_row_fact(l_rec, p_cs_rec, l_rec.designator,
                             nvl(p_instance_id, l_rec.instance),
                             p_new_refresh_num);

                /* Maintain the headers table only when the designator or the
                   instance differs from the previous row */
		IF ( l_rec.designator <> nvl(l_temp_designator,'-99999999~!@') OR
		   nvl(p_instance_id,l_rec.instance) <> nvl(l_temp_instance_id,-99999999) ) THEN
		   l_temp_designator  := l_rec.designator;
		   l_temp_instance_id := nvl(p_instance_id,l_rec.instance);

		   /* DWK  Populate MSD_CS_DATA_HEADERS table after inserting rows
		      into FACT table */
		   insert_update_Into_Headers (	p_cs_rec.cs_definition_id,
						l_rec.designator,
						nvl(p_instance_id,l_rec.instance), p_new_refresh_num);
		END IF;
	    ELSE
                ins_row_staging(l_rec, p_cs_rec, l_rec.designator, nvl(p_instance_id, l_rec.instance), null, null);
            END IF;

            /* Mark record Processed */
            log_processed(l_rec, p_cs_rec, l_rec.designator, nvl(p_instance_id, l_rec.instance), p_source_view, p_target_table);

            /* Count Success Rows */
            l_success_rows := l_success_rows + 1;

        ELSE   /* IF not Valid */

            /*  Log Error */
            log_error(l_rec, p_cs_rec, l_rec.designator,
                      nvl(p_instance_id, l_rec.instance),
                      l_err_msg, p_source_view, p_target_table);
            /* Count Erroneous Rows */
            l_error_rows := l_error_rows + 1;

        END IF;

        /* Bug# 4349618  To commit in Batches.
           Every fetched row counts, valid or not.  The remainder is computed
           in PL/SQL; no query against DUAL is needed for it. */
        l_counter := l_counter + 1;

	IF mod(l_counter, C_BATCH_SIZE) = 0 THEN
		debug_line( 'Inside Process_1_Sub: commiting inside the loop.');
		commit;
	END IF;

    END LOOP;

    /* Bug$ 4349618  To commit in Batches*/
    debug_line( 'Inside Process_1_Sub: commiting after the loop ends.');
    commit;


    if l_error_rows > 0 then
        g_retcode := '1';
        g_errbuf := 'There were erroneous records in Collect/Pull.';
    end if;

    if l_success_rows = 0 and l_error_rows = 0 then
        g_retcode := '1';
        g_errbuf := 'There were no rows fetched.';
    end if;

    /* Print Results */

    show_line('Valid Records   : ' || l_success_rows);
    show_line('Invalid Records : ' || l_error_rows);

    close l_cur;

Exception
When others then
    show_line(sqlerrm);
    show_line(p_sql_stmt);
    /* The cursor is not open when the OPEN itself failed (e.g. invalid
       p_sql_stmt); closing it blindly would raise INVALID_CURSOR and hide
       the original error. */
    if l_cur%isopen then
        close l_cur;
    end if;
    raise;
End;
2079 
Function Build_Designator_Where_Clause(
    p_cs_rec        in  msd_cs_definitions_v1%rowtype,
    p_process_type  in  varchar2,
    p_cs_name       in  varchar2) return varchar2 is

    /*
       Build the filter on the designator (cs_name).

       Stage -> Fact : "cs_name = '<p_cs_name>' and cs_definition_id = <id>",
                       or just " cs_definition_id = <id>" when p_cs_name is
                       null.
       Otherwise     : "<source column> = '<p_cs_name>'", where the source
                       column is the one mapped to CS_NAME in
                       MSD_CS_DEFN_COLUMN_DTLS.  Built only for a
                       multiple-stream definition with a non-null p_cs_name
                       and an existing mapping; else null is returned.

       Single quotes inside p_cs_name are doubled.
    */

    Cursor c_designator_col is
        select source_view_column_name
          from msd_cs_defn_column_dtls
         where cs_definition_id = p_cs_rec.cs_definition_id
           and table_column     = 'CS_NAME';

    l_filter        varchar2(500);
    l_source_col    varchar2(60);

Begin
    debug_Line('In Build_Designator_Where_Clause');

    if p_process_type in (C_STAGE_TO_FACT) then

        /* Staging table: the column is always CS_NAME, and the definition
           id is always part of the filter */
        if p_cs_name is not null then
            l_filter := 'cs_name = ' || '''' || replace(p_cs_name, '''', '''''') || '''' ||
                        ' and cs_definition_id = ' || p_cs_rec.cs_definition_id;
        else
            l_filter := ' cs_definition_id = ' || p_cs_rec.cs_definition_id;
        end if;

    elsif nvl(p_cs_rec.multiple_stream_flag, 'N') = 'Y' and p_cs_name is not null then

        /* Source view: find which view column feeds CS_NAME */
        open c_designator_col;
        fetch c_designator_col into l_source_col;
        close c_designator_col;

        /* No mapping found: no filter is produced (no error is raised) */
        if l_source_col is not null then
            l_filter := l_source_col || ' = ' || '''' || replace(p_cs_name, '''', '''''') || '''';
        end if;

    end if;

    return l_filter;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
2136 
/* Print p_sql through show_line, but only when the C_DEBUG constant is 'Y'. */
Procedure debug_line(p_sql in    varchar2)is
Begin
    if c_debug <> 'Y' then
        return;
    end if;

    show_line(p_sql);
End;
2143 
2144 /* DWK */
/*************************************************************************************************
PROCEDURE Insert_update_Into_Headers

Keeps msd_cs_data_headers in step with the fact table for one
(instance, cs_definition_id, cs_name) combination:
  - when msd_cs_data_headers_v1 shows no row for the combination, a new header
    row is inserted through Insert_Data_Into_Headers;
  - otherwise last_refresh_num of the existing header row(s) is set to
    p_refresh_num.
Errors are printed with show_line and re-raised.
**************************************************************************************************/
Procedure insert_update_Into_Headers (	p_cs_definition_id  in  number,
					p_cs_name           in  varchar2,
					p_instance_id       in  number,
                                        p_refresh_num       in number) is

   l_existing_headers    NUMBER := 0;

BEGIN

   /* The existence check goes through the _v1 view; the update below
      targets the base table */
   SELECT count(*)
     INTO l_existing_headers
     FROM msd_cs_data_headers_v1
    WHERE instance         = p_instance_id
      AND cs_definition_id = p_cs_definition_id
      AND cs_name          = p_cs_name;

   IF l_existing_headers > 0 THEN

      update msd_cs_data_headers
         set last_refresh_num = p_refresh_num
       where cs_definition_id = p_cs_definition_id
         and instance         = p_instance_id
         and cs_name          = p_cs_name;

   ELSE

      Insert_Data_Into_Headers (p_cs_definition_id,
				p_cs_name,
				p_instance_id,
                                p_refresh_num);

   END IF;

Exception
When others then
    show_line(sqlerrm);
    raise;

END insert_update_Into_Headers;
2188 
2189 
/*************************************************************************************************
PROCEDURE Insert_Data_Into_Headers

Inserts one row into msd_cs_data_headers for the given instance,
cs_definition_id and cs_name.  The key comes from the sequence
msd_cs_data_headers_s, the who-columns from sysdate and fnd_global, and
last_refresh_num from p_refresh_num.
Errors are printed with show_line and re-raised.
**************************************************************************************************/
Procedure Insert_Data_Into_Headers (	p_cs_definition_id  in  number,
					p_cs_name           in  varchar2,
					p_instance_id       in  number,
                                        p_refresh_num       in  number) is

BEGIN

   insert into msd_cs_data_headers (
        cs_data_header_id,
        instance,
        cs_definition_id,
        cs_name,
        last_update_date,
        last_updated_by,
        creation_date,
        created_by,
        last_update_login,
        last_refresh_num )
   values (
        msd_cs_data_headers_s.nextval,
        p_instance_id,
        p_cs_definition_id,
        p_cs_name,
        sysdate,
        fnd_global.user_id,
        sysdate,
        fnd_global.user_id,
        fnd_global.login_id,
        p_refresh_num );

Exception
  When others then
    show_line('Error in inserting into MSD_CS_DATA_HEADERS');
    show_line(sqlerrm);
    raise;

END Insert_Data_Into_Headers;
2237 
2238 End;