DBA Data[Home] [Help]

PACKAGE BODY: APPS.MSD_CS_COLLECTION

Source


1 package body msd_cs_collection as
2 /* $Header: msdcsclb.pls 120.3 2010/04/30 09:02:55 lannapra ship $ */
/* ------------------------------------------------------------------
   Package-private constants
   ------------------------------------------------------------------ */

/* Staging-row status values.  Process_1 purges staging rows whose
   PROCESS_STATUS equals C_LOG_PROCESSED after a stage-to-fact run.
   C_LOG_ERROR is presumably the matching failure status; its use lies
   outside this part of the file. */
C_LOG_PROCESSED        CONSTANT VARCHAR2(30) := 'PROCESSED';
C_LOG_ERROR            CONSTANT VARCHAR2(30) := 'ERROR';

/* Stream name used by Custom_Stream_Collection when the definition is
   not flagged as multi-stream. */
C_DEFAULT_STREAM_NAME  CONSTANT VARCHAR2(30) := 'SINGLE_STREAM';

/* Accepted values of p_collection_type in Custom_Stream_Collection. */
C_COLLECT              CONSTANT VARCHAR2(10) := 'C';
C_PULL                 CONSTANT VARCHAR2(10) := 'P';

/* Debug switch (the code that reads it is not visible in this part of
   the file). */
C_DEBUG                CONSTANT VARCHAR2(1)  := 'N';

/* Bug# 4349618: batch size used to commit in batches. */
C_BATCH_SIZE           CONSTANT NUMBER       := 30000;

/* Process types: which source/target pair a run moves data between. */
C_SOURCE_TO_STAGE      CONSTANT NUMBER       := 1;
C_SOURCE_TO_FACT       CONSTANT NUMBER       := 2;
C_STAGE_TO_FACT        CONSTANT NUMBER       := 3;

/* ------------------------------------------------------------------
   Package-level state
   ------------------------------------------------------------------ */

/* Marker string; judging by its name it flags a level primary key that
   could not be resolved (usage not visible here - confirm). */
g_level_pk_not_found   VARCHAR2(30) := '%%^^)(::Error::%%^^)(';

/* Error status of the program.  Custom_Stream_Collection reports these
   back through its retcode / errbuf OUT parameters. */
g_retcode              VARCHAR2(30);
g_errbuf               VARCHAR2(255);

/* ------------------------------------------------------------------
   Local functions / procedures
   ------------------------------------------------------------------ */
30 
/* Forward declarations of the package-private subprograms, so that the
   bodies further down can call each other regardless of their order.
   Where a body is not visible in this part of the file, the comment
   only restates what the signature and the visible call sites show. */

/* Header maintenance for a stream / instance / refresh number
   (bodies not visible here). */
PROCEDURE insert_update_Into_Headers (
    p_cs_definition_id  IN  NUMBER,
    p_cs_name           IN  VARCHAR2,
    p_instance_id       IN  NUMBER,
    p_refresh_num       IN  NUMBER);

PROCEDURE Insert_Data_Into_Headers (
    p_cs_definition_id  IN  NUMBER,
    p_cs_name           IN  VARCHAR2,
    p_instance_id       IN  NUMBER,
    p_refresh_num       IN  NUMBER);

/* Called by Process_1 with the fully assembled SELECT statement to
   fetch and process the rows. */
PROCEDURE Process_1_Sub (
    p_cs_rec            IN OUT NOCOPY  msd_cs_definitions_v1%ROWTYPE,
    p_cs_name           IN  VARCHAR2,
    p_source_view       IN  VARCHAR2,
    p_target_table      IN  VARCHAR2,
    p_instance_id       IN  NUMBER,
    p_sql_stmt          IN  VARCHAR2,
    p_new_refresh_num   IN  NUMBER);

/* SQL text builders.  Process_1 appends its own WHERE clause to the
   text returned by Build_SQL_FOR_COLLECT_AND_VAL; the other two builders
   are not called in the visible part of the file. */
FUNCTION Build_SQL_Source (
    p_cs_definition_id  IN  NUMBER,
    p_process_type      IN  NUMBER,
    p_instance_id       IN  VARCHAR2,
    p_cs_name           IN  VARCHAR2) RETURN VARCHAR2;

FUNCTION Build_SQL_FOR_COLLECT_AND_VAL (
    p_cs_definition_id  IN  NUMBER,
    p_process_type      IN  NUMBER,
    p_source_view       IN  VARCHAR2,
    p_db_link           IN  VARCHAR2,
    p_cs_name           IN  VARCHAR2) RETURN VARCHAR2;

FUNCTION Build_SQL_INS_AS_SELECT (
    p_cs_definition_id  IN  NUMBER,
    p_instance_id       IN  VARCHAR2,
    p_cs_name           IN  VARCHAR2,
    p_source_view       IN  VARCHAR2,
    p_db_link           IN  VARCHAR2) RETURN VARCHAR2;

/* Returns the filter text for a run; Process_1 appends a non-null
   result to its statement with AND. */
FUNCTION Build_Where_Clause (
    p_tokenized_where   IN  VARCHAR2,
    p_default_where     IN  VARCHAR2,
    p_parameter1        IN  VARCHAR2,
    p_parameter2        IN  VARCHAR2,
    p_parameter3        IN  VARCHAR2,
    p_parameter4        IN  VARCHAR2,
    p_parameter5        IN  VARCHAR2,
    p_parameter6        IN  VARCHAR2,
    p_parameter7        IN  VARCHAR2,
    p_parameter8        IN  VARCHAR2,
    p_parameter9        IN  VARCHAR2,
    p_parameter10       IN  VARCHAR2,
    p_request_id        IN  NUMBER) RETURN VARCHAR2;

/* Per-row logging of processed and failed records. */
PROCEDURE log_processed (
    crec_data           IN  msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            IN  msd_cs_definitions_v1%ROWTYPE,
    p_cs_name           IN  VARCHAR2,
    p_instance_id       IN  VARCHAR2,
    p_source_view       IN  VARCHAR2,
    p_target_table      IN  VARCHAR2);

PROCEDURE log_error (
    crec_data           IN  msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            IN  msd_cs_definitions_v1%ROWTYPE,
    p_cs_name           IN  VARCHAR2,
    p_instance_id       IN  VARCHAR2,
    p_error_message     IN  VARCHAR2,
    p_source_view       IN  VARCHAR2,
    p_target_table      IN  VARCHAR2);

PROCEDURE upd_stage_error (
    p_pk_id             IN  NUMBER,
    p_process_status    IN  VARCHAR2,
    p_error_mesg        IN  VARCHAR2);

/* Invoked by Custom_Stream_Collection before every Process_1 /
   Process_2 run. */
PROCEDURE Refresh_Target (
    p_process_type      IN  VARCHAR2,
    p_cs_definition_id  IN  NUMBER,
    p_cs_name           IN  VARCHAR2,
    p_comp_refresh      IN  VARCHAR2,
    p_instance_id       IN  NUMBER,
    p_new_refresh_num   IN  NUMBER);

/* Single-row writers for the fact and staging targets (bodies not
   visible here). */
PROCEDURE ins_row_fact (
    crec_data           IN  msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            IN  msd_cs_definitions_v1%ROWTYPE,
    p_cs_name           IN  VARCHAR2,
    p_instance_id       IN  VARCHAR2,
    p_new_refresh_num   IN  NUMBER);

PROCEDURE ins_row_staging (
    crec_data           IN  msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            IN  msd_cs_definitions_v1%ROWTYPE,
    p_cs_name           IN  VARCHAR2,
    p_instance_id       IN  VARCHAR2,
    p_process_status    IN  VARCHAR2,
    p_error_message     IN  VARCHAR2);

/* Stream-specific clean-up run after data has been collected from
   source into staging. */
PROCEDURE cs_collect_post_process (
    p_cs_Rec            IN  msd_cs_definitions_v1%ROWTYPE,
    p_cs_name           IN  VARCHAR2,
    p_instance_id       IN  VARCHAR2);

/* Process_1: builds the fetch statement and hands it to Process_1_Sub
   for row-by-row processing with validation. */
PROCEDURE Process_1 (
    p_cs_rec            IN OUT NOCOPY  msd_cs_definitions_v1%ROWTYPE,
    p_cs_name           IN  VARCHAR2,
    p_db_link           IN  VARCHAR2,
    p_source_view       IN  VARCHAR2,
    p_target_table      IN  VARCHAR2,
    p_process_type      IN  NUMBER,
    p_default_where     IN  VARCHAR2,
    p_tokenized_where   IN  VARCHAR2,
    p_comp_refresh      IN  VARCHAR2,
    p_instance_id       IN  NUMBER,
    p_parameter1        IN  VARCHAR2,
    p_parameter2        IN  VARCHAR2,
    p_parameter3        IN  VARCHAR2,
    p_parameter4        IN  VARCHAR2,
    p_parameter5        IN  VARCHAR2,
    p_parameter6        IN  VARCHAR2,
    p_parameter7        IN  VARCHAR2,
    p_parameter8        IN  VARCHAR2,
    p_parameter9        IN  VARCHAR2,
    p_parameter10       IN  VARCHAR2,
    p_new_refresh_num   IN  NUMBER,
    p_request_id        IN  NUMBER);

/* Process_2: used by Custom_Stream_Collection to collect into staging
   without validation (body not visible here). */
PROCEDURE Process_2 (
    p_cs_rec            IN OUT NOCOPY  msd_cs_definitions_v1%ROWTYPE,
    p_cs_name           IN  VARCHAR2,
    p_db_link           IN  VARCHAR2,
    p_source_view       IN  VARCHAR2,
    p_target_table      IN  VARCHAR2,
    p_process_type      IN  NUMBER,
    p_default_where     IN  VARCHAR2,
    p_tokenized_where   IN  VARCHAR2,
    p_comp_refresh      IN  VARCHAR2,
    p_instance_id       IN  NUMBER,
    p_parameter1        IN  VARCHAR2,
    p_parameter2        IN  VARCHAR2,
    p_parameter3        IN  VARCHAR2,
    p_parameter4        IN  VARCHAR2,
    p_parameter5        IN  VARCHAR2,
    p_parameter6        IN  VARCHAR2,
    p_parameter7        IN  VARCHAR2,
    p_parameter8        IN  VARCHAR2,
    p_parameter9        IN  VARCHAR2,
    p_parameter10       IN  VARCHAR2,
    p_request_id        IN  NUMBER);

/* Level primary-key lookup (body not visible here). */
FUNCTION get_level_pk (
    p_instance          IN  VARCHAR2,
    p_level_id          IN  NUMBER,
    p_sr_level_value_pk IN OUT NOCOPY  VARCHAR2,
    p_level_value       IN OUT NOCOPY  VARCHAR2,
    p_level_value_pk    IN OUT NOCOPY  VARCHAR2) RETURN VARCHAR2;

/* Message output helpers; callers pass plain messages as well as SQL
   text. */
PROCEDURE show_line (p_sql IN VARCHAR2);

PROCEDURE debug_line (p_sql IN VARCHAR2);

/* Record validation; returns a BOOLEAN and an error message through
   p_err_mesg (body not visible here). */
FUNCTION validate_record (
    crec_data           IN OUT NOCOPY  msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            IN OUT NOCOPY  msd_Cs_definitions_v1%ROWTYPE,
    p_instance_id       IN  VARCHAR2,
    p_err_mesg          OUT NOCOPY  VARCHAR2) RETURN BOOLEAN;

/* Supplies the default WHERE text that Custom_Stream_Collection passes
   on as p_default_where. */
FUNCTION Build_Designator_Where_Clause (
    p_cs_rec            IN  msd_cs_definitions_v1%ROWTYPE,
    p_process_type      IN  VARCHAR2,
    p_cs_name           IN  VARCHAR2) RETURN VARCHAR2;
200 
201 /* Logic Starts here */
/*
   Custom_Stream_Collection

   Entry point for collecting or pulling one custom stream definition.

   Parameters
     errbuf, retcode    OUT status.  Start as 'Program Completed with
                        Success' / '0'; set to 2 with a message on any
                        validation failure or unexpected exception.
     p_collection_type  C_COLLECT ('C') or C_PULL ('P').
     p_validate_data    'Y' / 'N'.  Pull always requires 'Y'; one-step
                        collection (profile MSD_ONE_STEP_COLLECTION = 'Y')
                        also requires 'Y'.
     p_definition_id    Definition to process; must exist in
                        msd_cs_definitions_v1 with valid_flag = 'Y'.
     p_cs_name          Stream name, honoured only when the definition has
                        multiple_stream_flag = 'Y'; otherwise
                        C_DEFAULT_STREAM_NAME is used.
     p_comp_refresh     Forwarded to Refresh_Target and the processing
                        routines.
     p_instance_id      Source instance; mandatory for collection.
     p_parameter1..10   Forwarded to the processing routines.
     p_request_id       Not referenced; the request id forwarded to the
                        processing routines is fnd_global.conc_request_id.

   The work is committed on normal completion and rolled back if an
   unexpected exception is raised.
*/
Procedure Custom_Stream_Collection (
                  errbuf           OUT NOCOPY  varchar2,
                  retcode          OUT NOCOPY  varchar2,
                  p_collection_type in  varchar2,
                  p_validate_data   in  varchar2,
                  p_definition_id   in  number,
                  p_cs_name         in  varchar2,
                  p_comp_refresh    in  varchar2,
                  p_instance_id     in  number,
                  p_parameter1      in  varchar2,
                  p_parameter2      in  varchar2,
                  p_parameter3      in  varchar2,
                  p_parameter4      in  varchar2,
                  p_parameter5      in  varchar2,
                  p_parameter6      in  varchar2,
                  p_parameter7      in  varchar2,
                  p_parameter8      in  varchar2,
                  p_parameter9      in  varchar2,
                  p_parameter10     in  varchar2,
                  p_request_id      in  number default 0) is

    C_STAGING_TABLE  Constant varchar2(30) := 'MSD_ST_CS_DATA';
    C_FACT_TABLE     Constant varchar2(30) := 'MSD_CS_DATA';

    cursor c_get_cs is
        select * from msd_cs_definitions_v1
        where
            cs_definition_id = p_definition_id and
            nvl(valid_flag, 'N') = 'Y';

    l_single_step_collection    varchar2(30) := 'Y';

    /* Anchored to the column so a long description cannot overflow. */
    l_stream_desc      msd_cs_definitions.description%type;

    l_cs_rec           msd_cs_definitions_v1%rowtype;
    l_target           varchar2(60);
    l_source           varchar2(60);
    l_dblink           varchar2(60);
    l_default_where    varchar2(200);
    l_cs_name          varchar2(255);
    l_retcode          varchar2(30);
    l_process_type     number;
    l_conc_request_id  number;
    l_new_refresh_num  number;

    /*
       Collect from the source view into the staging table, then run the
       collection post-process.

         p_with_validation  TRUE  -> Process_1 (validating path)
                            FALSE -> Process_2 (no validation)
         p_refresh_mode     complete-refresh flag handed to Refresh_Target
                            and the processing routine

       l_new_refresh_num is passed along exactly as it stands; no sequence
       value is drawn for the collection step.
    */
    Procedure collect_into_staging (
        p_with_validation  in  boolean,
        p_refresh_mode     in  varchar2) is
    Begin
        l_target       := C_STAGING_TABLE;
        l_process_type := C_SOURCE_TO_STAGE;
        l_source       := l_cs_rec.source_view_name;

        /* NOTE(review): the caller-supplied p_cs_name is passed here, not
           the resolved l_cs_name - kept as in the original; confirm. */
        l_default_where := Build_Designator_Where_Clause( l_cs_rec,
                                                          l_process_type,
                                                          p_cs_name);

        Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                       l_cs_name, p_refresh_mode, p_instance_id,
                       l_new_refresh_num);

        if p_with_validation then
            Process_1(
                p_cs_rec            => l_cs_rec,
                p_cs_name           => l_cs_name,
                p_db_link           => l_dblink,
                p_source_view       => l_source,
                p_target_table      => l_target,
                p_process_type      => l_process_type,
                p_default_where     => l_default_where,
                p_tokenized_where   => l_cs_rec.collect_addtl_where_clause,
                p_comp_refresh      => p_refresh_mode,
                p_instance_id       => p_instance_id,
                p_parameter1        => p_parameter1,
                p_parameter2        => p_parameter2,
                p_parameter3        => p_parameter3,
                p_parameter4        => p_parameter4,
                p_parameter5        => p_parameter5,
                p_parameter6        => p_parameter6,
                p_parameter7        => p_parameter7,
                p_parameter8        => p_parameter8,
                p_parameter9        => p_parameter9,
                p_parameter10       => p_parameter10,
                p_new_refresh_num   => l_new_refresh_num,
                p_request_id        => l_conc_request_id);
        else
            Process_2(
                p_cs_rec            => l_cs_rec,
                p_cs_name           => l_cs_name,
                p_db_link           => l_dblink,
                p_source_view       => l_source,
                p_target_table      => l_target,
                p_process_type      => l_process_type,
                p_default_where     => l_default_where,
                p_tokenized_where   => l_cs_rec.collect_addtl_where_clause,
                p_comp_refresh      => p_refresh_mode,
                p_instance_id       => p_instance_id,
                p_parameter1        => p_parameter1,
                p_parameter2        => p_parameter2,
                p_parameter3        => p_parameter3,
                p_parameter4        => p_parameter4,
                p_parameter5        => p_parameter5,
                p_parameter6        => p_parameter6,
                p_parameter7        => p_parameter7,
                p_parameter8        => p_parameter8,
                p_parameter9        => p_parameter9,
                p_parameter10       => p_parameter10,
                p_request_id        => l_conc_request_id);
        end if;

        /* Custom stream collection post-process, run once the data has
           been collected from source to staging. */
        cs_collect_post_process(l_cs_rec,
                                l_cs_name,
                                p_instance_id);
    End collect_into_staging;

    /*
       Pull from the staging table into the fact table under a freshly
       drawn refresh number.
    */
    Procedure pull_into_fact is
    Begin
        l_target       := C_FACT_TABLE;
        l_source       := C_STAGING_TABLE;
        l_process_type := C_STAGE_TO_FACT;

        /* NOTE(review): p_cs_name rather than l_cs_name, as in the
           original - confirm. */
        l_default_where := Build_Designator_Where_Clause( l_cs_rec,
                                                          l_process_type,
                                                          p_cs_name);

        /* Get a new sequence number for the pull part. */
        select msd.msd_last_refresh_number_s.nextval
        into   l_new_refresh_num
        from   dual;

        Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                       l_cs_name, p_comp_refresh, p_instance_id,
                       l_new_refresh_num);

        Process_1(
            p_cs_rec            => l_cs_rec,
            p_cs_name           => l_cs_name,
            p_db_link           => l_dblink,
            p_source_view       => l_source,
            p_process_type      => C_STAGE_TO_FACT,
            p_target_table      => l_target,
            p_default_where     => l_default_where,
            p_tokenized_where   => NULL,
            p_comp_refresh      => p_comp_refresh,
            p_instance_id       => p_instance_id,
            p_parameter1        => p_parameter1,
            p_parameter2        => p_parameter2,
            p_parameter3        => p_parameter3,
            p_parameter4        => p_parameter4,
            p_parameter5        => p_parameter5,
            p_parameter6        => p_parameter6,
            p_parameter7        => p_parameter7,
            p_parameter8        => p_parameter8,
            p_parameter9        => p_parameter9,
            p_parameter10       => p_parameter10,
            p_new_refresh_num   => l_new_refresh_num,
            p_request_id        => l_conc_request_id);
    End pull_into_fact;

Begin

  /* Look up the description for the log.  An unknown definition id is
     reported with the regular "not found" message instead of letting
     NO_DATA_FOUND surface as a bare ORA-01403 from the generic handler. */
  begin
      select description
      into   l_stream_desc
      from   msd_cs_definitions
      where  cs_definition_id = p_definition_id;
  exception
      when no_data_found then
          retcode := 2;
          errbuf := 'Custom Definition Not Found : ' || p_definition_id;
          return;
  end;

  show_line('Stream Name   : ' || l_stream_desc);

  debug_line('In Custom Stream Collection');
  /* Initialize */
  errbuf := 'Program Completed with Success';
  retcode := '0';

  /* Get profile */
  l_single_step_collection := nvl(fnd_profile.value('MSD_ONE_STEP_COLLECTION'), 'N');

  l_conc_request_id := fnd_global.conc_request_id;
  debug_line('Conc Reuquest ID : ' || l_conc_request_id);

  /* Validate definition id: it must exist and be flagged valid. */
  open c_get_cs;
  fetch c_get_cs into l_cs_rec;
  if c_get_cs%notfound then
        close c_get_cs;
        retcode := 2;
        errbuf := 'Custom Definition Not Found : ' || p_definition_id;
        return;
  end if;
  close c_get_cs;

  if p_collection_type = C_COLLECT  and l_cs_rec.source_view_name is null then
        retcode := 2;
        errbuf := 'Can not preform Collection - Source View is not specified.';
        return;
  end if;

  /* Streams that are not multi-stream always use the default name. */
  if nvl(l_cs_rec.multiple_stream_flag, 'N') <> 'Y' then
      l_cs_name := C_DEFAULT_STREAM_NAME;
  else
      l_cs_name := p_cs_name;
  end if;

  /* Instance must be specified for collection */
  if p_collection_type = C_COLLECT  and p_instance_id is null then
        retcode := 2;
        errbuf := 'Instance must be specified';
        return;
  end if;

  /* Fetch database link only if stream source type is 'SOURCE' */
  if l_cs_rec.cs_type in ('SOURCE') and p_collection_type = C_COLLECT then
       msd_common_utilities.get_db_link(p_instance_id, l_dblink, l_retcode);
       if (l_retcode = -1) then
            retcode := 2;
            errbuf := 'Error while getting db_link';
            return;
       end if;
  end if;

  /*--------------  For Collection ----------------------------*/
  IF ( p_collection_type = C_COLLECT )  THEN

      /* Check and push setup parameters if it is not done so previously */
      MSD_PUSH_SETUP_DATA.chk_push_setup(   errbuf,
                                            retcode,
                                            p_instance_id);
      IF (nvl(retcode, 0) <> 0) THEN
          return;
      END IF;

      /* Any combination not listed below (for example a NULL
         p_validate_data) falls through without doing any work. */
      IF (l_single_step_collection = 'Y' and p_validate_data = 'Y') THEN
          /* One-step collection is run internally as a two-step collect
             plus pull.  The collect part goes into staging without
             validation (validation happens in the pull) and always does a
             complete refresh. */
          collect_into_staging(p_with_validation => FALSE,
                               p_refresh_mode    => 'Y');
          pull_into_fact;

      ELSIF (l_single_step_collection = 'N' and p_validate_data = 'Y') THEN
          /* Collect into staging with validation */
          collect_into_staging(p_with_validation => TRUE,
                               p_refresh_mode    => p_comp_refresh);

      ELSIF (l_single_step_collection = 'Y' and p_validate_data = 'N') THEN
          /* Invalid option */
          retcode := 2;
          errbuf := 'Invalid option - Single Step Collection must perform Validation';
          return;

      ELSIF (l_single_step_collection = 'N' and p_validate_data = 'N') THEN
          /* Collect into staging without validation */
          collect_into_staging(p_with_validation => FALSE,
                               p_refresh_mode    => p_comp_refresh);
      END IF;

  /*---------------------  For Pull ----------------------------*/
  ELSIF (p_collection_type = C_PULL) THEN
     IF p_validate_data = 'Y' THEN
         pull_into_fact;
     ELSE
         /* Invalid option */
         retcode := 2;
         errbuf := 'Invalid option - Pull must perform Validation';
         return;
     END IF;
  END IF;  /* End of Collect or Pull */

  IF (l_target = C_FACT_TABLE) THEN
     /* Delete cs fact rows that are not used by any demand plans */
     MSD_TRANSLATE_FACT_DATA.clean_fact_data( errbuf,
                                              retcode,
                                              l_target,
                                              p_definition_id);
  END IF;

  /* Report the status recorded in the package globals.  An unset global
     must not wipe out the status already held in retcode / errbuf (the
     initial success values or whatever clean_fact_data returned). */
  IF g_retcode IS NOT NULL THEN
      retcode := g_retcode;
      errbuf  := g_errbuf;
  END IF;

  commit;

Exception
    When others then
        retcode := 2;
        errbuf := substr( sqlerrm, 1, 255);
        rollback;
End;
585 
/*
   Process_1

   Builds the SELECT that feeds a validating run and hands it to
   Process_1_Sub, which fetches and processes the rows with dynamic SQL.

   Steps
     1. Build_SQL_FOR_COLLECT_AND_VAL supplies the base statement for the
        definition, process type, source view, db link and stream name.
     2. ' WHERE 1 = 1' is appended so further predicates can be added
        with AND.
     3. The text from Build_Where_Clause (tokenized / default where plus
        p_parameter1..10 and p_request_id) is appended when not null.
     4. For C_STAGE_TO_FACT, rows of instance '0' are excluded.
     5. Process_1_Sub runs the statement.
     6. For C_STAGE_TO_FACT, staging rows of this definition with status
        C_LOG_PROCESSED and an instance other than '0' are deleted.

   p_comp_refresh is accepted for signature compatibility but is not
   referenced in this procedure.

   Any exception is written with show_line and re-raised to the caller.
*/
Procedure Process_1 (
    p_cs_rec        in out NOCOPY  msd_cs_definitions_v1%rowtype,
    p_cs_name           in  varchar2,
    p_db_link           in  varchar2,
    p_source_view       in  varchar2,
    p_target_table      in  varchar2,
    p_process_type      in  number,
    p_default_where     in  varchar2,
    p_tokenized_where   in  varchar2,
    p_comp_refresh      in  varchar2,
    p_instance_id       in  number,
    p_parameter1        in  varchar2,
    p_parameter2        in  varchar2,
    p_parameter3        in  varchar2,
    p_parameter4        in  varchar2,
    p_parameter5        in  varchar2,
    p_parameter6        in  varchar2,
    p_parameter7        in  varchar2,
    p_parameter8        in  varchar2,
    p_parameter9        in  varchar2,
    p_parameter10       in  varchar2,
    p_new_refresh_num   IN  NUMBER,
    p_request_id        in  number) is

    /* Sized to the PL/SQL VARCHAR2 maximum so that a long generated
       statement or filter cannot fail with ORA-06502. */
    l_sql_stmt  varchar2(32767);
    l_where     varchar2(32767);

Begin
    debug_line('In Process_1');

    l_sql_stmt := Build_SQL_FOR_COLLECT_AND_VAL
        (p_cs_rec.cs_definition_id, p_process_type,
         p_source_view, p_db_link, p_cs_name);

    /* Always-true anchor so every later predicate can be joined with AND. */
    l_sql_stmt := l_sql_stmt || ' WHERE 1 = 1';

    l_where := build_where_clause(
                                   p_tokenized_where   ,
                                   p_default_where     ,
                                   p_parameter1        ,
                                   p_parameter2        ,
                                   p_parameter3        ,
                                   p_parameter4        ,
                                   p_parameter5        ,
                                   p_parameter6        ,
                                   p_parameter7        ,
                                   p_parameter8        ,
                                   p_parameter9        ,
                                   p_parameter10       ,
                                   p_request_id      );

    if l_where is not null then
        l_sql_stmt := l_sql_stmt || ' AND ' || l_where;
    end if;

    /* Do not include instance = 0 in the fact table when data is pulled.
       attribute_1 is compared as a string, exactly like the DELETE
       further down, so no implicit TO_NUMBER is applied to the column. */
    IF (p_process_type = C_STAGE_TO_FACT) THEN
        l_sql_stmt := l_sql_stmt || ' AND attribute_1 <> ''0''';
    END IF;

    debug_line('length for l_sql_stmt :' || length(l_sql_stmt));
    debug_line('length for l_where :' || length(l_where));
    debug_line('before debug line');
    debug_line(l_sql_stmt);
    debug_line('after debug line');

    /* Use dynamic SQL to fetch and process rows */
    Process_1_Sub (
                     p_cs_rec        ,
                     p_cs_name       ,
                     p_source_view   ,
                     p_target_table  ,
                     p_instance_id   ,
                     l_sql_stmt,
                     p_new_refresh_num);

    /* After a staging-to-fact run, delete the staging rows that were
       processed successfully.  Rows with instance = 0 are never deleted,
       and the delete is deliberately not restricted by cs_name. */
    IF p_process_type = C_STAGE_TO_FACT THEN
        delete from MSD_ST_CS_DATA
        where
            cs_definition_id = p_cs_rec.cs_definition_id and
            process_status = C_LOG_PROCESSED and
            attribute_1 <> '0';
    END IF;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
698 
699 
/*
   cs_collect_post_process

   Stream-specific adjustments applied to MSD_ST_CS_DATA once data has
   been collected from source to staging.

     p_cs_rec       definition row; p_cs_rec.name selects the behaviour
     p_cs_name      stream name of the rows just collected
     p_instance_id  source instance of the rows just collected

   Sales forecast streams (MSD_SALES_FCST_*): when the stream / instance
   holds at least one row with attribute_49 = '1', its rows with
   attribute_49 = '2' are deleted.

   MSD_ONHAND_INVENTORY: current on-hand inventory rows of the instance
   are copied from msd_curr_onhand_inventory_v into staging.

   Any exception is written with show_line and re-raised to the caller.
*/
Procedure cs_collect_post_process (
        p_cs_rec        in msd_cs_definitions_v1%rowtype,
        p_cs_name       in varchar2,
        p_instance_id   in varchar2 ) is
Begin

    /* Is this a Sales Forecast stream? */
    if p_cs_rec.name in (
        'MSD_SALES_FCST_BESTCASE', 'MSD_SALES_FCST_PIPELINE',
        'MSD_SALES_FCST_REALISTIC', 'MSD_SALES_FCST_WGTPLINE',
        'MSD_SALES_FCST_WORSTCASE' ) then

        /* One statement performs both the existence probe and the delete. */
        delete from msd_st_cs_data tgt
        where tgt.cs_definition_id = p_cs_rec.cs_definition_id
        and tgt.cs_name = p_cs_name
        and tgt.attribute_1 = p_instance_id
        and tgt.attribute_49 = '2'
        and exists (
                select 1
                from msd_st_cs_data chk
                where chk.cs_definition_id = p_cs_rec.cs_definition_id
                and chk.cs_name = p_cs_name
                and chk.attribute_1 = p_instance_id
                and chk.attribute_49 = '1');

    end if;

    /* Collect Current On-Hand Inventory data from ODS table for SOP data stream */
    if p_cs_rec.name = 'MSD_ONHAND_INVENTORY' then

        insert into msd_st_cs_data (
           CS_ST_DATA_ID,
           CS_DEFINITION_ID,
           CS_NAME,
           ATTRIBUTE_1,
           ATTRIBUTE_2,
           ATTRIBUTE_3,
           ATTRIBUTE_6,
           ATTRIBUTE_7,
           ATTRIBUTE_10,
           ATTRIBUTE_11,
           ATTRIBUTE_34,
           ATTRIBUTE_41,
           ATTRIBUTE_43,
           ATTRIBUTE_50,
           ATTRIBUTE_51,
           CREATION_DATE,
           CREATED_BY,
           LAST_UPDATE_DATE,
           LAST_UPDATED_BY,
           LAST_UPDATE_LOGIN
           )
         select msd_st_cs_data_s.nextval,
                p_cs_rec.cs_definition_id,
                C_DEFAULT_STREAM_NAME,
                to_char(inv.sr_instance_id),
                inv.prd_level_id,
                inv.prd_sr_level_pk,
                inv.geo_level_id,
                inv.geo_sr_level_pk,
                inv.org_level_id,
                inv.org_sr_level_pk,
                inv.time_level_id,
                to_char(inv.quantity),
                to_char(sysdate, 'YYYY/MM/DD'),
                inv.dcs_level_id,
                inv.dcs_sr_level_pk,
                /* WHO columns are given native values.  to_char(sysdate)
                   would pass through NLS_DATE_FORMAT, which can drop the
                   time portion and depends on session settings. */
                sysdate,
                fnd_global.user_id,
                sysdate,
                fnd_global.user_id,
                fnd_global.login_id
         from msd_curr_onhand_inventory_v inv
         where inv.sr_instance_id = p_instance_id;

    end if;

Exception
When others then
    show_line(sqlerrm);
    raise;
End;
795 
796 
797 Procedure log_error (
798     crec_data       in msd_cs_dfn_utl.g_typ_source_stream,
799     p_cs_rec        in msd_cs_definitions_v1%rowtype,
800     p_cs_name       in varchar2,
801     p_instance_id   in varchar2,
802     p_error_message in varchar2,
803     p_source_view   in varchar2,
804     p_target_table  in varchar2) is
805     /*
806        Records a failed row. Where the failure is recorded depends on the
807        source/target pair of the current run:
808          - target is the staging table, or target is the fact table fed from
809            a non-staging source: a new staging row is inserted with status
810            C_LOG_ERROR and the message;
811          - otherwise (staging pulled into fact): the existing staging row,
812            identified by crec_data.pk_id, is updated with that status.
813        A NULL p_instance_id falls back to crec_data.instance on insert.
814     */
815     l_insert_into_stage boolean;
816 Begin
817     debug_line('In Log Error');
818 
819     l_insert_into_stage :=
820            (p_target_table = 'MSD_ST_CS_DATA')
821         or (p_target_table = 'MSD_CS_DATA' and p_source_view <> 'MSD_ST_CS_DATA');
822 
823     if l_insert_into_stage then
824         ins_row_staging(crec_data, p_cs_rec, p_cs_name,
825                         nvl(p_instance_id, crec_data.instance),
826                         C_LOG_ERROR, p_error_message);
827     else
828         /* also reached when the test above evaluates to NULL */
829         upd_stage_error(crec_data.pk_id, C_LOG_ERROR, p_error_message);
830     end if;
831 Exception
832     When others then show_line(sqlerrm); raise;
833 End;
834 
835 Procedure upd_stage_error (p_pk_id in number, p_process_status in varchar2, p_error_mesg in varchar2) is
836     /* Stamps the staging row whose cs_st_data_id equals p_pk_id with the given
837        process status and error text (log_processed passes NULL text).
838        Nothing is updated when no row matches; failures are logged and re-raised. */
839 Begin
840     debug_line('In upd_stage_error');
841 
842     UPDATE msd_st_cs_data stg
843        SET stg.process_status = p_process_status,
844            stg.error_desc     = p_error_mesg
845      WHERE stg.cs_st_data_id  = p_pk_id;
846 Exception
847     When others then
848         show_line(sqlerrm); raise;
849 End;
850 
851 Procedure log_processed (
852     crec_data       in msd_cs_dfn_utl.g_typ_source_stream,
853     p_cs_rec        in msd_cs_definitions_v1%rowtype,
854     p_cs_name       in varchar2,
855     p_instance_id   in varchar2,
856     p_source_view   in varchar2,
857     p_target_table  in varchar2) is
858     /*
859        Marks a successfully handled row as processed.
860 
861        Only a Staging-to-Fact pull has a staging row to mark: in that case the
862        row identified by crec_data.pk_id gets status C_LOG_PROCESSED and its
863        error text is set to NULL.  When data goes straight from a source view
864        to the fact table, or is being loaded into staging, there is nothing to
865        mark and the procedure does nothing beyond the debug trace.
866 
867        p_cs_rec, p_cs_name and p_instance_id are accepted but not referenced
868        in this body.
869     */
870 Begin
871     debug_line('In log_processed');
872 
873     if (p_target_table = 'MSD_CS_DATA' and p_source_view <> 'MSD_ST_CS_DATA') or
874        (p_target_table = 'MSD_ST_CS_DATA') then
875         /* Source-to-Fact or load into Staging: processing cannot be logged
876            or has not happened yet. */
877         null;
878     else
879         /* Pull from Staging to Fact (also taken when the test is NULL). */
880         upd_stage_error(crec_data.pk_id, C_LOG_PROCESSED, null);
881     end if;
882 
883 Exception
884 When others then
885     show_line(sqlerrm);
886     raise;
887 End;
888 
889 Procedure ins_row_staging (
890     crec_data in  msd_cs_dfn_utl.g_typ_source_stream,   -- row whose fields populate cs_name and attribute_2 .. attribute_60
891     p_cs_rec        in msd_cs_definitions_v1%rowtype,   -- only cs_definition_id is read
892     p_cs_name       in varchar2,                        -- not referenced: cs_name is taken from crec_data.designator
893     p_instance_id   in varchar2,                        -- stored in attribute_1
894     p_process_status in varchar2,                       -- stored in process_status
895     p_error_message in varchar2) is                     -- stored in error_desc
896 Begin
897     /* Inserts one MSD_ST_CS_DATA row from crec_data; each value below lines up by position with the column list. */
898     insert into msd_st_cs_data
899         (cs_st_data_id, cs_definition_id, cs_name,
900          attribute_1, attribute_2, attribute_3, attribute_4,
901          attribute_5, attribute_6, attribute_7, attribute_8, attribute_9,
902          attribute_10, attribute_11, attribute_12, attribute_13,
903          attribute_14, attribute_15, attribute_16, attribute_17,
904          attribute_18, attribute_19, attribute_20, attribute_21, attribute_22,
905          attribute_23, attribute_24, attribute_25, attribute_26, attribute_27,
906          attribute_28, attribute_29, attribute_30, attribute_31,
907          attribute_32, attribute_33, attribute_34, attribute_35, attribute_36,
908          attribute_37, attribute_38, attribute_39, attribute_40,
909          attribute_41, attribute_42, attribute_43, attribute_44, attribute_45,
910          attribute_46, attribute_47, attribute_48, attribute_49,
911          attribute_50, attribute_51, attribute_52, attribute_53, attribute_54,
912 	 attribute_55, attribute_56, attribute_57, attribute_58, attribute_59,
913 	 attribute_60,
914          process_status, error_desc,
915          created_by, creation_date, last_update_date, last_updated_by, last_update_login
916          )
917     values
918     /* cs_name is filled from crec_data.designator rather than p_cs_name (designator-name fix) */
919         (msd_st_cs_data_s.nextval, p_cs_rec.cs_definition_id, crec_data.designator,
920          p_instance_id,   -- attribute_1
921          crec_data.prd_level_id, crec_data.prd_sr_level_value_pk, crec_data.prd_level_value, crec_data.prd_level_value_pk,   -- attribute_2 .. 5
922          crec_data.geo_level_id, crec_data.geo_sr_level_value_pk, crec_data.geo_level_value, crec_data.geo_level_value_pk,   -- attribute_6 .. 9
923          crec_data.org_level_id, crec_data.org_sr_level_value_pk, crec_data.org_level_value, crec_data.org_level_value_pk,   -- attribute_10 .. 13
924          crec_data.prd_parent_level_id,    crec_data.prd_parent_sr_level_value_pk,   -- attribute_14, 15
925          crec_data.prd_parent_level_value, crec_data.prd_parent_level_value_pk,      -- attribute_16, 17
926          crec_data.rep_level_id, crec_data.rep_sr_level_value_pk, crec_data.rep_level_value, crec_data.rep_level_value_pk,   -- attribute_18 .. 21
927          crec_data.chn_level_id, crec_data.chn_sr_level_value_pk, crec_data.chn_level_value, crec_data.chn_level_value_pk,   -- attribute_22 .. 25
928          crec_data.ud1_level_id, crec_data.ud1_sr_level_value_pk, crec_data.ud1_level_value, crec_data.ud1_level_value_pk,   -- attribute_26 .. 29
929          crec_data.ud2_level_id, crec_data.ud2_sr_level_value_pk, crec_data.ud2_level_value, crec_data.ud2_level_value_pk,   -- attribute_30 .. 33
930          crec_data.tim_level_id, crec_data.attribute_35, crec_data.attribute_36, crec_data.attribute_37,   -- attribute_34 (time level id) .. 37
931          crec_data.attribute_38, crec_data.attribute_39, crec_data.attribute_40, crec_data.attribute_41,
932          crec_data.attribute_42, crec_data.attribute_43, crec_data.attribute_44, crec_data.attribute_45,
933          crec_data.attribute_46, crec_data.attribute_47, crec_data.attribute_48, crec_data.attribute_49,
934          crec_data.dcs_level_id, crec_data.dcs_sr_level_value_pk, crec_data.dcs_level_value, crec_data.dcs_level_value_pk,   -- attribute_50 .. 53
935 	 crec_data.attribute_54, crec_data.attribute_55, crec_data.attribute_56, crec_data.attribute_57,
936 	 crec_data.attribute_58, crec_data.attribute_59, crec_data.attribute_60,
937          p_process_status, p_error_message,
938          fnd_global.user_id, sysdate, sysdate, fnd_global.user_id, fnd_global.login_id);
939 
940 Exception
941 When others then
942     show_line(sqlerrm);   -- write the error text to the log, then let the caller see the failure
943     raise;
944 
945 End;
946 
947 Procedure ins_row_fact(   /* Inserts one row into the fact table MSD_CS_DATA from crec_data, with action_code 'I'. */
948                        crec_data                in msd_cs_dfn_utl.g_typ_source_stream,   -- row whose fields populate cs_name and attribute_2 .. attribute_60
949                        p_cs_rec                 in msd_cs_definitions_v1%rowtype,        -- only cs_definition_id is read
950                        p_cs_name                in varchar2,                             -- not referenced: cs_name is taken from crec_data.designator
951                        p_instance_id            in varchar2,                             -- stored in attribute_1
952                        p_new_refresh_num        in NUMBER) is                            -- stored in created_by_refresh_num and last_refresh_num
953 Begin
954     debug_line('In ins_row_fact');
955     insert into msd_cs_data
956         (cs_data_id, cs_definition_id, cs_name,
957          attribute_1, attribute_2, attribute_3, attribute_4,
958          attribute_5, attribute_6, attribute_7, attribute_8, attribute_9,
959          attribute_10, attribute_11, attribute_12, attribute_13,
960          attribute_14, attribute_15, attribute_16, attribute_17,
961          attribute_18, attribute_19, attribute_20, attribute_21, attribute_22,
962          attribute_23, attribute_24, attribute_25, attribute_26, attribute_27,
963          attribute_28, attribute_29, attribute_30, attribute_31,
964          attribute_32, attribute_33, attribute_34, attribute_35, attribute_36,
965          attribute_37, attribute_38, attribute_39, attribute_40,
966          attribute_41, attribute_42, attribute_43, attribute_44, attribute_45,
967          attribute_46, attribute_47, attribute_48, attribute_49,
968          attribute_50, attribute_51, attribute_52, attribute_53, attribute_54,
969 	 attribute_55, attribute_56, attribute_57, attribute_58, attribute_59,
970 	 attribute_60,
971          created_by, creation_date, last_update_date, last_updated_by,last_update_login,
972          created_by_refresh_num, last_refresh_num, action_code)
973     values
974         /* cs_name is filled from crec_data.designator rather than p_cs_name (designator-name fix); values line up with the column list by position */
975         (msd_cs_data_s.nextval, p_cs_rec.cs_definition_id, crec_data.designator ,
976          p_instance_id,   -- attribute_1
977          crec_data.prd_level_id, crec_data.prd_sr_level_value_pk, crec_data.prd_level_value, crec_data.prd_level_value_pk,   -- attribute_2 .. 5
978          crec_data.geo_level_id, crec_data.geo_sr_level_value_pk, crec_data.geo_level_value, crec_data.geo_level_value_pk,   -- attribute_6 .. 9
979          crec_data.org_level_id, crec_data.org_sr_level_value_pk, crec_data.org_level_value, crec_data.org_level_value_pk,   -- attribute_10 .. 13
980          crec_data.prd_parent_level_id, crec_data.prd_parent_sr_level_value_pk,      -- attribute_14, 15
981          crec_data.prd_parent_level_value, crec_data.prd_parent_level_value_pk,      -- attribute_16, 17
982          crec_data.rep_level_id, crec_data.rep_sr_level_value_pk, crec_data.rep_level_value, crec_data.rep_level_value_pk,   -- attribute_18 .. 21
983          crec_data.chn_level_id, crec_data.chn_sr_level_value_pk, crec_data.chn_level_value, crec_data.chn_level_value_pk,   -- attribute_22 .. 25
984          crec_data.ud1_level_id, crec_data.ud1_sr_level_value_pk, crec_data.ud1_level_value, crec_data.ud1_level_value_pk,   -- attribute_26 .. 29
985          crec_data.ud2_level_id, crec_data.ud2_sr_level_value_pk, crec_data.ud2_level_value, crec_data.ud2_level_value_pk,   -- attribute_30 .. 33
986          crec_data.tim_level_id, crec_data.attribute_35, crec_data.attribute_36, crec_data.attribute_37,   -- attribute_34 (time level id) .. 37
987          crec_data.attribute_38, crec_data.attribute_39, crec_data.attribute_40, crec_data.attribute_41,
988          crec_data.attribute_42, crec_data.attribute_43, crec_data.attribute_44, crec_data.attribute_45,
989          crec_data.attribute_46, crec_data.attribute_47, crec_data.attribute_48, crec_data.attribute_49,
990          crec_data.dcs_level_id, crec_data.dcs_sr_level_value_pk, crec_data.dcs_level_value, crec_data.dcs_level_value_pk,   -- attribute_50 .. 53
991 	 crec_data.attribute_54, crec_data.attribute_55, crec_data.attribute_56, crec_data.attribute_57,
992 	 crec_data.attribute_58, crec_data.attribute_59, crec_data.attribute_60,
993 	 fnd_global.user_id, sysdate, sysdate, fnd_global.user_id,fnd_global.login_id,
994          p_new_refresh_num, p_new_refresh_num, 'I');   -- both refresh numbers start at the new refresh number
995 
996 Exception
997 When others then
998     show_line(sqlerrm);   -- write the error text to the log, then let the caller see the failure
999     raise;
1000 
1001 End;
1002 
1003 Procedure Process_2 (
1004     p_cs_rec        in out NOCOPY  msd_cs_definitions_v1%rowtype,
1005     p_cs_name           in  varchar2,
1006     p_db_link           in  varchar2,
1007     p_source_view       in  varchar2,
1008     p_target_table      in  varchar2,
1009     p_process_type      in  number,
1010     p_default_where     in  varchar2,
1011     p_tokenized_where   in  varchar2,
1012     p_comp_refresh      in  varchar2,
1013     p_instance_id       in  number,
1014     p_parameter1        in  varchar2,
1015     p_parameter2        in  varchar2,
1016     p_parameter3        in  varchar2,
1017     p_parameter4        in  varchar2,
1018     p_parameter5        in  varchar2,
1019     p_parameter6        in  varchar2,
1020     p_parameter7        in  varchar2,
1021     p_parameter8        in  varchar2,
1022     p_parameter9        in  varchar2,
1023     p_parameter10       in  varchar2,
1024     p_request_id          in  number) is
1025     /*
1026        Copies rows from the source view into MSD_ST_CS_DATA with a single
1027        dynamic INSERT ... SELECT; no per-row validation is performed.
1028 
1029        The statement text comes from Build_SQL_INS_AS_SELECT and the optional
1030        filter from build_where_clause (given the tokenized/default where
1031        text, p_parameter1..10 and p_request_id).  p_target_table,
1032        p_process_type and p_comp_refresh are not referenced in this body.
1033     */
1034     l_ins_stmt  varchar2(32767);
1035     /* Sized to the PL/SQL varchar2 maximum: a filter longer than a 500
1036        character buffer would raise ORA-06502 before the statement could run. */
1037     l_where     varchar2(32767);
1038 Begin
1039     debug_line('In Process_2');
1040 
1041     l_ins_stmt := Build_SQL_INS_AS_SELECT(
1042         p_cs_definition_id => p_cs_rec.cs_definition_id,
1043         p_instance_id      => p_instance_id,
1044         p_cs_name          => p_cs_name,
1045         p_source_view      => p_source_view,
1046         p_db_link          => p_db_link);
1047 
1048     l_where := build_where_clause(
1049         p_tokenized_where   ,
1050         p_default_where     ,
1051         p_parameter1        ,
1052         p_parameter2        ,
1053         p_parameter3        ,
1054         p_parameter4        ,
1055         p_parameter5        ,
1056         p_parameter6        ,
1057         p_parameter7        ,
1058         p_parameter8        ,
1059         p_parameter9        ,
1060         p_parameter10       ,
1061         p_request_id );
1062 
1063     if l_where is not null then
1064         l_ins_stmt := l_ins_stmt || ' where ' || l_where;
1065     end if;
1066 
1067     /* NOTE(review): the statement is assembled by concatenation; callers must
1068        only pass trusted where-clause text and parameters. */
1069     debug_line(l_ins_stmt);
1070 
1071     Execute immediate l_ins_stmt;
1072 
1073 Exception
1074 When others then
1075     show_line(sqlerrm);
1076     raise;
1077 
1078 End;
1079 
1080 Function Build_SQL_FOR_COLLECT_AND_VAL(
1081                                         p_cs_definition_id in number,
1082                                         p_process_type     in number,
1083                                         p_source_view      in varchar2,
1084                                         p_db_link          in varchar2,
1085                                         p_cs_name          in varchar2)
1086                                         return varchar2 is
1087     /*
1088        Returns the SELECT text used when rows are fetched and validated one
1089        by one:
1090          Source  -> Staging  (Collect, Single Step = 'N', Validation = 'Y')
1091          Source  -> Fact     (Collect, Single Step = 'Y', Validation = 'Y')
1092          Staging -> Fact     (Pull, Validation = 'Y')
1093        The column list is produced by Build_SQL_Source (no instance id is
1094        passed).  The first selected column is PK_ID: cs_st_data_id when
1095        reading MSD_ST_CS_DATA, NULL for any other source.  p_db_link is
1096        appended only for a non-staging source.
1097     */
1098     l_columns  varchar2(32767);
1099     l_select   varchar2(32767);
1100 Begin
1101     debug_line('In Build_SQL_FOR_COLLECT_AND_VAL');
1102 
1103     l_columns := Build_SQL_Source(p_cs_definition_id, p_process_type, NULL, p_cs_name);
1104 
1105     case
1106         when p_source_view = 'MSD_ST_CS_DATA' then
1107             l_select := 'Select  cs_st_data_id PK_ID, ' || l_columns || ' from ' || p_source_view ;
1108         else
1109             l_select := 'Select null pk_id, ' || l_columns || ' from ' || p_source_view || p_db_link;
1110     end case;
1111 
1112     return l_select;
1113 Exception
1114     When others then show_line(sqlerrm); raise;
1115 End;
1116 
1117 Function Build_SQL_INS_AS_SELECT(
1118     p_cs_definition_id in number,
1119     p_instance_id      in varchar2,
1120     p_cs_name          in varchar2,
1121     p_source_view      in varchar2,
1122     p_db_link          in varchar2) return varchar2 is
1123     /*
1124        Returns the INSERT ... SELECT text that loads MSD_ST_CS_DATA straight
1125        from the source view:
1126          Source -> Staging  (Collect, Single Step = 'N', Validation = 'N')
1127        cs_name is the last target column because the select list returned by
1128        Build_SQL_Source ends with the stream-name expression.
1129     */
1130     l_target_cols constant varchar2(2000) :=
1131         'Insert into MSD_ST_CS_DATA (cs_st_data_id , cs_definition_id, ' ||
1132         'attribute_1, attribute_2, attribute_3, attribute_4, attribute_5, '      ||
1133         'attribute_6, attribute_7, attribute_8, attribute_9, attribute_10,'      ||
1134         'attribute_11, attribute_12, attribute_13, attribute_14, attribute_15, ' ||
1135         'attribute_16, attribute_17, attribute_18, attribute_19, attribute_20, ' ||
1136         'attribute_21, attribute_22, attribute_23, attribute_24, attribute_25, ' ||
1137         'attribute_26, attribute_27, attribute_28, attribute_29, attribute_30,'  ||
1138         'attribute_31, attribute_32, attribute_33, attribute_34, attribute_35, ' ||
1139         'attribute_36, attribute_37, attribute_38, attribute_39, attribute_40,'  ||
1140         'attribute_41, attribute_42, attribute_43, attribute_44, attribute_45, ' ||
1141         'attribute_46, attribute_47, attribute_48, attribute_49, attribute_50, ' ||
1142         'attribute_51, attribute_52, attribute_53, attribute_54, attribute_55, ' ||
1143         'attribute_56, attribute_57, attribute_58, attribute_59, attribute_60,'  ||
1144         'cs_name ) ';
1145     l_select_list varchar2(32767);
1146 Begin
1147     debug_line('In Build_SQL_INS_AS_SELECT');
1148 
1149     l_select_list := Build_SQL_Source(p_cs_definition_id, C_SOURCE_TO_STAGE,
1150                                       p_instance_id, p_cs_name);
1151 
1152     return l_target_cols || ' select ' || 'msd_st_cs_Data_s.nextval, ' ||
1153            p_cs_definition_id || ', ' || l_select_list ||
1154            ' from ' || p_source_view || p_db_link;
1155 
1156 Exception
1157 When others then
1158     show_line(sqlerrm);
1159     raise;
1160 End;
1161 
1162 Function Build_SQL_Source(
1163     p_cs_definition_id in number,
1164     p_process_type     in number,
1165     p_instance_id      in varchar2,
1166     p_cs_name          in varchar2) return varchar2 is
1167     /*
1168        Builds the comma-separated select list (61 items, without the SELECT
1169        keyword) that feeds ATTRIBUTE_1 .. ATTRIBUTE_60 and CS_NAME, in that order.
1170 
1171        p_cs_definition_id - stream definition whose column mappings are read
1172                             from msd_cs_defn_column_dtls_v
1173        p_process_type     - C_SOURCE_TO_STAGE / C_SOURCE_TO_FACT: the list holds
1174                             source-view expressions; any other value (Staging
1175                             to Fact): the list holds the staging column names
1176        p_instance_id      - when not NULL, emitted as a quoted literal for the
1177                             column mapped with identifier type INSTANCE
1178        p_cs_name          - emitted as a quoted literal for a multiple-stream
1179                             definition that has no CS_NAME mapping
1180 
1181        Returns the list text.  Unmapped attribute columns are selected as the
1182        text NULL.  Errors are written with show_line and re-raised.
1183     */
1184     Type l_type_sql_struct is RECORD (
1185         tabcol_name    varchar2(60),
1186         /* holds an expression or a quoted literal, so wider than an identifier */
1187         srccol_name    varchar2(4000));
1188     Type l_type_sql_struct_array is TABLE of l_type_sql_struct;
1189 
1190     /* Only the three columns used below are fetched. */
1191     CURSOR c1 IS
1192     select table_column, identifier_type, source_view_column_name
1193     from msd_cs_defn_column_dtls_v
1194     where cs_definition_id = p_cs_definition_id;
1195 
1196     /* multiple_stream_flag of this stream definition */
1197     CURSOR c_multi_stream IS
1198     SELECT multiple_stream_flag FROM msd_cs_definitions
1199     WHERE cs_definition_id = p_cs_definition_id;
1200 
1201     /* Array layout: slots 1..C_ATTRIBUTE_COUNT are ATTRIBUTE_n, slot C_CS_NAME_POS is CS_NAME. */
1202     C_ATTRIBUTE_COUNT constant pls_integer := 60;
1203     C_CS_NAME_POS     constant pls_integer := 61;
1204 
1205     l_struct         l_type_sql_struct_array;
1206     l_sql_stmt       varchar2(32767);
1207     l_multi_stream   VARCHAR2(30);
1208     /* text contributed to the list by the current array slot */
1209     l_item           varchar2(4000);
1210 Begin
1211     debug_line('In Build_SQL_Source');
1212 
1213     /* Allocate exactly the slots that are used (no spare trailing element). */
1214     l_struct := l_type_sql_struct_array();
1215     l_struct.extend(C_CS_NAME_POS);
1216 
1217     /* Defaults: every attribute selects the text NULL; the CS_NAME source is
1218        left unset so that a missing mapping can be detected further down. */
1219     for i in 1..C_ATTRIBUTE_COUNT loop
1220        l_struct(i).tabcol_name := 'ATTRIBUTE_' || i;
1221        l_struct(i).srccol_name := 'NULL';
1222     end loop;
1223     l_struct(C_CS_NAME_POS).tabcol_name := 'CS_NAME';
1224 
1225     /* Fetch source column name from the mappings table and update the array */
1226     for c1_rec in c1 loop
1227 
1228         for i IN 1..C_CS_NAME_POS loop
1229             if l_struct(i).tabcol_name = c1_rec.table_column then
1230                 if c1_rec.identifier_type = 'INSTANCE' then
1231                     if p_instance_id is not null then
1232                         /* Quote the value as a literal; embedded quotes are
1233                            doubled, as is done for p_cs_name below. */
1234                         l_struct(i).srccol_name :=
1235                             '''' || replace(p_instance_id, '''', '''''') || '''';
1236                     end if;
1237                 elsif c1_rec.identifier_type = 'DATE' then
1238                     /* date-typed source columns are selected as YYYY/MM/DD text */
1239                     if c1_rec.source_view_column_name  is not null then
1240                         l_struct(i).srccol_name := 'to_char(' ||c1_rec.source_view_column_name || ', ''YYYY/MM/DD'')';
1241                     end if;
1242                 else
1243                     if c1_rec.source_view_column_name  is not null then
1244                         l_struct(i).srccol_name := c1_rec.source_view_column_name ;
1245                     end if;
1246                 end if;
1247                 exit;   /* slot found; stop scanning for this mapping row */
1248             end if;
1249         end loop;
1250     end loop;
1251 
1252     /* DWK If this stream is multiple stream and there is no column mapping
1253        for CS_NAME then, assume user will populate the CS_NAME
1254        from Collection */
1255     OPEN  c_multi_stream;
1256     FETCH c_multi_stream INTO l_multi_stream;
1257     CLOSE c_multi_stream;
1258 
1259     IF nvl(l_multi_stream, 'N') = 'Y' THEN
1260        /* After column mapping, if source column for CS_NAME is still null
1261           then  assume user will populate the CS_NAME  from Collection */
1262        IF ( l_struct(C_CS_NAME_POS).srccol_name IS NULL ) THEN
1263           l_struct(C_CS_NAME_POS).srccol_name := '''' || replace(p_cs_name, '''', '''''') || '''';
1264        END IF;
1265     ELSE   /* Single stream */
1266        l_struct(C_CS_NAME_POS).srccol_name := '''' || C_DEFAULT_STREAM_NAME || '''' ;
1267     END IF;
1268 
1269     /* Builds SQL stmt */
1270     for i in 1..C_CS_NAME_POS loop
1271         if p_process_type in (C_SOURCE_TO_FACT, C_SOURCE_TO_STAGE) then
1272             l_item := l_struct(i).srccol_name;
1273         else
1274             /* staging to fact (assumption always for Validate = 'Yes'):
1275                select the staging table column itself */
1276             l_item := l_struct(i).tabcol_name;
1277         end if;
1278 
1279         if l_sql_stmt is null then
1280             l_sql_stmt := l_item;
1281         else
1282             l_sql_stmt := l_sql_stmt || ', ' || l_item;
1283         end if;
1284     end loop;
1285 
1286     debug_line('l_sql_stmt : ' || l_sql_stmt);
1287     return l_sql_stmt;
1288 
1289 Exception
1290 When others then
1291     show_line(sqlerrm);
1292     raise;
1293 
1294 End;
1295 
1296 Procedure show_line(p_sql in    varchar2) is
1297     /* Writes p_sql to fnd_file.log in pieces of at most 255 characters; a NULL p_sql writes nothing. */
1298     l_len  pls_integer := nvl(length(p_sql), 0);
1299     l_pos  pls_integer := 1;
1300 Begin
1301     loop
1302         exit when l_pos > l_len;
1303         fnd_file.put_line(fnd_file.log, substr(p_sql, l_pos, 255));
1304         l_pos := l_pos + 255;
1305     end loop;
1306 End;
1307 
1308 Function validate_record (
1309     crec_data       in out NOCOPY  msd_cs_dfn_utl.g_typ_source_stream,
1310     p_cs_rec        in out NOCOPY  msd_Cs_definitions_v1%rowtype,
1311     p_instance_id   in     varchar2,
1312     p_err_mesg      out    NOCOPY  varchar2) return boolean is
1313 
1314     l_comments1         varchar2(1000);
1315     l_comments2         varchar2(1000);
1316     l_first_record      boolean:=TRUE;
1317     l_dummy_date        date;
1318     l_dummy_number      number;
1319 
1320     l_prd_found        varchar2(30);
1321     l_prd_parent_found varchar2(30);
1322     l_geo_found        varchar2(30);
1323     l_org_found        varchar2(30);
1324     l_chn_found        varchar2(30);
1325     l_rep_found        varchar2(30);
1326     l_ud1_found        varchar2(30);
1327     l_ud2_found        varchar2(30);
1328     l_tim_found        varchar2(30);
1329     l_dcs_found        varchar2(30);
1330 
1331     l_count            number(2) := 0;
1332 
1333 Begin
1334 
1335    /*  crec_data  -> actual record that you want to validate(ex, rows in staging table)
1336        p_cs_rec   -> information in custom stream definition
1337    */
1338 
1339 --    debug_line('In validate_record');
1340     /* Get Product LEVEL_PK */
1341     if nvl(p_cs_rec.prd_level_collect_flag, 'N') = 'Y' then
1342         l_prd_found := get_level_pk(p_instance_id, crec_data.prd_level_id,
1343                        crec_data.prd_sr_level_value_pk, crec_data.prd_level_value, crec_data.prd_level_value_pk);
1344 
1345         IF ( crec_data.prd_parent_sr_level_value_pk IS NOT NULL or
1346              crec_data.prd_parent_level_value IS NOT NULL or
1347              crec_data.prd_parent_level_value_pk IS NOT NULL ) THEN
1348             /* DWK Get Product Dimension's Parent LEVEL_PK */
1349             l_prd_parent_found := get_level_pk(p_instance_id, crec_data.prd_parent_level_id,
1350                                       crec_data.prd_parent_sr_level_value_pk,
1351                                       crec_data.prd_parent_level_value,
1352                                       crec_data.prd_parent_level_value_pk);
1353 
1354         ELSE  /* IF there is no parent item then make it null */
1355             crec_data.prd_parent_level_id := NULL;
1356         END IF;
1357     end if;
1358     /*  Get ORG LEVEL_PK */
1359     if nvl(p_cs_rec.org_level_collect_flag, 'N') = 'Y' then
1360         l_org_found := get_level_pk(p_instance_id, crec_data.org_level_id,
1361                        crec_data.org_sr_level_value_pk, crec_data.org_level_value, crec_data.org_level_value_pk);
1362     end if;
1363     /* Get Geo LEVEL_PK */
1364     if nvl(p_cs_rec.geo_level_collect_flag, 'N') = 'Y' then
1365         l_geo_found := get_level_pk(p_instance_id, crec_data.geo_level_id,
1366                        crec_data.geo_sr_level_value_pk, crec_data.geo_level_value, crec_data.geo_level_value_pk);
1367     end if;
1368 
1369     /* Get CHN LEVEL_PK */
1370     if nvl(p_cs_rec.chn_level_collect_flag, 'N') = 'Y' then
1371         l_chn_found := get_level_pk(p_instance_id, crec_data.chn_level_id,
1372                        crec_data.chn_sr_level_value_pk, crec_data.chn_level_value, crec_data.chn_level_value_pk);
1373     end if;
1374     /* Get REP LEVEL_PK */
1375     if nvl(p_cs_rec.rep_level_collect_flag, 'N') = 'Y' then
1376         l_rep_found := get_level_pk(p_instance_id, crec_data.rep_level_id,
1377                        crec_data.rep_sr_level_value_pk, crec_data.rep_level_value, crec_data.rep_level_value_pk);
1378     end if;
1379     /* Get UD1 LEVEL_PK */
1380     if nvl(p_cs_rec.ud1_level_collect_flag, 'N') = 'Y' then
1381         l_ud1_found := get_level_pk(p_instance_id, crec_data.ud1_level_id,
1382                        crec_data.ud1_sr_level_value_pk, crec_data.ud1_level_value, crec_data.ud1_level_value_pk);
1383     end if;
1384     /* Get UD2 LEVEL_PK */
1385     if nvl(p_cs_rec.ud2_level_collect_flag, 'N') = 'Y' then
1386         l_ud2_found := get_level_pk(p_instance_id, crec_data.ud2_level_id,
1387                        crec_data.ud2_sr_level_value_pk, crec_data.ud2_level_value, crec_data.ud2_level_value_pk);
1388     end if;
1389 
1390     /* Get Demand Class LEVEL_PK */
1391     if nvl(p_cs_rec.dcs_level_collect_flag, 'N') = 'Y' then
1392         l_dcs_found := get_level_pk(p_instance_id, crec_data.dcs_level_id,
1393                        crec_data.dcs_sr_level_value_pk, crec_data.dcs_level_value, crec_data.dcs_level_value_pk);
1394     end if;
1395 
1396     select
1397         decode(l_prd_found, g_level_pk_not_found, 'PRD ', null) ||
1398         /* DWK Check level pk of parent item for dependent demand data */
1399         decode(l_prd_parent_found, g_level_pk_not_found, 'PRD_PARENT ', null) ||
1400         decode(l_org_found, g_level_pk_not_found, 'ORG ', null) ||
1401         decode(l_geo_found, g_level_pk_not_found, 'GEO ', null) ||
1402         decode(l_chn_found, g_level_pk_not_found, 'CHN ', null) ||
1403         decode(l_rep_found, g_level_pk_not_found, 'REP ', null) ||
1404         decode(l_ud1_found, g_level_pk_not_found, 'UD1 ', null) ||
1405         decode(l_ud2_found, g_level_pk_not_found, 'UD2 ', null) ||
1406         decode(l_dcs_found, g_level_pk_not_found, 'DCS ', null)
1407     into
1408         l_comments2
1409     from
1410         dual;
1411 
1412     /* Level validation */
1413 
1414     if nvl(p_cs_rec.strict_flag, 'N') = 'Y' then
1415         /* if level_id is not defined at the definition level then the level_id
1416            of first record fetched will be used for validation
1417         */
1418         if l_first_record then
1419 
1420             l_first_record := FALSE;
1421 /* New */
1422             if p_cs_rec.prd_level_id is null and nvl(p_cs_rec.prd_level_collect_flag, 'N') = 'Y' then
1423                 p_cs_rec.prd_level_id := crec_data.prd_level_id;
1424             end if;
1425 
1426             if p_cs_rec.org_level_id is null and nvl(p_cs_rec.org_level_collect_flag, 'N') = 'Y' then
1427                 p_cs_rec.org_level_id := crec_data.org_level_id;
1428             end if;
1429 
1430             if p_cs_rec.geo_level_id is null and nvl(p_cs_rec.geo_level_collect_flag, 'N') = 'Y' then
1431                 p_cs_rec.geo_level_id := crec_data.geo_level_id;
1432             end if;
1433 
1434             if p_cs_rec.chn_level_id is null  and nvl(p_cs_rec.chn_level_collect_flag, 'N') = 'Y' then
1435                 p_cs_rec.chn_level_id := crec_data.chn_level_id;
1436             end if;
1437 
1438             if p_cs_rec.rep_level_id is null and nvl(p_cs_rec.rep_level_collect_flag, 'N') = 'Y' then
1439                 p_cs_rec.rep_level_id := crec_data.rep_level_id;
1440             end if;
1441 
1442             if p_cs_rec.ud1_level_id is null and nvl(p_cs_rec.ud1_level_collect_flag, 'N') = 'Y' then
1443                 p_cs_rec.ud1_level_id := crec_data.ud1_level_id;
1444             end if;
1445 
1446             if p_cs_rec.ud2_level_id is null and nvl(p_cs_rec.ud2_level_collect_flag, 'N') = 'Y' then
1447                 p_cs_rec.ud2_level_id := crec_data.ud2_level_id;
1448             end if;
1449 
1450             if p_cs_rec.tim_level_id is null and nvl(p_cs_rec.tim_level_collect_flag, 'N') = 'Y' then
1451                 /* Attribute_34 is tim_level_id */
1452                 p_cs_rec.tim_level_id := crec_data.tim_level_id;
1453             end if;
1454 
1455             if p_cs_rec.dcs_level_id is null and nvl(p_cs_rec.dcs_level_collect_flag, 'N') = 'Y' then
1456                 p_cs_rec.dcs_level_id := crec_data.dcs_level_id;
1457             end if;
1458 
1459         end if;
1460 
1461         Select
1462             decode(crec_data.prd_level_id,
1463                    null, decode(p_cs_rec.prd_level_collect_flag,
1464                                 'Y', 'PRD ',
1465                                  null),
1466                    p_cs_rec.prd_level_id, null,
1467                    'PRD ')  ||
1468             /* DWK  IF dependent demand data are collected, its parents level id should be 1 */
1469             decode(nvl(crec_data.prd_parent_level_id, '1'), '1', null, 'PRD_PARENT ') ||
1470             decode(crec_data.org_level_id,
1471                    null, decode(p_cs_rec.org_level_collect_flag,
1472                                 'Y', 'ORG ',
1473                                  null),
1474                    p_cs_rec.org_level_id, null,
1475                    'ORG ')  ||
1476             decode(crec_data.geo_level_id,
1477                    null, decode(p_cs_rec.geo_level_collect_flag,
1478                                 'Y', 'GEO ',
1479                                  null),
1480                    p_cs_rec.geo_level_id, null,
1481                    'GEO ')  ||
1482             decode(crec_data.rep_level_id,
1483                    null, decode(p_cs_rec.rep_level_collect_flag,
1484                                 'Y', 'REP ',
1485                                  null),
1486                    p_cs_rec.rep_level_id, null,
1487                    'REP ')  ||
1488             decode(crec_data.chn_level_id,
1489                    null, decode(p_cs_rec.chn_level_collect_flag,
1490                                 'Y', 'CHN ',
1491                                  null),
1492                    p_cs_rec.chn_level_id, null,
1493                    'CHN ')  ||
1494             decode(crec_data.ud1_level_id,
1495                    null, decode(p_cs_rec.ud1_level_collect_flag,
1496                                 'Y', 'UD1 ',
1497                                  null),
1498                    p_cs_rec.ud1_level_id, null,
1499                    'UD1 ')  ||
1500             decode(crec_data.ud2_level_id,
1501                    null, decode(p_cs_rec.ud2_level_collect_flag,
1502                                 'Y', 'UD2 ',
1503                                  null),
1504                    p_cs_rec.ud2_level_id, null,
1505                    'UD2 ')  ||
1506             decode(crec_data.tim_level_id,
1507                    null, decode(p_cs_rec.tim_level_collect_flag,
1508                                 'Y', 'TIM ',
1509                                  null),
1510                    p_cs_rec.tim_level_id, null,
1511                    'TIM ')  ||
1512             decode(crec_data.dcs_level_id,
1513                    null, decode(p_cs_rec.dcs_level_collect_flag,
1514                                 'Y', 'DCS ',
1515                                  null),
1516                    p_cs_rec.dcs_level_id, null,
1517                    'DCS ' )
1518         into
1519             l_comments1
1520         from  dual;
1521     ELSE  /* p_cs_rec.strict_flag = 'N' */
1522 
1523        /* Check whether that time level id exists in fnd lookup or not */
1524 
1525        IF ( nvl(p_cs_rec.tim_level_collect_flag, 'N') = 'Y') THEN
1526           select count(*) into l_count
1527           from fnd_lookup_values
1528           where lookup_type = 'MSD_PERIOD_TYPE' and
1529           nvl(crec_data.tim_level_id, '999.99') = lookup_code and
1530           rownum <= 1;
1531 
1532           IF ( l_count < 1 ) THEN
1533              select 'TIM' into l_comments1 from dual;
1534           END IF;
1535        END IF;
1536 
1537     END IF;
1538 
1539 
1540 
1541     /* MSD_CS_DATALOAD_INVALID_LVLID - Invalid Level ID for Dimensions    */
1542     /* MSD_CS_DATALOAD_INVALID_DIM     - Invalid Dimensions */
1543     select decode(l_comments2, null, null, 'MSD_CS_DATALOAD_INVALID_DIM : ' || l_comments2) ||
1544            decode(l_comments1, null, null, 'MSD_CS_DATALOAD_INVALID_LVLID : ' || l_comments1)
1545     into p_err_mesg
1546     from dual;
1547 
1548     /* Validate Date Format */
1549     Begin
1550         select to_date(crec_data.attribute_43, 'YYYY/MM/DD')
1551             into l_dummy_date
1552         from dual;
1553     Exception
1554     When others then
1555         p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_DATE_FORMAT : ATTRIBUTE_43';
1556     End;
1557 
1558     /* Validate Amount Number Format */
1559     Begin
1560       -- Check Amount
1561       if (p_cs_rec.measurement_type in (1,3,4)) then
1562 	  l_dummy_number := crec_data.attribute_42;
1563       end if;
1564     Exception
1565     When others then
1566        p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_42';
1567     End;
1568 
1569     /* Validate Quantity Number Format */
1570     Begin
1571       -- Check Quantity
1572       if (p_cs_rec.measurement_type in (2,4,5)) then
1573 	l_dummy_number := crec_data.attribute_41;
1574       end if;
1575     Exception
1576     When others then
1577        p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_41';
1578     End;
1579 
1580     /* Validate Price Number Format */
1581     Begin
1582       -- Check Price
1583       if (p_cs_rec.measurement_type in (3,5)) then
1584 	l_dummy_number := crec_data.attribute_44;
1585       end if;
1586     Exception
1587     When others then
1588        p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_44';
1589     End;
1590 
1591     if p_err_mesg is null then
1592         return TRUE;
1593     else
1594         return FALSE;
1595     end if;
1596 
1597 Exception
1598 When others then
1599     show_line(sqlerrm);
1600     raise;
1601 
1602 End;
1603 
/*
   get_level_pk

   Looks up one row of msd_level_values for the given instance and level,
   using the first identifier that was supplied, in this order:
     1. p_sr_level_value_pk  (matched against sr_level_pk)  - also returns level_value
     2. p_level_value        (matched against level_value)
     3. p_level_value_pk     (matched against level_pk)     - also returns sr_level_pk
                                                              and level_value
   Only the first matching row is used.

   Returns
     null                  when p_level_id is null, p_instance is null, or none
                           of the three identifiers was supplied;
     g_level_pk_not_found  when the lookup finds no row (the lookup inputs are
                           then written through debug_line);
     otherwise the level_pk of the matching row, which is also copied into
     p_level_value_pk.
*/
Function get_level_pk (
    p_instance          in varchar2,
    p_level_id          in number,
    p_sr_level_value_pk in OUT NOCOPY varchar2,
    p_level_value       in OUT NOCOPY varchar2,
    p_level_value_pk    in OUT NOCOPY varchar2) return varchar2 is

    /* Lookup by source level value pk */
    Cursor c_by_sr_pk is
    select lv.level_pk, lv.level_value
    from   msd_level_values lv
    where  lv.instance    = p_instance
    and    lv.level_id    = p_level_id
    and    lv.sr_level_pk = p_sr_level_value_pk;

    /* Lookup by level value */
    Cursor c_by_value is
    select lv.level_pk
    from   msd_level_values lv
    where  lv.instance    = p_instance
    and    lv.level_id    = p_level_id
    and    lv.level_value = p_level_value;

    /* Lookup by level pk */
    Cursor c_by_pk is
    select lv.sr_level_pk, lv.level_value
    from   msd_level_values lv
    where  lv.instance = p_instance
    and    lv.level_id = p_level_id
    and    lv.level_pk = p_level_value_pk;

    /* Stays at the sentinel unless one of the lookups finds a row */
    l_found_pk  varchar2(255) := g_level_pk_not_found;
Begin

    /* No level id means no data is collected for this dimension */
    if p_level_id is null then
        return null;
    end if;

    /* Insufficient parameters: no instance, or no identifier to search by */
    if p_instance is null
       or coalesce(p_sr_level_value_pk, p_level_value, p_level_value_pk) is null then
        return null;
    end if;

    if p_sr_level_value_pk is not null then
        for r_lv in c_by_sr_pk loop
            l_found_pk    := r_lv.level_pk;
            p_level_value := r_lv.level_value;
            exit;   /* first row only */
        end loop;
    elsif p_level_value is not null then
        for r_lv in c_by_value loop
            l_found_pk := r_lv.level_pk;
            exit;   /* first row only */
        end loop;
    else
        /* p_level_value_pk is not null */
        for r_lv in c_by_pk loop
            p_sr_level_value_pk := r_lv.sr_level_pk;
            p_level_value       := r_lv.level_value;
            l_found_pk          := p_level_value_pk;
            exit;   /* first row only */
        end loop;
    end if;

    if l_found_pk <> g_level_pk_not_found then
        p_level_value_pk := l_found_pk;
    else
        debug_line(' p_instance ' || p_instance || ' p_level_id ' || p_level_id ||
               ' p_sr_level_value_pk ' || p_sr_level_value_pk || ' p_level_value ' || p_level_value ||
               ' p_level_value_pk  ' || p_level_value_pk);
    end if;

    return l_found_pk;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1697 
/*
   Build_Where_Clause

   Produces the WHERE text used for a collection by substituting up to ten
   user parameters into p_tokenized_where and prefixing p_default_where.

   Parameters
     p_tokenized_where  where text containing quoted '&&TYPE:...' tokens
                        (TYPE is CHAR, NUMBER or DATE); may be null.
     p_default_where    where text that is always applied; may be null.
     p_parameter1..10   values substituted, in order, for the tokens.
     p_request_id       concurrent request id, used to look up multi-value
                        parameters in msd_cs_coll_parameters.

   Returns
     p_default_where || ' and ' || <substituted where> when both are present,
     otherwise whichever of the two is not null (null if neither).

   Any exception is logged through show_line and re-raised.
*/
Function Build_Where_Clause (
    p_tokenized_where   in  varchar2,
    p_default_where     in  varchar2,
    p_parameter1        in  varchar2,
    p_parameter2        in  varchar2,
    p_parameter3        in  varchar2,
    p_parameter4        in  varchar2,
    p_parameter5        in  varchar2,
    p_parameter6        in  varchar2,
    p_parameter7        in  varchar2,
    p_parameter8        in  varchar2,
    p_parameter9        in  varchar2,
    p_parameter10       in  varchar2,
    p_request_id        in  number) return varchar2 is

    Type param_list_type is varray(10) of varchar2(255);


    l_para_list param_list_type;
    l_where     varchar2(3000);

    /*
       Replaces the FIRST '&&...' token found in p_where_cond with p_val
       (parameter number p_para_num).  Does nothing when no token is left.
    */
    Procedure find_and_subst_param ( p_where_cond  in out NOCOPY varchar2,
                                     p_val             in varchar2,
                                     p_request_id      in number,
                                     p_para_num        in number) is

        start_pos number;
        end_pos   number;
        para_type varchar2(10);

        l_default_col            varchar2(300) := NULL;
        l_count                  number := 0;
        l_multi_flag             varchar2(30) := 'N';

--   'CHAR:Prompt_Name:ValueSet_Name:Remote_Yes_No:Multi_Yes_NO:Default_Column_Name_For_Multi'

    Begin
        debug_line('In find_and_subst_param');
        start_pos := instr(p_where_cond, '&&', 1);

        /* The caller always tries all ten parameters.  Once every token has
           been consumed there is nothing to substitute; without this guard
           the code below would slice the string at offset 0. */
        if nvl(start_pos, 0) = 0 then
            return;
        end if;

        para_type := substr(p_where_cond, start_pos + 2, 7);
        /* Closing quote of the quoted token */
        end_pos   := instr(p_where_cond, '''', start_pos);

        /* A token without a closing quote cannot be replaced safely:
           leave the text untouched instead of duplicating it. */
        if nvl(end_pos, 0) = 0 then
            return;
        end if;

        if substr(upper(para_type), 1, 5) = 'CHAR:' then /* Character type */
           l_multi_flag :=
               nvl(upper(msd_cs_defn_utl2.get_char_property(p_where_cond, start_pos, end_pos, 4)), 'N');

           l_default_col := msd_cs_defn_utl2.get_char_property(p_where_cond, start_pos, end_pos, 5);

           IF l_multi_flag = 'Y' THEN
              /* Multi-value parameter: check whether the user entered any
                 values for this parameter number */
              select count(1) into l_count from msd_cs_coll_parameters
                                    where conc_request_id = p_request_id and
                                    parameter_number = p_para_num;
              /* No values entered: fall back to the default column name
                 given in the token, when there is one */
              IF (l_count = 0 AND l_default_col IS NOT NULL) THEN
                 p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                                 l_default_col ||
                                 substr(p_where_cond, end_pos + 1);
              ELSE
                 p_where_cond :=  substr(p_where_cond, 1, start_pos - 2) ||
                               ' (SELECT parameter_code FROM msd_cs_coll_parameters ' ||
                               ' WHERE conc_request_id = ' || p_request_id ||
                               ' AND parameter_number = ' || p_para_num || ' ) ' ||
                               substr(p_where_cond, end_pos + 1);
              END IF;
           ELSE
              /* Single value: keep the surrounding quotes of the token and
                 double any quote inside the value */
              p_where_cond := substr(p_where_cond, 1, start_pos - 1) ||
                              replace(p_val, '''', '''''') ||
                              substr(p_where_cond, end_pos);
           END IF;
        elsif substr(upper(para_type), 1, 7) = 'NUMBER:' then /* Number type*/
            /* NOTE(review): p_val is spliced in unquoted and unvalidated;
               confirm callers only pass numeric text here. */
            p_where_cond := substr(p_where_cond, 1, start_pos - 2) || p_val ||
                            substr(p_where_cond, end_pos + 1);
        elsif substr(upper(para_type), 1, 5) = 'DATE:' then /* Date type */
            /* Quotes inside the value are doubled, as in the CHAR branch, so
               the value cannot terminate the generated string literal */
            p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                            'to_date(''' || replace(p_val, '''', '''''') || ''', ''YYYYMMDD'')' ||
                            substr(p_where_cond, end_pos + 1);
        end if;

debug_line(p_where_cond);

    End;


    /* Applies the ten parameters, in order, to successive tokens */
    Procedure substitute_parameter (
        p_where_cond    in out NOCOPY varchar2,
        p_param_list    in     param_list_type,
        p_request_id    in     number) is

        i number := 1;
    Begin
        debug_line('In substitute_parameter');
       /* DP-CRM Code changes by easwaran */
        while (i < 11 )
        loop
            find_and_subst_param( p_where_cond, p_param_list(i), p_request_id, i);
            i := i + 1;
        end loop;

    End;

    /* Packs the ten scalar parameters into a varray */
    Procedure make_para_list(
        p_parameter1    in     varchar2,
        p_parameter2    in     varchar2,
        p_parameter3    in     varchar2,
        p_parameter4    in     varchar2,
        p_parameter5    in     varchar2,
        p_parameter6    in     varchar2,
        p_parameter7    in     varchar2,
        p_parameter8    in     varchar2,
        p_parameter9    in     varchar2,
        p_parameter10   in     varchar2,
        p_para_list     in out NOCOPY param_list_type) is
    Begin
        debug_line('In make_para_list');
        p_para_list := param_list_type (p_parameter1, p_parameter2, p_parameter3, p_parameter4, p_parameter5,
                                        p_parameter6, p_parameter7, p_parameter8, p_parameter9, p_parameter10);

    End;


Begin
    debug_line('In Build_Where_Clause');
    if p_tokenized_where is not null then
       /*
         convert parameters into an array.
       */
        make_para_list( p_parameter1, p_parameter2, p_parameter3, p_parameter4, p_parameter5,
                        p_parameter6, p_parameter7, p_parameter8, p_parameter9, p_parameter10,
                        l_para_list);

        /* Build additional Where */
        l_where := p_tokenized_where;

        substitute_parameter ( l_where, l_para_list, p_request_id);

    end if;
    if l_where is not null then
        if p_default_where is not null then
            l_where := p_default_where || ' and ' || l_where;
        end if;
    else
        l_where := p_default_where;
    end if;

    return l_where;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1855 
/*
   Refresh_Target

   Clears previously loaded data before a collection/pull writes new rows.

   C_SOURCE_TO_STAGE
     Rows of msd_st_cs_data for the definition (optionally narrowed by
     p_cs_name and p_instance_id) are deleted, whether or not a complete
     refresh was requested.  Deleting on a non-complete refresh avoids double
     counting when the user re-runs source-to-stage collection, matching the
     behaviour of the other collections (Booking/Shipment).

   C_STAGE_TO_FACT with p_comp_refresh = 'Y'
     Rows of msd_cs_data with action_code 'I' are flagged 'D' and stamped with
     p_new_refresh_num for every instance (and, for multi-stream definitions,
     every cs_name) currently present in the staging table.

   Any other combination does nothing.  Exceptions are logged through
   show_line and re-raised.
*/
Procedure Refresh_Target(
                          p_process_type      in varchar2,
                          p_cs_definition_id  in number,
                          p_cs_name           in varchar2,
                          p_comp_refresh      in varchar2,
                          p_instance_id       in number,
                          p_new_refresh_num   in NUMBER) is

    /* Instance / stream name pairs present in staging (multi-stream case) */
    cursor c_staged_streams is
    select distinct st.attribute_1 instance, st.cs_name
    from   msd_st_cs_data st
    where  st.cs_definition_id = p_cs_definition_id
    and    st.cs_name = nvl(p_cs_name, st.cs_name);

    /* DWK  Instances present in staging (single-stream case) */
    cursor c_staged_instances is
    select distinct st.attribute_1 instance
    from   msd_st_cs_data st
    where  st.cs_definition_id = p_cs_definition_id;

    cursor c_multi_stream is
    select nvl(def.multiple_stream_flag,'N')
    from   msd_cs_definitions def
    where  def.cs_definition_id = p_cs_definition_id;

    l_multi_flag  VARCHAR2(10);

Begin
    debug_line('In refresh_target');

    IF p_process_type = C_SOURCE_TO_STAGE THEN

        /* Same clean-up for complete and non-complete refresh */
        delete from msd_st_cs_data
         where cs_definition_id = p_cs_definition_id
           and cs_name = nvl(p_cs_name, cs_name)
           and attribute_1 = nvl(p_instance_id, attribute_1);

    ELSIF p_comp_refresh = 'Y' AND p_process_type = C_STAGE_TO_FACT THEN

        open c_multi_stream;
        fetch c_multi_stream into l_multi_flag;
        close c_multi_stream;

        IF l_multi_flag = 'Y' THEN
            FOR r_stream IN c_staged_streams LOOP
                update msd_cs_data
                   set action_code      = 'D',
                       last_refresh_num = p_new_refresh_num
                 where cs_definition_id = p_cs_definition_id
                   and cs_name          = r_stream.cs_name
                   and attribute_1      = r_stream.instance
                   and action_code      = 'I';
            END LOOP;
        ELSE
            /* DWK  For single stream, ignore the CS_NAME column for refresh */
            FOR r_inst IN c_staged_instances LOOP
                update msd_cs_data
                   set action_code      = 'D',
                       last_refresh_num = p_new_refresh_num
                 where cs_definition_id = p_cs_definition_id
                   and attribute_1      = r_inst.instance
                   and action_code      = 'I';
            END LOOP;
        END IF;

    END IF;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
1947 
/*
   Process_1_Sub

   Runs p_sql_stmt, validates every fetched row with validate_record and
   writes valid rows to the target: ins_row_fact (plus a header row via
   insert_update_Into_Headers) when p_target_table is 'MSD_CS_DATA',
   ins_row_staging otherwise.  Valid rows are recorded with log_processed,
   invalid rows with log_error.

   Work is committed every C_BATCH_SIZE rows and once more after the loop
   (Bug# 4349618).

   Parameters
     p_cs_rec           stream definition, passed on to the row handlers.
     p_cs_name          not referenced in this procedure.
     p_source_view      passed to log_processed / log_error.
     p_target_table     'MSD_CS_DATA' selects the fact-table path.
     p_instance_id      instance to use; when null the row's own instance is used.
     p_sql_stmt         query whose rows are fetched into
                        msd_cs_dfn_utl.G_TYP_SOURCE_STREAM.
     p_new_refresh_num  refresh number stamped on fact and header rows.

   Side effects
     Sets g_retcode to '1' and fills g_errbuf when any row was invalid or when
     no row was fetched.  On any exception the error text and p_sql_stmt are
     written with show_line, the cursor is closed if it was opened, and the
     exception is re-raised.
*/
Procedure Process_1_Sub (
        p_cs_rec        in out NOCOPY  msd_cs_definitions_v1%rowtype,
        p_cs_name           in  varchar2,
        p_source_view       in  varchar2,
        p_target_table      in  varchar2,
        p_instance_id       in  number,
        p_sql_stmt          in  varchar2,
        p_new_refresh_num   IN  NUMBER) is

    TYPE cur_type is REF CURSOR;
    l_cur       cur_type;
    l_rec       msd_cs_dfn_utl.G_TYP_SOURCE_STREAM;

    l_valid     boolean;
    l_err_msg   varchar2(1000);

    l_success_rows  number:=0;
    l_error_rows    number:=0;

    /* Bug# 4349618  To commit in Batches: rows handled so far */
    l_counter	    number:=0;

    /* DWK: last designator / instance for which a header row was written */
    l_temp_designator         VARCHAR2(40) := NULL;
    l_temp_instance_id        NUMBER := NULL;

Begin
    debug_line('In Process_1_Sub');

    open l_cur for p_sql_stmt;
    LOOP
        fetch l_cur into l_rec;
        exit when l_cur%notfound;

        l_valid := null;
        l_err_msg := null;

        debug_line('Validating ' || l_rec.pk_id);

        /* NOTE(review): validation prefers the row's instance while every
           other call below prefers p_instance_id - confirm this is intended. */
        l_valid := validate_record (l_rec, p_cs_rec, nvl(l_rec.instance,p_instance_id), l_err_msg);

        IF l_valid THEN
            /* IMP : Instance is p_instance in case of Collect
                     and l_rec.instance in case of PULL i.e. from the staging
                     table */
            IF  (p_target_table = 'MSD_CS_DATA') THEN
                ins_row_fact(l_rec, p_cs_rec, l_rec.designator,
                             nvl(p_instance_id, l_rec.instance),
                             p_new_refresh_num);

                /* Write a header row only when the designator or the
                   instance differs from the previous valid row. */
                IF ( l_rec.designator <> nvl(l_temp_designator,'-99999999~!@') OR
                     nvl(p_instance_id,l_rec.instance) <> nvl(l_temp_instance_id,-99999999) ) THEN
                    l_temp_designator  := l_rec.designator;
                    l_temp_instance_id := nvl(p_instance_id,l_rec.instance);

                    /* DWK  Populate MSD_CS_DATA_HEADERS table after inserting rows
                       into FACT table */
                    insert_update_Into_Headers ( p_cs_rec.cs_definition_id,
                                                 l_rec.designator,
                                                 nvl(p_instance_id,l_rec.instance), p_new_refresh_num);
                END IF;
            ELSE
                ins_row_staging(l_rec, p_cs_rec, l_rec.designator, nvl(p_instance_id, l_rec.instance), null, null);
            END IF;

            /* Mark record Processed */
            log_processed(l_rec, p_cs_rec, l_rec.designator, nvl(p_instance_id, l_rec.instance), p_source_view, p_target_table);

            /* Count Success Rows */
            l_success_rows := l_success_rows + 1;

        ELSE   /* IF not Valid */

            /*  Log Error */
            log_error(l_rec, p_cs_rec, l_rec.designator,
                      nvl(p_instance_id, l_rec.instance),
                      l_err_msg, p_source_view, p_target_table);
            /* Count Erroneous Rows */
            l_error_rows := l_error_rows + 1;

        END IF;

        /* Bug# 4349618  To commit in Batches.  Every fetched row counts,
           valid or not; MOD is evaluated in PL/SQL, no query against dual. */
        l_counter := l_counter + 1;

        IF mod(l_counter, C_BATCH_SIZE) = 0 THEN
            debug_line( 'Inside Process_1_Sub: commiting inside the loop.');
            commit;
        END IF;

    END LOOP;

    /* Bug# 4349618  To commit in Batches */
    debug_line( 'Inside Process_1_Sub: commiting after the loop ends.');
    commit;


    if l_error_rows > 0 then
        g_retcode := '1';
        g_errbuf := 'There were erroneous records in Collect/Pull.';
    end if;

    if l_success_rows = 0 and l_error_rows = 0 then
        g_retcode := '1';
        g_errbuf := 'There were no rows fetched.';
    end if;

    /* Print Results */

    show_line('Valid Records   : ' || l_success_rows);
    show_line('Invalid Records : ' || l_error_rows);

    close l_cur;

Exception
When others then
    show_line(sqlerrm);
    show_line(p_sql_stmt);
    /* The failure may have come from OPEN itself (e.g. invalid p_sql_stmt);
       closing a cursor that is not open would raise ORA-01001 and hide the
       original error. */
    if l_cur%isopen then
        close l_cur;
    end if;
    raise;
End;
2080 
/*
   Build_Designator_Where_Clause

   Builds the filter on the designator (cs_name) for a collect/pull run.

   C_STAGE_TO_FACT
     Filters on the literal column cs_name when p_cs_name is given, and
     always adds the cs_definition_id condition.

   Any other process type
     Only for multi-stream definitions with a p_cs_name: filters on the source
     view column mapped to table column 'CS_NAME' in msd_cs_defn_column_dtls.
     When no such mapping is found no filter is produced.

   Quotes inside p_cs_name are doubled.  Returns null when no filter applies.
   Exceptions are logged through show_line and re-raised.
*/
Function Build_Designator_Where_Clause(
    p_cs_rec        in  msd_cs_definitions_v1%rowtype,
    p_process_type  in  varchar2,
    p_cs_name       in  varchar2) return varchar2 is

    /* Source view column that carries the designator */
    Cursor c_designator_col is
     select dtl.source_view_column_name
     from   msd_cs_defn_column_dtls dtl
     where  dtl.cs_definition_id = p_cs_rec.cs_definition_id
     and    dtl.table_column = 'CS_NAME';

    l_filter        varchar2(500);
    l_source_col    varchar2(60);

Begin
    debug_Line('In Build_Designator_Where_Clause');

    if p_process_type = C_STAGE_TO_FACT then

        if p_cs_name is null then
            /* Default where only */
            l_filter := ' cs_definition_id = ' || p_cs_rec.cs_definition_id;
        else
            l_filter := 'cs_name = ''' || replace(p_cs_name, '''', '''''') || '''';
            l_filter := l_filter || ' and cs_definition_id = ' || p_cs_rec.cs_definition_id;
        end if;

    elsif nvl(p_cs_rec.multiple_stream_flag, 'N') = 'Y' and p_cs_name is not null then

        /* Only the first mapping row is used */
        for r_col in c_designator_col loop
            l_source_col := r_col.source_view_column_name;
            exit;
        end loop;

        /* No mapped column: no filter is produced (no error is raised) */
        if l_source_col is not null then
            l_filter := l_source_col || ' = ''' || replace(p_cs_name, '''', '''''') || '''';
        end if;

    end if;

    return l_filter;

Exception
When others then
    show_line(sqlerrm);
    raise;

End;
2137 
/* Writes p_sql through show_line, but only when the package constant
   C_DEBUG is 'Y'. */
Procedure debug_line(p_sql in varchar2) is
    l_debug_on constant boolean := (c_debug = 'Y');
Begin
    if l_debug_on then
        show_line(p_sql);
    end if;
End;
2144 
2145 /* DWK */
/*************************************************************************************************
PROCEDURE Insert_update_Into_Headers

Keeps msd_cs_data_headers in step with the fact data for one
(cs_definition_id, cs_name, instance) combination:
  - when msd_cs_data_headers_v1 shows no row for the combination, a new header
    row is created through Insert_Data_Into_Headers;
  - otherwise last_refresh_num of the matching msd_cs_data_headers row(s) is
    set to p_refresh_num.

Exceptions are logged through show_line and re-raised.
**************************************************************************************************/
Procedure insert_update_Into_Headers (	p_cs_definition_id  in  number,
					p_cs_name           in  varchar2,
					p_instance_id       in  number,
                                        p_refresh_num       in number) is

   l_header_count    NUMBER := 0;

BEGIN

   SELECT count(*)
     INTO l_header_count
     FROM msd_cs_data_headers_v1 hdr
    WHERE hdr.cs_definition_id = p_cs_definition_id
      AND hdr.cs_name          = p_cs_name
      AND hdr.instance         = p_instance_id;

   IF l_header_count > 0 THEN

      /* Header already known: only stamp the new refresh number */
      UPDATE msd_cs_data_headers hdr
         SET hdr.last_refresh_num = p_refresh_num
       WHERE hdr.cs_definition_id = p_cs_definition_id
         AND hdr.instance         = p_instance_id
         AND hdr.cs_name          = p_cs_name;

   ELSE

      Insert_Data_Into_Headers (p_cs_definition_id,
                                p_cs_name,
                                p_instance_id,
                                p_refresh_num);

   END IF;

Exception
When others then
    show_line(sqlerrm);
    raise;

END insert_update_Into_Headers;
2189 
2190 
/*************************************************************************************************
PROCEDURE Insert_Data_Into_Headers

Inserts one row into msd_cs_data_headers for the given cs_definition_id,
cs_name and instance, stamped with p_refresh_num.  The header id comes from
sequence msd_cs_data_headers_s; the who-columns are filled from fnd_global
and sysdate.

Exceptions are logged through show_line and re-raised.
**************************************************************************************************/
Procedure Insert_Data_Into_Headers (	p_cs_definition_id  in  number,
					p_cs_name           in  varchar2,
					p_instance_id       in  number,
                                        p_refresh_num       in  number) is

   l_user_id    NUMBER;
   l_login_id   NUMBER;

BEGIN

   /* Read inside the body so that a failure here is still handled below */
   l_user_id  := fnd_global.user_id;
   l_login_id := fnd_global.login_id;

   INSERT INTO msd_cs_data_headers (
          cs_data_header_id,
          instance,
          cs_definition_id,
          cs_name,
          last_update_date,
          last_updated_by,
          creation_date,
          created_by,
          last_update_login,
          last_refresh_num )
   VALUES (
          msd_cs_data_headers_s.nextval,
          p_instance_id,
          p_cs_definition_id,
          p_cs_name,
          sysdate,
          l_user_id,
          sysdate,
          l_user_id,
          l_login_id,
          p_refresh_num );

Exception
  When others then
    show_line('Error in inserting into MSD_CS_DATA_HEADERS');
    show_line(sqlerrm);
    raise;

END Insert_Data_Into_Headers;
2238 
2239 End;