[Home] [Help]
PACKAGE BODY: APPS.MSD_CS_COLLECTION
Source
1 package body msd_cs_collection as
2 /* $Header: msdcsclb.pls 120.3 2010/04/30 09:02:55 lannapra ship $ */
/*
   Package-level constants
*/
/* Values written to / tested against the staging PROCESS_STATUS column */
C_LOG_PROCESSED        constant varchar2(30) := 'PROCESSED';
C_LOG_ERROR            constant varchar2(30) := 'ERROR';
/* Stream name used when the definition is not flagged as multiple-stream */
C_DEFAULT_STREAM_NAME  constant varchar2(30) := 'SINGLE_STREAM';
/* Accepted values of p_collection_type */
C_COLLECT              constant varchar2(10) := 'C';
C_PULL                 constant varchar2(10) := 'P';
/* Debug switch */
C_DEBUG                constant varchar2(1)  := 'N';
/* Bug# 4349618 To commit in Batches */
C_BATCH_SIZE           constant number       := 30000;
/*
   Process types
*/
C_SOURCE_TO_STAGE      constant number := 1;
C_SOURCE_TO_FACT       constant number := 2;
C_STAGE_TO_FACT        constant number := 3;
/*
   Package globals
*/
/* Marker string; presumably returned when a level PK lookup fails - TODO confirm against get_level_pk */
g_level_pk_not_found   varchar2(30) := '%%^^)(::Error::%%^^)(';
/* Error status of the program; copied to the OUT parameters at the end of Custom_Stream_Collection */
g_retcode              varchar2(30);
g_errbuf               varchar2(255);
26 /* */
27 /*
28 == Local Function/Procedures
29 */
30
/* Forward declarations: header maintenance and the row-processing loop. */

procedure insert_update_Into_Headers (
    p_cs_definition_id  in number,
    p_cs_name           in varchar2,
    p_instance_id       in number,
    p_refresh_num       in number);

procedure Insert_Data_Into_Headers (
    p_cs_definition_id  in number,
    p_cs_name           in varchar2,
    p_instance_id       in number,
    p_refresh_num       in number);

/* Called by Process_1 with the fully assembled SELECT text in p_sql_stmt. */
procedure Process_1_Sub (
    p_cs_rec            in out nocopy msd_cs_definitions_v1%rowtype,
    p_cs_name           in varchar2,
    p_source_view       in varchar2,
    p_target_table      in varchar2,
    p_instance_id       in number,
    p_sql_stmt          in varchar2,
    p_new_refresh_num   in number);
48
/* Forward declarations: builders for the dynamic SQL text. */

function Build_SQL_Source (
    p_cs_definition_id  in number,
    p_process_type      in number,
    p_instance_id       in varchar2,
    p_cs_name           in varchar2) return varchar2;

/* Used by Process_1; the caller appends ' WHERE 1 = 1' and further predicates to the result. */
function Build_SQL_FOR_COLLECT_AND_VAL (
    p_cs_definition_id  in number,
    p_process_type      in number,
    p_source_view       in varchar2,
    p_db_link           in varchar2,
    p_cs_name           in varchar2) return varchar2;

/* Used by Process_2 (collection into staging without validation). */
function Build_SQL_INS_AS_SELECT (
    p_cs_definition_id  in number,
    p_instance_id       in varchar2,
    p_cs_name           in varchar2,
    p_source_view       in varchar2,
    p_db_link           in varchar2) return varchar2;

/* Used by Process_1; a non-null result is AND-ed onto the statement. */
function Build_Where_Clause (
    p_tokenized_where   in varchar2,
    p_default_where     in varchar2,
    p_parameter1        in varchar2,
    p_parameter2        in varchar2,
    p_parameter3        in varchar2,
    p_parameter4        in varchar2,
    p_parameter5        in varchar2,
    p_parameter6        in varchar2,
    p_parameter7        in varchar2,
    p_parameter8        in varchar2,
    p_parameter9        in varchar2,
    p_parameter10       in varchar2,
    p_request_id        in number) return varchar2;
83
/* Forward declarations: status logging, target refresh and row inserts. */

procedure log_processed (
    crec_data           in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            in msd_cs_definitions_v1%rowtype,
    p_cs_name           in varchar2,
    p_instance_id       in varchar2,
    p_source_view       in varchar2,
    p_target_table      in varchar2);

procedure log_error (
    crec_data           in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            in msd_cs_definitions_v1%rowtype,
    p_cs_name           in varchar2,
    p_instance_id       in varchar2,
    p_error_message     in varchar2,
    p_source_view       in varchar2,
    p_target_table      in varchar2);

/* Updates process_status / error_desc of one staging row identified by p_pk_id. */
procedure upd_stage_error (
    p_pk_id             in number,
    p_process_status    in varchar2,
    p_error_mesg        in varchar2);

/* Invoked by Custom_Stream_Collection before every Process_1 / Process_2 call. */
procedure Refresh_Target (
    p_process_type      in varchar2,
    p_cs_definition_id  in number,
    p_cs_name           in varchar2,
    p_comp_refresh      in varchar2,
    p_instance_id       in number,
    p_new_refresh_num   in number);

/* Inserts one stream record into MSD_CS_DATA. */
procedure ins_row_fact (
    crec_data           in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            in msd_cs_definitions_v1%rowtype,
    p_cs_name           in varchar2,
    p_instance_id       in varchar2,
    p_new_refresh_num   in number);

/* Inserts one stream record into MSD_ST_CS_DATA with the given status and error text. */
procedure ins_row_staging (
    crec_data           in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            in msd_cs_definitions_v1%rowtype,
    p_cs_name           in varchar2,
    p_instance_id       in varchar2,
    p_process_status    in varchar2,
    p_error_message     in varchar2);

/* Runs after data has been collected from source to staging. */
procedure cs_collect_post_process (
    p_cs_rec            in msd_cs_definitions_v1%rowtype,
    p_cs_name           in varchar2,
    p_instance_id       in varchar2);
130
/* Forward declarations: the two processing paths. */

/* Fetch / validate / save path (see the comment inside the body of Process_1). */
procedure Process_1 (
    p_cs_rec            in out nocopy msd_cs_definitions_v1%rowtype,
    p_cs_name           in varchar2,
    p_db_link           in varchar2,
    p_source_view       in varchar2,
    p_target_table      in varchar2,
    p_process_type      in number,
    p_default_where     in varchar2,
    p_tokenized_where   in varchar2,
    p_comp_refresh      in varchar2,
    p_instance_id       in number,
    p_parameter1        in varchar2,
    p_parameter2        in varchar2,
    p_parameter3        in varchar2,
    p_parameter4        in varchar2,
    p_parameter5        in varchar2,
    p_parameter6        in varchar2,
    p_parameter7        in varchar2,
    p_parameter8        in varchar2,
    p_parameter9        in varchar2,
    p_parameter10       in varchar2,
    p_new_refresh_num   in number,
    p_request_id        in number);

/* Insert-as-select path: source view into staging, no validation. */
procedure Process_2 (
    p_cs_rec            in out nocopy msd_cs_definitions_v1%rowtype,
    p_cs_name           in varchar2,
    p_db_link           in varchar2,
    p_source_view       in varchar2,
    p_target_table      in varchar2,
    p_process_type      in number,
    p_default_where     in varchar2,
    p_tokenized_where   in varchar2,
    p_comp_refresh      in varchar2,
    p_instance_id       in number,
    p_parameter1        in varchar2,
    p_parameter2        in varchar2,
    p_parameter3        in varchar2,
    p_parameter4        in varchar2,
    p_parameter5        in varchar2,
    p_parameter6        in varchar2,
    p_parameter7        in varchar2,
    p_parameter8        in varchar2,
    p_parameter9        in varchar2,
    p_parameter10       in varchar2,
    p_request_id        in number);
177
/* Forward declarations: level lookup, output helpers, validation. */

function get_level_pk (
    p_instance          in varchar2,
    p_level_id          in number,
    p_sr_level_value_pk in out nocopy varchar2,
    p_level_value       in out nocopy varchar2,
    p_level_value_pk    in out nocopy varchar2) return varchar2;

/* Output helpers used throughout this package for messages and debug traces. */
procedure show_line (p_sql in varchar2);

procedure debug_line (p_sql in varchar2);

function validate_record (
    crec_data           in out nocopy msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec            in out nocopy msd_cs_definitions_v1%rowtype,
    p_instance_id       in varchar2,
    p_err_mesg          out nocopy varchar2) return boolean;

/* Result is used by Custom_Stream_Collection as the default WHERE text for a run. */
function Build_Designator_Where_Clause (
    p_cs_rec            in msd_cs_definitions_v1%rowtype,
    p_process_type      in varchar2,
    p_cs_name           in varchar2) return varchar2;
200
201 /* Logic Starts here */
/*
   Custom_Stream_Collection

   Entry point for collecting or pulling a custom stream.

   Parameters
     errbuf / retcode   OUT  completion message and status ('0' on success, 2 on error).
     p_collection_type  C_COLLECT ('C') or C_PULL ('P').
     p_validate_data    'Y' or 'N'.
     p_definition_id    cs_definition_id of the stream definition.
     p_cs_name          stream name; only used when the definition has multiple_stream_flag = 'Y',
                        otherwise C_DEFAULT_STREAM_NAME is used.
     p_comp_refresh     complete-refresh flag handed to Refresh_Target / Process_*.
     p_instance_id      source instance; mandatory for collection.
     p_parameter1..10   values handed through to the WHERE-clause builder.
     p_request_id       accepted for interface compatibility; the concurrent request id
                        actually used comes from fnd_global.conc_request_id.

   Behaviour by mode (profile MSD_ONE_STEP_COLLECTION = single step):
     Collect, single step 'Y', validate 'Y' : Process_2 into staging (always complete refresh),
                                              post-process, then Process_1 staging -> fact.
     Collect, single step 'N', validate 'Y' : Process_1 source -> staging, post-process.
     Collect, single step 'N', validate 'N' : Process_2 source -> staging, post-process.
     Collect, single step 'Y', validate 'N' : rejected.
     Pull,    validate 'Y'                  : Process_1 staging -> fact.
     Pull,    otherwise                     : rejected.
     Any other combination (NULL / unexpected values) is rejected with retcode 2 instead of
     silently doing nothing.

   On success the work is committed; any exception is reported through retcode/errbuf and
   rolled back.
*/
Procedure Custom_Stream_Collection (
    errbuf            OUT NOCOPY varchar2,
    retcode           OUT NOCOPY varchar2,
    p_collection_type in varchar2,
    p_validate_data   in varchar2,
    p_definition_id   in number,
    p_cs_name         in varchar2,
    p_comp_refresh    in varchar2,
    p_instance_id     in number,
    p_parameter1      in varchar2,
    p_parameter2      in varchar2,
    p_parameter3      in varchar2,
    p_parameter4      in varchar2,
    p_parameter5      in varchar2,
    p_parameter6      in varchar2,
    p_parameter7      in varchar2,
    p_parameter8      in varchar2,
    p_parameter9      in varchar2,
    p_parameter10     in varchar2,
    p_request_id      in number default 0) is

    l_single_step_collection varchar2(30) := 'Y';

    ll_name varchar2(80) := null;

    cursor c_get_cs is
        select * from msd_cs_definitions_v1
        where
            cs_definition_id = p_definition_id and
            nvl(valid_flag, 'N') = 'Y';

    l_cs_rec           msd_cs_definitions_v1%rowtype;
    l_target           varchar2(60);
    l_source           varchar2(60);
    l_dblink           varchar2(60);
    l_default_where    varchar2(200);
    l_cs_name          varchar2(255);
    l_retcode          VARCHAR2(30);
    l_process_type     number;
    l_conc_request_id  number;

    l_new_refresh_num  NUMBER;
    l_comp_refresh     VARCHAR2(30);

Begin

    /* Look up the stream description for the log. A missing definition must not raise
       here: it is reported with a proper message by the validation step below. */
    begin
        select DESCRIPTION
        into ll_name
        from msd_cs_definitions
        where cs_definition_id = p_definition_id;
    exception
        when no_data_found then
            ll_name := null;
    end;

    show_line('Stream Name : ' || ll_name);

    debug_line('In Custom Stream Collection');
    /* Initialize */
    errbuf := 'Program Completed with Success';
    retcode := '0';

    /* Get profile */
    l_single_step_collection := nvl(fnd_profile.value('MSD_ONE_STEP_COLLECTION'), 'N');

    l_conc_request_id := fnd_global.conc_request_id;
    debug_line('Conc Reuquest ID : ' || l_conc_request_id);

    /* Validate definition id */
    open c_get_cs;
    fetch c_get_cs into l_cs_rec;
    if c_get_cs%notfound then
        /* raise error */
        retcode := 2;
        errbuf := 'Custom Definition Not Found : ' || p_definition_id;
        /* DWK close cursor */
        close c_get_cs;
        return;
    end if;
    /* DWK close cursor */
    close c_get_cs;

    if p_collection_type = C_COLLECT and l_cs_rec.source_view_name is null then
        /* Print message/ raise error */
        retcode := 2;
        errbuf := 'Can not preform Collection - Source View is not specified.';
        return;
    end if;

    if nvl(l_cs_rec.multiple_stream_flag, 'N') <> 'Y' then
        l_cs_name := C_DEFAULT_STREAM_NAME;
    else
        l_cs_name := p_cs_name;
    end if;

    /* Instance must be specified for collection */
    if p_collection_type = C_COLLECT and p_instance_id is null then
        /* Print message/raise error */
        retcode := 2;
        errbuf := 'Instance must be specified';
        return;
    end if;

    /*
       Fetch database link only if stream source type is 'SOURCE'
    */
    if l_cs_rec.cs_type in ('SOURCE') and p_collection_type = C_COLLECT then
        msd_common_utilities.get_db_link(p_instance_id, l_dblink, l_retcode);
        if (l_retcode = -1) then
            retcode := 2;
            errbuf := 'Error while getting db_link';
            return;
        end if;
    end if;

    /*-------------- For Collection ----------------------------*/
    IF ( p_collection_type = C_COLLECT ) THEN

        /* Check and push setup parameters if it is not done so previously */
        MSD_PUSH_SETUP_DATA.chk_push_setup( errbuf,
                                            retcode,
                                            p_instance_id);
        IF (nvl(retcode, 0) <> 0) THEN
            return;
        END IF;

        IF (l_single_step_collection = 'Y' and p_validate_data = 'Y') THEN
            /* One Step Collection will be internally transformed into
               Two Step Collection and Pull.
               Set Source and Target
               Set Global var for processing error record and marking processed */

            /* Collect into staging without Validation. Validation will be done in PULL */
            l_target := 'MSD_ST_CS_DATA';
            l_process_type := C_SOURCE_TO_STAGE;
            l_source := l_cs_rec.source_view_name;
            /* Internally transformed 2 step collection, always performs complete
               refresh for collection part */
            l_comp_refresh := 'Y';

            /* NOTE(review): the caller-supplied p_cs_name (not l_cs_name) is passed here
               and in the other Build_Designator_Where_Clause calls - confirm intended. */
            l_default_where := Build_Designator_Where_Clause( l_cs_rec,
                                                              l_process_type,
                                                              p_cs_name);
            Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                           l_cs_name, l_comp_refresh, p_instance_id,
                           l_new_refresh_num);
            Process_2(
                p_cs_rec          => l_cs_rec,
                p_cs_name         => l_cs_name,
                p_db_link         => l_dblink,
                p_source_view     => l_source,
                p_target_table    => l_target,
                p_process_type    => l_process_type,
                p_default_where   => l_default_where,
                p_tokenized_where => l_cs_rec.collect_addtl_where_clause,
                p_comp_refresh    => l_comp_refresh,
                p_instance_id     => p_instance_id,
                p_parameter1      => p_parameter1,
                p_parameter2      => p_parameter2,
                p_parameter3      => p_parameter3,
                p_parameter4      => p_parameter4,
                p_parameter5      => p_parameter5,
                p_parameter6      => p_parameter6,
                p_parameter7      => p_parameter7,
                p_parameter8      => p_parameter8,
                p_parameter9      => p_parameter9,
                p_parameter10     => p_parameter10,
                p_request_id      => l_conc_request_id);

            /* Custom Steam Collection Post Process
               After data has been collected from source to staging */
            cs_collect_post_process(l_cs_rec,
                                    l_cs_name,
                                    p_instance_id);

            /* Pull
               Set Source and Target
               Set Global var for processing error record and marking processed */
            l_target := 'MSD_CS_DATA';
            l_source := 'MSD_ST_CS_DATA';
            l_process_type := C_STAGE_TO_FACT;
            l_default_where := Build_Designator_Where_Clause( l_cs_rec,
                                                              l_process_type,
                                                              p_cs_name);
            /* Get a new seq number for pull part */
            SELECT msd.msd_last_refresh_number_s.nextval into
                l_new_refresh_Num from dual;

            /* Refresh Target */
            Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                           l_cs_name, p_comp_refresh, p_instance_id,
                           l_new_refresh_num);
            Process_1(
                p_cs_rec          => l_cs_rec,
                p_cs_name         => l_cs_name,
                p_db_link         => l_dblink,
                p_source_view     => l_source,
                p_process_type    => C_STAGE_TO_FACT,
                p_target_table    => l_target,
                p_default_where   => l_default_where,
                p_tokenized_where => NULL,
                p_comp_refresh    => p_comp_refresh,
                p_instance_id     => p_instance_id,
                p_parameter1      => p_parameter1,
                p_parameter2      => p_parameter2,
                p_parameter3      => p_parameter3,
                p_parameter4      => p_parameter4,
                p_parameter5      => p_parameter5,
                p_parameter6      => p_parameter6,
                p_parameter7      => p_parameter7,
                p_parameter8      => p_parameter8,
                p_parameter9      => p_parameter9,
                p_parameter10     => p_parameter10,
                p_new_refresh_num => l_new_refresh_num,
                p_request_id      => l_conc_request_id);

        ELSIF (l_single_step_collection = 'N' and p_validate_data = 'Y') THEN
            /*
               Set Source and Target
               Set Global var for processing error record and marking processed
            */
            l_target := 'MSD_ST_CS_DATA';
            l_process_type := C_SOURCE_TO_STAGE;
            l_source := l_cs_rec.source_view_name;
            l_default_where := Build_Designator_Where_Clause( l_cs_rec,
                                                              l_process_type,
                                                              p_cs_name);
            Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                           l_cs_name, p_comp_refresh, p_instance_id,
                           l_new_refresh_num);
            Process_1(
                p_cs_rec          => l_cs_rec,
                p_cs_name         => l_cs_name,
                p_db_link         => l_dblink,
                p_source_view     => l_source,
                p_target_table    => l_target,
                p_process_type    => l_process_type,
                p_default_where   => l_default_where,
                p_tokenized_where => l_cs_rec.collect_addtl_where_clause,
                p_comp_refresh    => p_comp_refresh,
                p_instance_id     => p_instance_id,
                p_parameter1      => p_parameter1,
                p_parameter2      => p_parameter2,
                p_parameter3      => p_parameter3,
                p_parameter4      => p_parameter4,
                p_parameter5      => p_parameter5,
                p_parameter6      => p_parameter6,
                p_parameter7      => p_parameter7,
                p_parameter8      => p_parameter8,
                p_parameter9      => p_parameter9,
                p_parameter10     => p_parameter10,
                p_new_refresh_num => l_new_refresh_num,
                p_request_id      => l_conc_request_id);

            /* Custom Steam Collection Post Process
               After data has been collected from source to staging
            */
            cs_collect_post_process(l_cs_rec,
                                    l_cs_name,
                                    p_instance_id);

        ELSIF (l_single_step_collection = 'Y' and p_validate_data = 'N') THEN
            /* Invalid Option. Raise Error */
            retcode := 2;
            errbuf := 'Invalid option - Single Step Collection must perform Validation';
            return;

        ELSIF (l_single_step_collection = 'N' and p_validate_data = 'N') THEN
            /* Collect into staging without Validation */
            l_target := 'MSD_ST_CS_DATA';
            l_process_type := C_SOURCE_TO_STAGE;
            l_source := l_cs_rec.source_view_name;
            l_default_where := Build_Designator_Where_Clause( l_cs_rec,
                                                              l_process_type,
                                                              p_cs_name);
            Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                           l_cs_name, p_comp_refresh, p_instance_id,
                           l_new_refresh_num);
            Process_2(
                p_cs_rec          => l_cs_rec,
                p_cs_name         => l_cs_name,
                p_db_link         => l_dblink,
                p_source_view     => l_source,
                p_target_table    => l_target,
                p_process_type    => l_process_type,
                p_default_where   => l_default_where,
                p_tokenized_where => l_cs_rec.collect_addtl_where_clause,
                p_comp_refresh    => p_comp_refresh,
                p_instance_id     => p_instance_id,
                p_parameter1      => p_parameter1,
                p_parameter2      => p_parameter2,
                p_parameter3      => p_parameter3,
                p_parameter4      => p_parameter4,
                p_parameter5      => p_parameter5,
                p_parameter6      => p_parameter6,
                p_parameter7      => p_parameter7,
                p_parameter8      => p_parameter8,
                p_parameter9      => p_parameter9,
                p_parameter10     => p_parameter10,
                p_request_id      => l_conc_request_id);

            /* Custom Steam Collection Post Process
               After data has been collected from source to staging
            */
            cs_collect_post_process(l_cs_Rec,
                                    l_cs_name,
                                    p_instance_id);

        ELSE
            /* p_validate_data is NULL / not Y-or-N, or the profile holds an unexpected
               value: nothing above matched, so report it instead of doing no work. */
            retcode := 2;
            errbuf := 'Invalid option - Validate Data and MSD_ONE_STEP_COLLECTION must each be Y or N';
            return;
        END IF; /* End of ELSE IF */

    /*--------------------- For Pull ----------------------------*/
    ELSIF (p_collection_type = C_PULL) THEN
        IF p_validate_data = 'Y' THEN
            /*
               Set Source and Target
               Set Global var for processing error record and marking processed
            */
            l_target := 'MSD_CS_DATA';
            l_source := 'MSD_ST_CS_DATA';
            l_process_type := C_STAGE_TO_FACT;
            l_default_where := Build_Designator_Where_Clause(
                                   l_cs_rec ,
                                   l_process_type ,
                                   p_cs_name );

            /* Get a new seq number for PULL part */
            SELECT msd.msd_last_refresh_number_s.nextval into
                l_new_refresh_Num from dual;

            /* Refresh Target */
            Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                           l_cs_name, p_comp_refresh, p_instance_id,
                           l_new_refresh_num);

            Process_1(
                p_cs_rec          => l_cs_rec,
                p_cs_name         => l_cs_name,
                p_db_link         => l_dblink,
                p_source_view     => l_source,
                p_process_type    => C_STAGE_TO_FACT,
                p_target_table    => l_target,
                p_default_where   => l_default_where,
                p_tokenized_where => NULL,
                p_comp_refresh    => p_comp_refresh,
                p_instance_id     => p_instance_id,
                p_parameter1      => p_parameter1,
                p_parameter2      => p_parameter2,
                p_parameter3      => p_parameter3,
                p_parameter4      => p_parameter4,
                p_parameter5      => p_parameter5,
                p_parameter6      => p_parameter6,
                p_parameter7      => p_parameter7,
                p_parameter8      => p_parameter8,
                p_parameter9      => p_parameter9,
                p_parameter10     => p_parameter10,
                p_new_refresh_num => l_new_refresh_num,
                p_request_id      => l_conc_request_id);

        ELSE
            /* Invalid Option. Raise Error;*/
            retcode := 2;
            errbuf := 'Invalid option - Pull must perform Validation';
            return;
        END IF; /* End of p_validate_data = 'Y' */

    ELSE
        /* Neither Collect nor Pull (including NULL): reject rather than fall through. */
        retcode := 2;
        errbuf := 'Invalid collection type : ' || p_collection_type;
        return;
    END IF; /* End of Collect or Pull */


    IF (l_target = 'MSD_CS_DATA') THEN
        /* Delete cs fact rows that are not used by any demand plans */
        MSD_TRANSLATE_FACT_DATA.clean_fact_data( errbuf,
                                                 retcode,
                                                 l_target,
                                                 p_definition_id);
    END IF;

    /* NOTE(review): the package globals replace whatever status was set above, including
       the one returned by clean_fact_data. They are assigned outside the code visible
       here - confirm they are always populated before this point. */
    retcode := g_retcode;
    errbuf := g_errbuf;

    commit;

Exception
    When others then
        retcode := 2;
        errbuf := substr( sqlerrm, 1, 255);
        rollback;
End;
585
/*
   Process_1

   Builds the SELECT used to fetch stream rows (source or staging), hands it to
   Process_1_Sub for row processing, and for a staging-to-fact run removes the
   staging rows that were marked processed.

   Parameters
     p_cs_rec           stream definition record (cs_definition_id is read here).
     p_cs_name          stream name.
     p_db_link          database link handed to the SQL builder.
     p_source_view      object to select from.
     p_target_table     table Process_1_Sub writes to.
     p_process_type     one of C_SOURCE_TO_STAGE / C_SOURCE_TO_FACT / C_STAGE_TO_FACT.
     p_default_where,
     p_tokenized_where,
     p_parameter1..10,
     p_request_id       inputs to build_where_clause.
     p_comp_refresh     not referenced in this procedure.
     p_instance_id      handed to Process_1_Sub.
     p_new_refresh_num  handed to Process_1_Sub.

   Any exception is written with show_line and re-raised.
*/
Procedure Process_1 (
    p_cs_rec          in out NOCOPY msd_cs_definitions_v1%rowtype,
    p_cs_name         in varchar2,
    p_db_link         in varchar2,
    p_source_view     in varchar2,
    p_target_table    in varchar2,
    p_process_type    in number,
    p_default_where   in varchar2,
    p_tokenized_where in varchar2,
    p_comp_refresh    in varchar2,
    p_instance_id     in number,
    p_parameter1      in varchar2,
    p_parameter2      in varchar2,
    p_parameter3      in varchar2,
    p_parameter4      in varchar2,
    p_parameter5      in varchar2,
    p_parameter6      in varchar2,
    p_parameter7      in varchar2,
    p_parameter8      in varchar2,
    p_parameter9      in varchar2,
    p_parameter10     in varchar2,
    p_new_refresh_num IN NUMBER,
    p_request_id      in number) is

    /* Sized to the PL/SQL varchar2 maximum, as Process_2 does for its statement:
       the generated column list plus the WHERE text must not overflow the buffer. */
    l_sql_stmt varchar2(32767);
    l_where    varchar2(32767);

Begin
    debug_line('In Process_1');
    /*
       This process does following
       1. Fetches data using cursor (source/staging)
       2. Validates data
       3. If Error
            3.1 Mark/Save Erroneous data in staging
          else
            3.100 Save in Target (Fact/Staging)
            3.101 Mark record Processed
          end if
    */

    l_sql_stmt := Build_SQL_FOR_COLLECT_AND_VAL
                      (p_cs_rec.cs_definition_id, p_process_type,
                       p_source_view, p_db_link, p_cs_name);

    /* Always-true predicate so every further condition can be appended with AND */
    l_sql_stmt := l_sql_stmt || ' WHERE 1 = 1';

    l_where := build_where_clause(
                   p_tokenized_where ,
                   p_default_where ,
                   p_parameter1 ,
                   p_parameter2 ,
                   p_parameter3 ,
                   p_parameter4 ,
                   p_parameter5 ,
                   p_parameter6 ,
                   p_parameter7 ,
                   p_parameter8 ,
                   p_parameter9 ,
                   p_parameter10 ,
                   p_request_id );

    if l_where is not null then
        l_sql_stmt := l_sql_stmt || ' AND ' || l_where;
    end if;

    /* DWK. Do not include instace = 0 into fact table when we PULL data.
       attribute_1 is compared as text, matching the DELETE below, so the column is
       not implicitly converted to a number for every row. */
    IF (p_process_type = C_STAGE_TO_FACT) THEN
        l_sql_stmt := l_sql_stmt ||
                      ' AND ' || 'attribute_1 <> ''0''';
    END IF;

    debug_line('length for l_sql_stmt :' || length(l_sql_stmt));
    debug_line('length for l_where :' || length(l_where));
    debug_line('before debug line');
    debug_line(l_sql_stmt);
    debug_line('after debug line');

    /* Use Dynamic SQL to fetch and process rows */
    Process_1_Sub (
        p_cs_rec ,
        p_cs_name ,
        p_source_view ,
        p_target_table ,
        p_instance_id ,
        l_sql_stmt,
        p_new_refresh_num);

    /* Delete Successfully processed Staging rows if the process was Staging to Fact */
    /* DWK Don't delete any row with instance = 0 */
    /* Also, removed cs_name = p_cs_name condition from WHERE clause */

    IF p_process_type = C_STAGE_TO_FACT THEN
        delete from MSD_ST_CS_DATA
        where
            cs_definition_id = p_cs_rec.cs_definition_id and
            process_Status = C_LOG_PROCESSED and
            attribute_1 <> '0';
    END IF;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
698
699
/*
   cs_collect_post_process

   Stream-specific work done after data has been collected into MSD_ST_CS_DATA.

   - For the MSD_SALES_FCST_* streams: if at least one staging row for this
     definition / stream / instance has attribute_49 = '1', the rows with
     attribute_49 = '2' for the same definition / stream / instance are deleted.
   - For MSD_ONHAND_INVENTORY: rows from msd_curr_onhand_inventory_v for the
     instance are inserted into staging.

   Parameters
     p_cs_rec       stream definition record (name and cs_definition_id are read).
     p_cs_name      stream name used to filter the sales-forecast rows.
     p_instance_id  instance whose rows are examined / loaded.

   Any exception is written with show_line and re-raised.
*/
Procedure cs_collect_post_process (
    p_cs_rec      in msd_cs_definitions_v1%rowtype,
    p_cs_name     in varchar2,
    p_instance_id in varchar2 ) is

    /* Existence probe: at most one row is fetched */
    cursor c1 is
        select 'Y'
        from msd_st_cs_data
        where cs_definition_id = p_cs_rec.cs_definition_id
        and cs_name = p_cs_name
        and attribute_1 = p_instance_id
        and attribute_49 = '1'
        and rownum < 2;

    l_exists varchar2(10) := 'N';

Begin

    /* Is this Sales Forecast Stream */
    if p_cs_rec.name in (
           'MSD_SALES_FCST_BESTCASE', 'MSD_SALES_FCST_PIPELINE',
           'MSD_SALES_FCST_REALISTIC', 'MSD_SALES_FCST_WGTPLINE',
           'MSD_SALES_FCST_WORSTCASE' ) then

        /* l_exists keeps its 'N' default when the cursor returns no row */
        open c1;
        fetch c1 into l_exists;
        close c1;

        if l_exists = 'Y' then
            delete from msd_st_cs_data
            where cs_definition_id = p_cs_rec.cs_definition_id
            and cs_name = p_cs_name
            and attribute_1 = p_instance_id
            and attribute_49 = '2';
        end if;

    end if;

    /* Collect Current On-Hand Inventory data from ODS table for SOP data stream */
    if p_cs_rec.name = 'MSD_ONHAND_INVENTORY' then

        /* Ids and audit dates are inserted in their native types (as ins_row_staging
           does for the same table). Passing sysdate through to_char() would round-trip
           it through the session NLS date format and lose the time portion. */
        insert into msd_st_cs_data (
            CS_ST_DATA_ID,
            CS_DEFINITION_ID,
            CS_NAME,
            ATTRIBUTE_1,
            ATTRIBUTE_2,
            ATTRIBUTE_3,
            ATTRIBUTE_6,
            ATTRIBUTE_7,
            ATTRIBUTE_10,
            ATTRIBUTE_11,
            ATTRIBUTE_34,
            ATTRIBUTE_41,
            ATTRIBUTE_43,
            ATTRIBUTE_50,
            ATTRIBUTE_51,
            CREATION_DATE,
            CREATED_BY,
            LAST_UPDATE_DATE,
            LAST_UPDATED_BY,
            LAST_UPDATE_LOGIN
        )
        select msd_st_cs_data_s.nextval,
               p_cs_rec.cs_definition_id,
               C_DEFAULT_STREAM_NAME,
               to_char(inv.sr_instance_id),
               inv.prd_level_id,
               inv.prd_sr_level_pk,
               inv.geo_level_id,
               inv.geo_sr_level_pk,
               inv.org_level_id,
               inv.org_sr_level_pk,
               inv.time_level_id,
               to_char(inv.quantity),
               to_char(sysdate, 'YYYY/MM/DD'),
               inv.dcs_level_id,
               inv.dcs_sr_level_pk,
               sysdate,
               fnd_global.user_id,
               sysdate,
               fnd_global.user_id,
               fnd_global.login_id
        from msd_curr_onhand_inventory_v inv
        where inv.sr_instance_id = p_instance_id;

    end if;

Exception
    When others then
        show_line(sqlerrm);
        raise;
End;
795
796
/*
   log_error

   Records a rejected stream record. Where the error goes depends on source and target:
     - target is the staging table, or target is the fact table and the source is not
       the staging table: a new staging row is inserted with status C_LOG_ERROR;
     - otherwise (staging pulled into fact): the existing staging row, identified by
       crec_data.pk_id, is updated with status C_LOG_ERROR and the message.

   Any exception is written with show_line and re-raised.
*/
Procedure log_error (
    crec_data       in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec        in msd_cs_definitions_v1%rowtype,
    p_cs_name       in varchar2,
    p_instance_id   in varchar2,
    p_error_message in varchar2,
    p_source_view   in varchar2,
    p_target_table  in varchar2) is

    l_insert_into_stage boolean;

Begin
    debug_line('In Log Error');

    /* Evaluates to NULL for NULL inputs unless one side is TRUE; NULL takes the
       ELSE path below. */
    l_insert_into_stage :=
           (p_target_table = 'MSD_ST_CS_DATA')
        or (p_target_table = 'MSD_CS_DATA' and p_source_view <> 'MSD_ST_CS_DATA');

    if l_insert_into_stage then
        /* No staging row exists for this record yet: create one flagged as an error */
        ins_row_staging(
            crec_data        => crec_data,
            p_cs_rec         => p_cs_rec,
            p_cs_name        => p_cs_name,
            p_instance_id    => nvl(p_instance_id, crec_data.instance),
            p_process_status => C_LOG_ERROR,
            p_error_message  => p_error_message);
    else
        /* Record came from staging: flag the row it was read from */
        upd_stage_error(
            p_pk_id          => crec_data.pk_id,
            p_process_status => C_LOG_ERROR,
            p_error_mesg     => p_error_message);
    end if;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
834
/*
   upd_stage_error

   Sets process_status and error_desc on the staging row whose cs_st_data_id equals
   p_pk_id. Any exception is written with show_line and re-raised.
*/
Procedure upd_stage_error (p_pk_id in number, p_process_status in varchar2, p_error_mesg in varchar2) is
Begin
    debug_line('In upd_stage_error');

    update msd_st_cs_data stg
       set stg.process_status = p_process_status,
           stg.error_desc     = p_error_mesg
     where stg.cs_st_data_id  = p_pk_id;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
850
/*
   log_processed

   Marks a record as processed. Only a staging-to-fact pull has a staging row to mark;
   for that case the row identified by crec_data.pk_id gets status C_LOG_PROCESSED and
   a NULL error text. For collection into staging, or directly from a source into the
   fact table, nothing is recorded.

   Any exception is written with show_line and re-raised.
*/
Procedure log_processed (
    crec_data      in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec       in msd_cs_definitions_v1%rowtype,
    p_cs_name      in varchar2,
    p_instance_id  in varchar2,
    p_source_view  in varchar2,
    p_target_table in varchar2) is

    l_nothing_to_mark boolean;

Begin
    debug_line('In log_processed');

    /* NULL (unknown) takes the ELSE path below, i.e. the row is marked */
    l_nothing_to_mark :=
           (p_target_table = 'MSD_ST_CS_DATA')
        or (p_target_table = 'MSD_CS_DATA' and p_source_view <> 'MSD_ST_CS_DATA');

    if l_nothing_to_mark then
        /* Processing can not be logged or is not yet done */
        null;
    else
        upd_stage_error(
            p_pk_id          => crec_data.pk_id,
            p_process_status => C_LOG_PROCESSED,
            p_error_mesg     => null);
    end if;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
888
/*
   ins_row_staging

   Inserts one stream record into MSD_ST_CS_DATA with the supplied process status and
   error text. cs_name is taken from crec_data.designator (p_cs_name is not used);
   attribute_1 receives p_instance_id. The column list below is laid out in the same
   groups as the value list so the two can be checked side by side.

   Any exception is written with show_line and re-raised.
*/
Procedure ins_row_staging (
    crec_data        in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec         in msd_cs_definitions_v1%rowtype,
    p_cs_name        in varchar2,
    p_instance_id    in varchar2,
    p_process_status in varchar2,
    p_error_message  in varchar2) is
Begin
    insert into msd_st_cs_data (
        cs_st_data_id, cs_definition_id, cs_name,
        attribute_1,
        attribute_2,  attribute_3,  attribute_4,  attribute_5,     /* prd_*        */
        attribute_6,  attribute_7,  attribute_8,  attribute_9,     /* geo_*        */
        attribute_10, attribute_11, attribute_12, attribute_13,    /* org_*        */
        attribute_14, attribute_15, attribute_16, attribute_17,    /* prd_parent_* */
        attribute_18, attribute_19, attribute_20, attribute_21,    /* rep_*        */
        attribute_22, attribute_23, attribute_24, attribute_25,    /* chn_*        */
        attribute_26, attribute_27, attribute_28, attribute_29,    /* ud1_*        */
        attribute_30, attribute_31, attribute_32, attribute_33,    /* ud2_*        */
        attribute_34, attribute_35, attribute_36, attribute_37,    /* tim_level_id, then pass-through */
        attribute_38, attribute_39, attribute_40, attribute_41,
        attribute_42, attribute_43, attribute_44, attribute_45,
        attribute_46, attribute_47, attribute_48, attribute_49,
        attribute_50, attribute_51, attribute_52, attribute_53,    /* dcs_*        */
        attribute_54, attribute_55, attribute_56, attribute_57,
        attribute_58, attribute_59, attribute_60,
        process_status, error_desc,
        created_by, creation_date, last_update_date, last_updated_by, last_update_login
    )
    values (
        /* Fix for designator name crec_data.designator instead of p_cs_name */
        msd_st_cs_data_s.nextval, p_cs_rec.cs_definition_id, crec_data.designator,
        p_instance_id,
        crec_data.prd_level_id, crec_data.prd_sr_level_value_pk,
        crec_data.prd_level_value, crec_data.prd_level_value_pk,
        crec_data.geo_level_id, crec_data.geo_sr_level_value_pk,
        crec_data.geo_level_value, crec_data.geo_level_value_pk,
        crec_data.org_level_id, crec_data.org_sr_level_value_pk,
        crec_data.org_level_value, crec_data.org_level_value_pk,
        crec_data.prd_parent_level_id, crec_data.prd_parent_sr_level_value_pk,
        crec_data.prd_parent_level_value, crec_data.prd_parent_level_value_pk,
        crec_data.rep_level_id, crec_data.rep_sr_level_value_pk,
        crec_data.rep_level_value, crec_data.rep_level_value_pk,
        crec_data.chn_level_id, crec_data.chn_sr_level_value_pk,
        crec_data.chn_level_value, crec_data.chn_level_value_pk,
        crec_data.ud1_level_id, crec_data.ud1_sr_level_value_pk,
        crec_data.ud1_level_value, crec_data.ud1_level_value_pk,
        crec_data.ud2_level_id, crec_data.ud2_sr_level_value_pk,
        crec_data.ud2_level_value, crec_data.ud2_level_value_pk,
        crec_data.tim_level_id, crec_data.attribute_35,
        crec_data.attribute_36, crec_data.attribute_37,
        crec_data.attribute_38, crec_data.attribute_39,
        crec_data.attribute_40, crec_data.attribute_41,
        crec_data.attribute_42, crec_data.attribute_43,
        crec_data.attribute_44, crec_data.attribute_45,
        crec_data.attribute_46, crec_data.attribute_47,
        crec_data.attribute_48, crec_data.attribute_49,
        crec_data.dcs_level_id, crec_data.dcs_sr_level_value_pk,
        crec_data.dcs_level_value, crec_data.dcs_level_value_pk,
        crec_data.attribute_54, crec_data.attribute_55,
        crec_data.attribute_56, crec_data.attribute_57,
        crec_data.attribute_58, crec_data.attribute_59, crec_data.attribute_60,
        p_process_status, p_error_message,
        fnd_global.user_id, sysdate, sysdate, fnd_global.user_id, fnd_global.login_id
    );

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
946
/*
   ins_row_fact

   Inserts one stream record into MSD_CS_DATA. cs_name is taken from
   crec_data.designator (p_cs_name is not used); attribute_1 receives p_instance_id.
   p_new_refresh_num is stored in both created_by_refresh_num and last_refresh_num,
   and action_code is set to 'I'. The column list is laid out in the same groups as
   the value list so the two can be checked side by side.

   Any exception is written with show_line and re-raised.
*/
Procedure ins_row_fact(
    crec_data         in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec          in msd_cs_definitions_v1%rowtype,
    p_cs_name         in varchar2,
    p_instance_id     in varchar2,
    p_new_refresh_num in NUMBER) is
Begin
    debug_line('In ins_row_fact');

    insert into msd_cs_data (
        cs_data_id, cs_definition_id, cs_name,
        attribute_1,
        attribute_2,  attribute_3,  attribute_4,  attribute_5,     /* prd_*        */
        attribute_6,  attribute_7,  attribute_8,  attribute_9,     /* geo_*        */
        attribute_10, attribute_11, attribute_12, attribute_13,    /* org_*        */
        attribute_14, attribute_15, attribute_16, attribute_17,    /* prd_parent_* */
        attribute_18, attribute_19, attribute_20, attribute_21,    /* rep_*        */
        attribute_22, attribute_23, attribute_24, attribute_25,    /* chn_*        */
        attribute_26, attribute_27, attribute_28, attribute_29,    /* ud1_*        */
        attribute_30, attribute_31, attribute_32, attribute_33,    /* ud2_*        */
        attribute_34, attribute_35, attribute_36, attribute_37,    /* tim_level_id, then pass-through */
        attribute_38, attribute_39, attribute_40, attribute_41,
        attribute_42, attribute_43, attribute_44, attribute_45,
        attribute_46, attribute_47, attribute_48, attribute_49,
        attribute_50, attribute_51, attribute_52, attribute_53,    /* dcs_*        */
        attribute_54, attribute_55, attribute_56, attribute_57,
        attribute_58, attribute_59, attribute_60,
        created_by, creation_date, last_update_date, last_updated_by, last_update_login,
        created_by_refresh_num, last_refresh_num, action_code
    )
    values (
        /* Fix for designator name crec_data.designator instead of p_cs_name */
        msd_cs_data_s.nextval, p_cs_rec.cs_definition_id, crec_data.designator,
        p_instance_id,
        crec_data.prd_level_id, crec_data.prd_sr_level_value_pk,
        crec_data.prd_level_value, crec_data.prd_level_value_pk,
        crec_data.geo_level_id, crec_data.geo_sr_level_value_pk,
        crec_data.geo_level_value, crec_data.geo_level_value_pk,
        crec_data.org_level_id, crec_data.org_sr_level_value_pk,
        crec_data.org_level_value, crec_data.org_level_value_pk,
        crec_data.prd_parent_level_id, crec_data.prd_parent_sr_level_value_pk,
        crec_data.prd_parent_level_value, crec_data.prd_parent_level_value_pk,
        crec_data.rep_level_id, crec_data.rep_sr_level_value_pk,
        crec_data.rep_level_value, crec_data.rep_level_value_pk,
        crec_data.chn_level_id, crec_data.chn_sr_level_value_pk,
        crec_data.chn_level_value, crec_data.chn_level_value_pk,
        crec_data.ud1_level_id, crec_data.ud1_sr_level_value_pk,
        crec_data.ud1_level_value, crec_data.ud1_level_value_pk,
        crec_data.ud2_level_id, crec_data.ud2_sr_level_value_pk,
        crec_data.ud2_level_value, crec_data.ud2_level_value_pk,
        crec_data.tim_level_id, crec_data.attribute_35,
        crec_data.attribute_36, crec_data.attribute_37,
        crec_data.attribute_38, crec_data.attribute_39,
        crec_data.attribute_40, crec_data.attribute_41,
        crec_data.attribute_42, crec_data.attribute_43,
        crec_data.attribute_44, crec_data.attribute_45,
        crec_data.attribute_46, crec_data.attribute_47,
        crec_data.attribute_48, crec_data.attribute_49,
        crec_data.dcs_level_id, crec_data.dcs_sr_level_value_pk,
        crec_data.dcs_level_value, crec_data.dcs_level_value_pk,
        crec_data.attribute_54, crec_data.attribute_55,
        crec_data.attribute_56, crec_data.attribute_57,
        crec_data.attribute_58, crec_data.attribute_59, crec_data.attribute_60,
        fnd_global.user_id, sysdate, sysdate, fnd_global.user_id, fnd_global.login_id,
        p_new_refresh_num, p_new_refresh_num, 'I'
    );

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1002
/*
  Process_2
  Copies rows from the source view straight into the staging table
  MSD_ST_CS_DATA with a single INSERT ... SELECT, without any per-row
  validation.

  The statement text comes from Build_SQL_INS_AS_SELECT; the optional filter
  comes from Build_Where_Clause (default where + tokenized where with the
  ten user parameters substituted).

  Parameters actually read here:
    p_cs_rec.cs_definition_id, p_cs_name, p_db_link, p_source_view,
    p_default_where, p_tokenized_where, p_instance_id,
    p_parameter1..p_parameter10, p_request_id.
  p_target_table, p_process_type and p_comp_refresh are accepted but not
  referenced in this procedure.

  Any exception is written to the log through show_line and re-raised.
*/
Procedure Process_2 (
    p_cs_rec          in out NOCOPY msd_cs_definitions_v1%rowtype,
    p_cs_name         in varchar2,
    p_db_link         in varchar2,
    p_source_view     in varchar2,
    p_target_table    in varchar2,
    p_process_type    in number,
    p_default_where   in varchar2,
    p_tokenized_where in varchar2,
    p_comp_refresh    in varchar2,
    p_instance_id     in number,
    p_parameter1      in varchar2,
    p_parameter2      in varchar2,
    p_parameter3      in varchar2,
    p_parameter4      in varchar2,
    p_parameter5      in varchar2,
    p_parameter6      in varchar2,
    p_parameter7      in varchar2,
    p_parameter8      in varchar2,
    p_parameter9      in varchar2,
    p_parameter10     in varchar2,
    p_request_id      in number) is

    l_ins_stmt varchar2(32767);
    /* Sized to the PL/SQL maximum. Build_Where_Clause can return far more
       than 500 characters, so the previous varchar2(500) buffer raised
       ORA-06502 on a perfectly valid (long) where clause. */
    l_where    varchar2(32767);

Begin
    debug_line('In Process_2');

    /* INSERT INTO MSD_ST_CS_DATA (...) SELECT ... FROM <source view><db link> */
    l_ins_stmt := Build_SQL_INS_AS_SELECT(
                      p_cs_definition_id => p_cs_rec.cs_definition_id,
                      p_instance_id      => p_instance_id,
                      p_cs_name          => p_cs_name,
                      p_source_view      => p_source_view,
                      p_db_link          => p_db_link);

    /* Default where clause combined with the parameter-substituted one */
    l_where := build_where_clause(
                   p_tokenized_where,
                   p_default_where,
                   p_parameter1,
                   p_parameter2,
                   p_parameter3,
                   p_parameter4,
                   p_parameter5,
                   p_parameter6,
                   p_parameter7,
                   p_parameter8,
                   p_parameter9,
                   p_parameter10,
                   p_request_id);

    if l_where is not null then
        l_ins_stmt := l_ins_stmt || ' where ' || l_where;
    end if;

    /* Execute SQL */
    debug_line(l_ins_stmt);

    Execute immediate l_ins_stmt;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1079
/*
  Build_SQL_FOR_COLLECT_AND_VAL
  Returns the SELECT statement text that feeds row-by-row validation.

  Used for:
    Source  -> Staging (Collect, Single Step = 'N', Validation = 'Y')
    Source  -> Fact    (Collect, Single Step = 'Y', Validation = 'Y')
    Staging -> Fact    (Pull,                       Validation = 'Y')

  The select list comes from Build_SQL_Source (called with a NULL instance).
  When reading from MSD_ST_CS_DATA the staging key cs_st_data_id is exposed
  as PK_ID and p_db_link is not appended; for any other source a NULL pk_id
  is selected and p_db_link is appended to the view name.

  Any exception is logged through show_line and re-raised.
*/
function Build_SQL_FOR_COLLECT_AND_VAL(
    p_cs_definition_id in number,
    p_process_type     in number,
    p_source_view      in varchar2,
    p_db_link          in varchar2,
    p_cs_name          in varchar2)
return varchar2 is

    l_select_list varchar2(32767);
    l_query       varchar2(32767);

begin
    debug_line('In Build_SQL_FOR_COLLECT_AND_VAL');

    l_select_list := Build_SQL_Source(
                         p_cs_definition_id => p_cs_definition_id,
                         p_process_type     => p_process_type,
                         p_instance_id      => NULL,
                         p_cs_name          => p_cs_name);

    /* Prefix the key column and append the FROM clause */
    case
        when p_source_view = 'MSD_ST_CS_DATA' then
            l_query := 'Select cs_st_data_id PK_ID, ' || l_select_list
                    || ' from ' || p_source_view;
        else
            l_query := 'Select null pk_id, ' || l_select_list
                    || ' from ' || p_source_view || p_db_link;
    end case;

    return l_query;

exception
    when others then
        show_line(sqlerrm);
        raise;

end;
1116
/*
  Build_SQL_INS_AS_SELECT
  Returns the text of an "Insert into MSD_ST_CS_DATA (...) select ..."
  statement that loads the staging table directly from the source view.

  Used for:
    Source -> Staging (Collect, Single Step = 'N', Validation = 'N')

  The select list is produced by Build_SQL_Source for C_SOURCE_TO_STAGE and
  ends with the CS_NAME expression, which is why cs_name is the last target
  column. No WHERE clause is added here.

  Any exception is logged through show_line and re-raised.
*/
function Build_SQL_INS_AS_SELECT(
    p_cs_definition_id in number,
    p_instance_id      in varchar2,
    p_cs_name          in varchar2,
    p_source_view      in varchar2,
    p_db_link          in varchar2) return varchar2 is

    l_select_list varchar2(32767);
    l_target_cols varchar2(32767);
    l_statement   varchar2(32767);

begin
    debug_line('In Build_SQL_INS_AS_SELECT');

    /* attribute_1 .. attribute_60 followed by the cs_name expression */
    l_select_list := Build_SQL_Source(
                         p_cs_definition_id => p_cs_definition_id,
                         p_process_type     => C_SOURCE_TO_STAGE,
                         p_instance_id      => p_instance_id,
                         p_cs_name          => p_cs_name);

    /* Target columns, in the same order as the select list */
    l_target_cols :=
           'attribute_1, attribute_2, attribute_3, attribute_4, attribute_5, '
        || 'attribute_6, attribute_7, attribute_8, attribute_9, attribute_10,'
        || 'attribute_11, attribute_12, attribute_13, attribute_14, attribute_15, '
        || 'attribute_16, attribute_17, attribute_18, attribute_19, attribute_20, '
        || 'attribute_21, attribute_22, attribute_23, attribute_24, attribute_25, '
        || 'attribute_26, attribute_27, attribute_28, attribute_29, attribute_30,'
        || 'attribute_31, attribute_32, attribute_33, attribute_34, attribute_35, '
        || 'attribute_36, attribute_37, attribute_38, attribute_39, attribute_40,'
        || 'attribute_41, attribute_42, attribute_43, attribute_44, attribute_45, '
        || 'attribute_46, attribute_47, attribute_48, attribute_49, attribute_50, '
        || 'attribute_51, attribute_52, attribute_53, attribute_54, attribute_55, '
        || 'attribute_56, attribute_57, attribute_58, attribute_59, attribute_60,'
        || 'cs_name';

    l_statement :=
           'Insert into MSD_ST_CS_DATA (cs_st_data_id , cs_definition_id, '
        || l_target_cols
        || ' ) '
        || ' select '
        || 'msd_st_cs_Data_s.nextval, '
        || p_cs_definition_id
        || ', '
        || l_select_list
        || ' from '
        || p_source_view
        || p_db_link;

    return l_statement;

exception
    when others then
        show_line(sqlerrm);
        raise;

end;
1161
/*
  Build_SQL_Source
  Builds the comma-separated select list for the 61 stream columns
  (ATTRIBUTE_1 .. ATTRIBUTE_60 followed by CS_NAME) of a custom stream
  definition.

  Parameters
    p_cs_definition_id  definition whose column mappings are read from
                        msd_cs_defn_column_dtls_v
    p_process_type      C_SOURCE_TO_STAGE / C_SOURCE_TO_FACT: emit the mapped
                        source expressions; any other value (staging to
                        fact): emit the staging table column names
    p_instance_id       substituted, as a quoted literal, for columns whose
                        identifier_type is 'INSTANCE' (ignored when null)
    p_cs_name           stream name used for CS_NAME on a multiple-stream
                        definition that has no CS_NAME mapping

  Returns the select list text (no SELECT keyword, no FROM clause).

  Any exception is logged through show_line and re-raised.
*/
Function Build_SQL_Source(
    p_cs_definition_id in number,
    p_process_type     in number,
    p_instance_id      in varchar2,
    p_cs_name          in varchar2) return varchar2 is

    /* ATTRIBUTE_1..ATTRIBUTE_60 occupy slots 1..60; CS_NAME is slot 61 */
    c_attribute_count constant pls_integer := 60;
    c_cs_name_idx     constant pls_integer := c_attribute_count + 1;

    /* srccol_name is deliberately wide: it holds a to_char(...) wrapper around
       the mapped source expression or the quoted stream name, either of which
       can exceed the 60 characters the field used to allow (ORA-06502). */
    Type l_type_sql_struct is RECORD (
        tabcol_name varchar2(60),
        srccol_name varchar2(4000));
    Type l_type_sql_struct_array is TABLE of l_type_sql_struct;

    /* Only the three columns this function reads */
    CURSOR c1 IS
        select table_column,
               identifier_type,
               source_view_column_name
        from   msd_cs_defn_column_dtls_v
        where  cs_definition_id = p_cs_definition_id;

    CURSOR c_multi_stream IS
        SELECT multiple_stream_flag
        FROM   msd_cs_definitions
        WHERE  cs_definition_id = p_cs_definition_id;

    l_struct       l_type_sql_struct_array;
    l_sql_stmt     varchar2(32767);
    l_multi_stream VARCHAR2(30);

Begin
    debug_line('In Build_SQL_Source');

    /* Allocate exactly the 61 slots that are used (the collection used to be
       seeded with one extra null element). */
    l_struct := l_type_sql_struct_array();
    l_struct.extend(c_cs_name_idx);

    /* Defaults: every attribute selects the literal NULL; CS_NAME has no
       source expression yet. */
    for i in 1 .. c_attribute_count loop
        l_struct(i).tabcol_name := 'ATTRIBUTE_' || i;
        l_struct(i).srccol_name := 'NULL';
    end loop;
    l_struct(c_cs_name_idx).tabcol_name := 'CS_NAME';

    /* Overlay the mappings defined for this stream */
    for c1_rec in c1 loop

        for i IN 1 .. c_cs_name_idx loop
            if l_struct(i).tabcol_name = c1_rec.table_column then
                if c1_rec.identifier_type = 'INSTANCE' then
                    /* Instance columns take the caller's instance as a literal */
                    if p_instance_id is not null then
                        l_struct(i).srccol_name := '''' || p_instance_id || '''';
                    end if;
                elsif c1_rec.identifier_type = 'DATE' then
                    /* Dates travel as text in YYYY/MM/DD format */
                    if c1_rec.source_view_column_name is not null then
                        l_struct(i).srccol_name :=
                            'to_char(' || c1_rec.source_view_column_name || ', ''YYYY/MM/DD'')';
                    end if;
                else
                    if c1_rec.source_view_column_name is not null then
                        l_struct(i).srccol_name := c1_rec.source_view_column_name;
                    end if;
                end if;
                exit;  /* a table column name matches at most one slot */
            end if;
        end loop;
    end loop;

    OPEN c_multi_stream;
    FETCH c_multi_stream INTO l_multi_stream;
    CLOSE c_multi_stream;

    IF nvl(l_multi_stream, 'N') = 'Y' THEN
        /* DWK Multiple stream without a CS_NAME mapping: use the stream name
           supplied by the collection, quoted and with embedded quotes doubled */
        IF ( l_struct(c_cs_name_idx).srccol_name IS NULL ) THEN
            l_struct(c_cs_name_idx).srccol_name :=
                '''' || replace(p_cs_name, '''', '''''') || '''';
        END IF;

    ELSE /* Single stream: always the default stream name, mapping or not */
        l_struct(c_cs_name_idx).srccol_name := '''' || C_DEFAULT_STREAM_NAME || '''';
    END IF;

    /* Builds SQL stmt */
    for i in 1 .. c_cs_name_idx loop

        if p_process_type in (C_SOURCE_TO_FACT, C_SOURCE_TO_STAGE) then
            /* Reading the source view: use the mapped expressions */
            if l_sql_stmt is null then
                l_sql_stmt := l_sql_stmt || l_struct(i).srccol_name;
            else
                l_sql_stmt := l_sql_stmt || ', ' || l_struct(i).srccol_name;
            end if;
        else
            /* Staging to fact (assumption: always Validate = 'Yes'):
               use the staging table column names themselves */
            if l_sql_stmt is null then
                l_sql_stmt := l_sql_stmt || l_struct(i).tabcol_name;
            else
                l_sql_stmt := l_sql_stmt || ', ' || l_struct(i).tabcol_name;
            end if;
        end if;

    end loop;

    debug_line('l_sql_stmt : ' || l_sql_stmt);
    return l_sql_stmt;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1295
/*
  show_line
  Writes p_sql to the concurrent request log (fnd_file.log) in consecutive
  pieces of at most 255 characters, one log line per piece.
  A null p_sql writes nothing.
*/
procedure show_line(p_sql in varchar2) is
    c_chunk_len constant number := 255;
    l_text_len  number := length(p_sql);  /* null when p_sql is null */
    l_offset    number := 1;
begin
    /* A null length makes the condition null, so the loop is skipped */
    while l_offset <= l_text_len
    loop
        fnd_file.put_line(fnd_file.log, substr(p_sql, l_offset, c_chunk_len));
        l_offset := l_offset + c_chunk_len;
    end loop;
end;
1307
/*
  validate_record
  Validates one stream row (crec_data) against its custom stream
  definition (p_cs_rec) and reports every problem found in p_err_mesg.

  Checks performed, in order:
    1. Dimension values - for each dimension whose *_level_collect_flag is
       'Y', get_level_pk is called; a result equal to g_level_pk_not_found
       adds the dimension code to MSD_CS_DATALOAD_INVALID_DIM.
    2. Level ids - with strict_flag = 'Y' the row's level ids are compared
       with the definition's; otherwise only the time level id is checked
       against the MSD_PERIOD_TYPE lookup. Failures are reported under
       MSD_CS_DATALOAD_INVALID_LVLID.
    3. attribute_43 must convert with to_date(..., 'YYYY/MM/DD').
    4. attribute_42 / attribute_41 / attribute_44 must convert to a number
       when p_cs_rec.measurement_type requires them.

  Parameters
    crec_data      row under validation. IN OUT: get_level_pk receives the
                   row's sr_level_value_pk / level_value / level_value_pk
                   fields as IN OUT arguments and may fill them in, and
                   prd_parent_level_id is set to NULL when the row carries
                   no parent item.
    p_cs_rec       stream definition. IN OUT: with strict_flag = 'Y', level
                   ids that are null on the definition are copied from the
                   row.
    p_instance_id  instance passed to get_level_pk.
    p_err_mesg     OUT: concatenated error text; null when the row is valid.

  Returns TRUE when p_err_mesg is null, FALSE otherwise.
  Unexpected exceptions are logged through show_line and re-raised.
*/
1308 Function validate_record (
1309 crec_data in out NOCOPY msd_cs_dfn_utl.g_typ_source_stream,
1310 p_cs_rec in out NOCOPY msd_Cs_definitions_v1%rowtype,
1311 p_instance_id in varchar2,
1312 p_err_mesg out NOCOPY varchar2) return boolean is
1313
/* l_comments1 collects level-id problems, l_comments2 collects dimension-value problems */
1314 l_comments1 varchar2(1000);
1315 l_comments2 varchar2(1000);
/* NOTE(review): l_first_record is a local that starts as TRUE on every call, so the
   "first record" branch below is entered for every row validated with strict_flag = 'Y' */
1316 l_first_record boolean:=TRUE;
1317 l_dummy_date date;
1318 l_dummy_number number;
1319
/* Result of get_level_pk per dimension; stays null when the dimension is not collected */
1320 l_prd_found varchar2(30);
1321 l_prd_parent_found varchar2(30);
1322 l_geo_found varchar2(30);
1323 l_org_found varchar2(30);
1324 l_chn_found varchar2(30);
1325 l_rep_found varchar2(30);
1326 l_ud1_found varchar2(30);
1327 l_ud2_found varchar2(30);
/* l_tim_found is declared but never assigned or read in this function */
1328 l_tim_found varchar2(30);
1329 l_dcs_found varchar2(30);
1330
1331 l_count number(2) := 0;
1332
1333 Begin
1334
1335 /* crec_data -> actual record that you want to validate(ex, rows in staging table)
1336 p_cs_rec -> information in custom stream definition
1337 */
1338
1339 -- debug_line('In validate_record');
1340 /* Get Product LEVEL_PK */
1341 if nvl(p_cs_rec.prd_level_collect_flag, 'N') = 'Y' then
1342 l_prd_found := get_level_pk(p_instance_id, crec_data.prd_level_id,
1343 crec_data.prd_sr_level_value_pk, crec_data.prd_level_value, crec_data.prd_level_value_pk);
1344
/* The parent item is only resolved when the row carries some parent identification */
1345 IF ( crec_data.prd_parent_sr_level_value_pk IS NOT NULL or
1346 crec_data.prd_parent_level_value IS NOT NULL or
1347 crec_data.prd_parent_level_value_pk IS NOT NULL ) THEN
1348 /* DWK Get Product Dimension's Parent LEVEL_PK */
1349 l_prd_parent_found := get_level_pk(p_instance_id, crec_data.prd_parent_level_id,
1350 crec_data.prd_parent_sr_level_value_pk,
1351 crec_data.prd_parent_level_value,
1352 crec_data.prd_parent_level_value_pk);
1353
1354 ELSE /* IF there is no parent item then make it null */
1355 crec_data.prd_parent_level_id := NULL;
1356 END IF;
1357 end if;
1358 /* Get ORG LEVEL_PK */
1359 if nvl(p_cs_rec.org_level_collect_flag, 'N') = 'Y' then
1360 l_org_found := get_level_pk(p_instance_id, crec_data.org_level_id,
1361 crec_data.org_sr_level_value_pk, crec_data.org_level_value, crec_data.org_level_value_pk);
1362 end if;
1363 /* Get Geo LEVEL_PK */
1364 if nvl(p_cs_rec.geo_level_collect_flag, 'N') = 'Y' then
1365 l_geo_found := get_level_pk(p_instance_id, crec_data.geo_level_id,
1366 crec_data.geo_sr_level_value_pk, crec_data.geo_level_value, crec_data.geo_level_value_pk);
1367 end if;
1368
1369 /* Get CHN LEVEL_PK */
1370 if nvl(p_cs_rec.chn_level_collect_flag, 'N') = 'Y' then
1371 l_chn_found := get_level_pk(p_instance_id, crec_data.chn_level_id,
1372 crec_data.chn_sr_level_value_pk, crec_data.chn_level_value, crec_data.chn_level_value_pk);
1373 end if;
1374 /* Get REP LEVEL_PK */
1375 if nvl(p_cs_rec.rep_level_collect_flag, 'N') = 'Y' then
1376 l_rep_found := get_level_pk(p_instance_id, crec_data.rep_level_id,
1377 crec_data.rep_sr_level_value_pk, crec_data.rep_level_value, crec_data.rep_level_value_pk);
1378 end if;
1379 /* Get UD1 LEVEL_PK */
1380 if nvl(p_cs_rec.ud1_level_collect_flag, 'N') = 'Y' then
1381 l_ud1_found := get_level_pk(p_instance_id, crec_data.ud1_level_id,
1382 crec_data.ud1_sr_level_value_pk, crec_data.ud1_level_value, crec_data.ud1_level_value_pk);
1383 end if;
1384 /* Get UD2 LEVEL_PK */
1385 if nvl(p_cs_rec.ud2_level_collect_flag, 'N') = 'Y' then
1386 l_ud2_found := get_level_pk(p_instance_id, crec_data.ud2_level_id,
1387 crec_data.ud2_sr_level_value_pk, crec_data.ud2_level_value, crec_data.ud2_level_value_pk);
1388 end if;
1389
1390 /* Get Demand Class LEVEL_PK */
1391 if nvl(p_cs_rec.dcs_level_collect_flag, 'N') = 'Y' then
1392 l_dcs_found := get_level_pk(p_instance_id, crec_data.dcs_level_id,
1393 crec_data.dcs_sr_level_value_pk, crec_data.dcs_level_value, crec_data.dcs_level_value_pk);
1394 end if;
1395
/* Build the list of dimensions whose lookup returned the not-found marker.
   DECODE is only available in SQL, hence the select from dual. */
1396 select
1397 decode(l_prd_found, g_level_pk_not_found, 'PRD ', null) ||
1398 /* DWK Check level pk of parent item for dependent demand data */
1399 decode(l_prd_parent_found, g_level_pk_not_found, 'PRD_PARENT ', null) ||
1400 decode(l_org_found, g_level_pk_not_found, 'ORG ', null) ||
1401 decode(l_geo_found, g_level_pk_not_found, 'GEO ', null) ||
1402 decode(l_chn_found, g_level_pk_not_found, 'CHN ', null) ||
1403 decode(l_rep_found, g_level_pk_not_found, 'REP ', null) ||
1404 decode(l_ud1_found, g_level_pk_not_found, 'UD1 ', null) ||
1405 decode(l_ud2_found, g_level_pk_not_found, 'UD2 ', null) ||
1406 decode(l_dcs_found, g_level_pk_not_found, 'DCS ', null)
1407 into
1408 l_comments2
1409 from
1410 dual;
1411
1412 /* Level validation */
1413
1414 if nvl(p_cs_rec.strict_flag, 'N') = 'Y' then
1415 /* if level_id is not defined at the definition level then the level_id
1416 of first record fetched will be used for validation
1417 */
1418 if l_first_record then
1419
1420 l_first_record := FALSE;
1421 /* New */
/* Each assignment below only happens while the definition's level id is still null;
   p_cs_rec is IN OUT, so the value is handed back to the caller */
1422 if p_cs_rec.prd_level_id is null and nvl(p_cs_rec.prd_level_collect_flag, 'N') = 'Y' then
1423 p_cs_rec.prd_level_id := crec_data.prd_level_id;
1424 end if;
1425
1426 if p_cs_rec.org_level_id is null and nvl(p_cs_rec.org_level_collect_flag, 'N') = 'Y' then
1427 p_cs_rec.org_level_id := crec_data.org_level_id;
1428 end if;
1429
1430 if p_cs_rec.geo_level_id is null and nvl(p_cs_rec.geo_level_collect_flag, 'N') = 'Y' then
1431 p_cs_rec.geo_level_id := crec_data.geo_level_id;
1432 end if;
1433
1434 if p_cs_rec.chn_level_id is null and nvl(p_cs_rec.chn_level_collect_flag, 'N') = 'Y' then
1435 p_cs_rec.chn_level_id := crec_data.chn_level_id;
1436 end if;
1437
1438 if p_cs_rec.rep_level_id is null and nvl(p_cs_rec.rep_level_collect_flag, 'N') = 'Y' then
1439 p_cs_rec.rep_level_id := crec_data.rep_level_id;
1440 end if;
1441
1442 if p_cs_rec.ud1_level_id is null and nvl(p_cs_rec.ud1_level_collect_flag, 'N') = 'Y' then
1443 p_cs_rec.ud1_level_id := crec_data.ud1_level_id;
1444 end if;
1445
1446 if p_cs_rec.ud2_level_id is null and nvl(p_cs_rec.ud2_level_collect_flag, 'N') = 'Y' then
1447 p_cs_rec.ud2_level_id := crec_data.ud2_level_id;
1448 end if;
1449
1450 if p_cs_rec.tim_level_id is null and nvl(p_cs_rec.tim_level_collect_flag, 'N') = 'Y' then
1451 /* Attribute_34 is tim_level_id */
1452 p_cs_rec.tim_level_id := crec_data.tim_level_id;
1453 end if;
1454
1455 if p_cs_rec.dcs_level_id is null and nvl(p_cs_rec.dcs_level_collect_flag, 'N') = 'Y' then
1456 p_cs_rec.dcs_level_id := crec_data.dcs_level_id;
1457 end if;
1458
1459 end if;
1460
/* Per dimension: a null level id on the row is an error only when the dimension is
   collected; a level id equal to the definition's is accepted; any other value is an error */
1461 Select
1462 decode(crec_data.prd_level_id,
1463 null, decode(p_cs_rec.prd_level_collect_flag,
1464 'Y', 'PRD ',
1465 null),
1466 p_cs_rec.prd_level_id, null,
1467 'PRD ') ||
1468 /* DWK IF dependent demand data are collected, its parents level id should be 1 */
1469 decode(nvl(crec_data.prd_parent_level_id, '1'), '1', null, 'PRD_PARENT ') ||
1470 decode(crec_data.org_level_id,
1471 null, decode(p_cs_rec.org_level_collect_flag,
1472 'Y', 'ORG ',
1473 null),
1474 p_cs_rec.org_level_id, null,
1475 'ORG ') ||
1476 decode(crec_data.geo_level_id,
1477 null, decode(p_cs_rec.geo_level_collect_flag,
1478 'Y', 'GEO ',
1479 null),
1480 p_cs_rec.geo_level_id, null,
1481 'GEO ') ||
1482 decode(crec_data.rep_level_id,
1483 null, decode(p_cs_rec.rep_level_collect_flag,
1484 'Y', 'REP ',
1485 null),
1486 p_cs_rec.rep_level_id, null,
1487 'REP ') ||
1488 decode(crec_data.chn_level_id,
1489 null, decode(p_cs_rec.chn_level_collect_flag,
1490 'Y', 'CHN ',
1491 null),
1492 p_cs_rec.chn_level_id, null,
1493 'CHN ') ||
1494 decode(crec_data.ud1_level_id,
1495 null, decode(p_cs_rec.ud1_level_collect_flag,
1496 'Y', 'UD1 ',
1497 null),
1498 p_cs_rec.ud1_level_id, null,
1499 'UD1 ') ||
1500 decode(crec_data.ud2_level_id,
1501 null, decode(p_cs_rec.ud2_level_collect_flag,
1502 'Y', 'UD2 ',
1503 null),
1504 p_cs_rec.ud2_level_id, null,
1505 'UD2 ') ||
1506 decode(crec_data.tim_level_id,
1507 null, decode(p_cs_rec.tim_level_collect_flag,
1508 'Y', 'TIM ',
1509 null),
1510 p_cs_rec.tim_level_id, null,
1511 'TIM ') ||
1512 decode(crec_data.dcs_level_id,
1513 null, decode(p_cs_rec.dcs_level_collect_flag,
1514 'Y', 'DCS ',
1515 null),
1516 p_cs_rec.dcs_level_id, null,
1517 'DCS ' )
1518 into
1519 l_comments1
1520 from dual;
1521 ELSE /* p_cs_rec.strict_flag = 'N' */
1522
1523 /* Check whether that time level id exists in fnd lookup or not */
1524
1525 IF ( nvl(p_cs_rec.tim_level_collect_flag, 'N') = 'Y') THEN
/* rownum <= 1 keeps l_count at 0 or 1; a null time level id is compared as '999.99',
   presumably a value that is never a lookup code - TODO confirm */
1526 select count(*) into l_count
1527 from fnd_lookup_values
1528 where lookup_type = 'MSD_PERIOD_TYPE' and
1529 nvl(crec_data.tim_level_id, '999.99') = lookup_code and
1530 rownum <= 1;
1531
1532 IF ( l_count < 1 ) THEN
1533 select 'TIM' into l_comments1 from dual;
1534 END IF;
1535 END IF;
1536
1537 END IF;
1538
1539
1540
1541 /* MSD_CS_DATALOAD_INVALID_LVLID - Invalid Level ID for Dimensions */
1542 /* MSD_CS_DATALOAD_INVALID_DIM - Invalid Dimensions */
/* Dimension errors come first, level-id errors second; p_err_mesg stays null when both lists are null */
1543 select decode(l_comments2, null, null, 'MSD_CS_DATALOAD_INVALID_DIM : ' || l_comments2) ||
1544 decode(l_comments1, null, null, 'MSD_CS_DATALOAD_INVALID_LVLID : ' || l_comments1)
1545 into p_err_mesg
1546 from dual;
1547
1548 /* Validate Date Format */
/* Any conversion failure is turned into an error message instead of propagating */
1549 Begin
1550 select to_date(crec_data.attribute_43, 'YYYY/MM/DD')
1551 into l_dummy_date
1552 from dual;
1553 Exception
1554 When others then
1555 p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_DATE_FORMAT : ATTRIBUTE_43';
1556 End;
1557
1558 /* Validate Amount Number Format */
/* Assigning to the NUMBER local forces a conversion; a failure lands in the handler below */
1559 Begin
1560 -- Check Amount
1561 if (p_cs_rec.measurement_type in (1,3,4)) then
1562 l_dummy_number := crec_data.attribute_42;
1563 end if;
1564 Exception
1565 When others then
1566 p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_42';
1567 End;
1568
1569 /* Validate Quantity Number Format */
1570 Begin
1571 -- Check Quantity
1572 if (p_cs_rec.measurement_type in (2,4,5)) then
1573 l_dummy_number := crec_data.attribute_41;
1574 end if;
1575 Exception
1576 When others then
1577 p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_41';
1578 End;
1579
1580 /* Validate Price Number Format */
1581 Begin
1582 -- Check Price
1583 if (p_cs_rec.measurement_type in (3,5)) then
1584 l_dummy_number := crec_data.attribute_44;
1585 end if;
1586 Exception
1587 When others then
1588 p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_44';
1589 End;
1590
/* The row is valid only when no check added anything to p_err_mesg */
1591 if p_err_mesg is null then
1592 return TRUE;
1593 else
1594 return FALSE;
1595 end if;
1596
1597 Exception
1598 When others then
1599 show_line(sqlerrm);
1600 raise;
1601
1602 End;
1603
/*
  get_level_pk
  Resolves a dimension level value in msd_level_values for the given
  instance and level, using the first identifier that is supplied:

    1. p_sr_level_value_pk -> reads level_pk and level_value
                              (p_level_value is overwritten)
    2. p_level_value       -> reads level_pk
    3. p_level_value_pk    -> reads sr_level_pk and level_value
                              (p_sr_level_value_pk and p_level_value are
                              overwritten)

  Only the first matching row is used.

  Returns
    null                  when p_level_id is null, p_instance is null, or
                          none of the three identifiers is supplied
    g_level_pk_not_found  when the lookup finds no row
    the level_pk          otherwise; it is also copied to p_level_value_pk

  Any exception is logged through show_line and re-raised.
*/
function get_level_pk (
    p_instance          in varchar2,
    p_level_id          in number,
    p_sr_level_value_pk in out nocopy varchar2,
    p_level_value       in out nocopy varchar2,
    p_level_value_pk    in out nocopy varchar2) return varchar2 is

    /* Lookup by source (sr) key */
    cursor c_by_source_key is
        select level_pk, level_value
        from   msd_level_values
        where  instance    = p_instance
        and    level_id    = p_level_id
        and    sr_level_pk = p_sr_level_value_pk;

    /* Lookup by level value */
    cursor c_by_value is
        select level_pk
        from   msd_level_values
        where  instance    = p_instance
        and    level_id    = p_level_id
        and    level_value = p_level_value;

    /* Lookup by level pk */
    cursor c_by_level_pk is
        select sr_level_pk, level_value
        from   msd_level_values
        where  instance = p_instance
        and    level_id = p_level_id
        and    level_pk = p_level_value_pk;

    l_resolved_pk varchar2(255) := g_level_pk_not_found;

begin

    /* No level id: no data collected for this dimension */
    if p_level_id is null then
        return null;
    end if;

    /* Insufficient parameters: nothing to look up with */
    if p_instance is null
       or ( p_sr_level_value_pk is null
            and p_level_value is null
            and p_level_value_pk is null )
    then
        return null;
    end if;

    if p_sr_level_value_pk is not null then

        open c_by_source_key;
        fetch c_by_source_key into l_resolved_pk, p_level_value;
        if c_by_source_key%notfound then
            l_resolved_pk := g_level_pk_not_found;
        end if;
        close c_by_source_key;

    elsif p_level_value is not null then

        open c_by_value;
        fetch c_by_value into l_resolved_pk;
        if c_by_value%notfound then
            l_resolved_pk := g_level_pk_not_found;
        end if;
        close c_by_value;

    else /* p_level_value_pk is not null */

        open c_by_level_pk;
        fetch c_by_level_pk into p_sr_level_value_pk, p_level_value;
        if c_by_level_pk%notfound then
            l_resolved_pk := g_level_pk_not_found;
        else
            l_resolved_pk := p_level_value_pk;
        end if;
        close c_by_level_pk;

    end if;

    if l_resolved_pk <> g_level_pk_not_found then
        /* Hand the resolved key back to the caller */
        p_level_value_pk := l_resolved_pk;
    else
        debug_line(' p_instance ' || p_instance
                || ' p_level_id ' || p_level_id
                || ' p_sr_level_value_pk ' || p_sr_level_value_pk
                || ' p_level_value ' || p_level_value
                || ' p_level_value_pk ' || p_level_value_pk);
    end if;

    return l_resolved_pk;

exception
    when others then
        show_line(sqlerrm);
        raise;

end;
1697
/*
  Build_Where_Clause
  Builds the filter applied to the source view.

  p_tokenized_where may contain up to ten quoted tokens of the form
      '&&CHAR:...'   '&&NUMBER:...'   '&&DATE:...'
  Tokens are replaced left to right: the n-th substitution pass uses
  p_parameter<n>.

    CHAR   : the token text between the quotes is replaced by the value
             (embedded quotes doubled). A CHAR token flagged as multi-value
             is replaced by its default column when no values were stored
             for this request and a default column is defined; otherwise by
             a sub-select on msd_cs_coll_parameters.
    NUMBER : the quoted token is replaced by the raw value.
    DATE   : the quoted token is replaced by to_date('<value>', 'YYYYMMDD').

  Returns
    p_default_where || ' and ' || <substituted clause> when both exist,
    whichever one exists when only one does, null when neither does.

  Any exception is logged through show_line and re-raised.
*/
Function Build_Where_Clause (
    p_tokenized_where in varchar2,
    p_default_where   in varchar2,
    p_parameter1      in varchar2,
    p_parameter2      in varchar2,
    p_parameter3      in varchar2,
    p_parameter4      in varchar2,
    p_parameter5      in varchar2,
    p_parameter6      in varchar2,
    p_parameter7      in varchar2,
    p_parameter8      in varchar2,
    p_parameter9      in varchar2,
    p_parameter10     in varchar2,
    p_request_id      in number) return varchar2 is

    Type param_list_type is varray(10) of varchar2(255);

    l_para_list param_list_type;
    /* Sized to the PL/SQL maximum so substitutions cannot overflow the
       working buffer (it used to be 3000 characters). */
    l_where     varchar2(32767);

    /*
      Replaces the first '&&' token of p_where_cond with p_val according to
      the token type. Does nothing when no complete token is left.
    */
    Procedure find_and_subst_param ( p_where_cond in out NOCOPY varchar2,
                                     p_val        in varchar2,
                                     p_request_id in number,
                                     p_para_num   in number) is

        start_pos     number;
        end_pos       number;
        para_type     varchar2(10);

        l_default_col varchar2(300) := NULL;
        l_count       number := 0;
        l_multi_flag  varchar2(30) := 'N';

        -- 'CHAR:Prompt_Name:ValueSet_Name:Remote_Yes_No:Multi_Yes_NO:Default_Column_Name_For_Multi'

    Begin
        debug_line('In find_and_subst_param');

        start_pos := instr(p_where_cond, '&&', 1);
        if start_pos > 0 then
            /* Closing quote of the token */
            end_pos := instr(p_where_cond, '''', start_pos);
        end if;

        /* This routine is invoked ten times regardless of how many tokens the
           clause holds. Without this guard a missing token (start_pos = 0)
           made the code read the "type" from position 2 of the clause and
           could splice text at the wrong place. */
        if start_pos > 0 and end_pos > 0 then

            para_type := substr(p_where_cond, start_pos + 2, 7);

            if substr(upper(para_type), 1, 5) = 'CHAR:' then /* Character type */
                l_multi_flag :=
                    nvl(upper(msd_cs_defn_utl2.get_char_property(p_where_cond, start_pos, end_pos, 4)), 'N');

                l_default_col := msd_cs_defn_utl2.get_char_property(p_where_cond, start_pos, end_pos, 5);

                IF l_multi_flag = 'Y' THEN
                    /* Multi input parameter: check whether the user entered
                       any values for it */
                    select count(1) into l_count
                    from   msd_cs_coll_parameters
                    where  conc_request_id  = p_request_id
                    and    parameter_number = p_para_num;

                    IF (l_count = 0 AND l_default_col IS NOT NULL) THEN
                        /* Nothing entered: fall back to the user specified
                           default column name (quotes around the token dropped) */
                        p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                                        l_default_col ||
                                        substr(p_where_cond, end_pos + 1);
                    ELSE
                        p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                                        ' (SELECT parameter_code FROM msd_cs_coll_parameters ' ||
                                        ' WHERE conc_request_id = ' || p_request_id ||
                                        ' AND parameter_number = ' || p_para_num || ' ) ' ||
                                        substr(p_where_cond, end_pos + 1);
                    END IF;
                ELSE
                    /* Single value: keep the surrounding quotes and double
                       any quote inside the value */
                    p_where_cond := substr(p_where_cond, 1, start_pos - 1) ||
                                    replace(p_val, '''', '''''') ||
                                    substr(p_where_cond, end_pos);
                END IF;

            elsif substr(upper(para_type), 1, 7) = 'NUMBER:' then /* Number type */
                /* NOTE(review): the value is spliced in unquoted and unchecked.
                   It is expected to be numeric; validating it here would
                   depend on the session's NLS numeric settings. */
                p_where_cond := substr(p_where_cond, 1, start_pos - 2) || p_val ||
                                substr(p_where_cond, end_pos + 1);

            elsif substr(upper(para_type), 1, 5) = 'DATE:' then /* Date type */
                /* The value sits inside a quoted literal, so embedded quotes
                   are doubled to keep the literal well formed */
                p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                                'to_date(''' || replace(p_val, '''', '''''') || ''', ''YYYYMMDD'')' ||
                                substr(p_where_cond, end_pos + 1);
            end if;

        end if;

        debug_line(p_where_cond);

    End;

    /*
      Runs one substitution pass per entry of p_param_list, in order.
    */
    Procedure substitute_parameter (
        p_where_cond in out NOCOPY varchar2,
        p_param_list in param_list_type,
        p_request_id in number) is
    Begin
        debug_line('In substitute_parameter');
        /* DP-CRM Code changes by easwaran */
        for i in 1 .. 10 loop
            find_and_subst_param( p_where_cond, p_param_list(i), p_request_id, i);
        end loop;

    End;

    /*
      Packs the ten individual parameters into p_para_list, in order.
    */
    Procedure make_para_list(
        p_parameter1  in varchar2,
        p_parameter2  in varchar2,
        p_parameter3  in varchar2,
        p_parameter4  in varchar2,
        p_parameter5  in varchar2,
        p_parameter6  in varchar2,
        p_parameter7  in varchar2,
        p_parameter8  in varchar2,
        p_parameter9  in varchar2,
        p_parameter10 in varchar2,
        p_para_list   in out NOCOPY param_list_type) is
    Begin
        debug_line('In make_para_list');
        p_para_list := param_list_type (p_parameter1, p_parameter2, p_parameter3, p_parameter4, p_parameter5,
                                        p_parameter6, p_parameter7, p_parameter8, p_parameter9, p_parameter10);

    End;

Begin
    debug_line('In Build_Where_Clause');

    if p_tokenized_where is not null then
        /* convert parameters into an array */
        make_para_list( p_parameter1, p_parameter2, p_parameter3, p_parameter4, p_parameter5,
                        p_parameter6, p_parameter7, p_parameter8, p_parameter9, p_parameter10,
                        l_para_list);

        /* Build additional Where */
        l_where := p_tokenized_where;

        substitute_parameter ( l_where, l_para_list, p_request_id);

    end if;

    /* Combine with the default where clause, when there is one */
    if l_where is not null then
        if p_default_where is not null then
            l_where := p_default_where || ' and ' || l_where;
        end if;
    else
        l_where := p_default_where;
    end if;

    return l_where;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1855
/*
  Refresh_Target
  Clears the target of a load before new rows arrive.

  Source -> Staging (C_SOURCE_TO_STAGE)
      Matching rows are deleted from msd_st_cs_data whether or not a
      complete refresh was requested, which avoids double counting when the
      collection is re-run.

  Staging -> Fact (C_STAGE_TO_FACT), only when p_comp_refresh = 'Y'
      Fact rows in msd_cs_data with action_code 'I' are flagged 'D' and
      stamped with p_new_refresh_num for every instance present in staging.
      For a multiple-stream definition the match is per (cs_name, instance);
      for a single stream cs_name is ignored.

  Any other combination does nothing.
  Any exception is logged through show_line and re-raised.
*/
procedure Refresh_Target(
    p_process_type     in varchar2,
    p_cs_definition_id in number,
    p_cs_name          in varchar2,
    p_comp_refresh     in varchar2,
    p_instance_id      in number,
    p_new_refresh_num  in number) is

    cursor c_stream_kind is
        select nvl(multiple_stream_flag, 'N')
        from   msd_cs_definitions
        where  cs_definition_id = p_cs_definition_id;

    l_is_multi varchar2(10);

begin
    debug_line('In refresh_target');

    if p_process_type = C_SOURCE_TO_STAGE then

        /* Same clean-up for complete and incremental runs */
        delete from msd_st_cs_data
        where  cs_definition_id = p_cs_definition_id
        and    cs_name          = nvl(p_cs_name, cs_name)
        and    attribute_1      = nvl(p_instance_id, attribute_1);

    elsif p_comp_refresh = 'Y' and p_process_type = C_STAGE_TO_FACT then

        open c_stream_kind;
        fetch c_stream_kind into l_is_multi;
        close c_stream_kind;

        if l_is_multi = 'Y' then

            /* Multiple stream: one pass per (instance, stream name) in staging */
            for l_key in ( select distinct attribute_1 instance, cs_name
                           from   msd_st_cs_data
                           where  cs_definition_id = p_cs_definition_id
                           and    cs_name          = nvl(p_cs_name, cs_name) )
            loop
                update msd_cs_data
                set    action_code      = 'D',
                       last_refresh_num = p_new_refresh_num
                where  cs_definition_id = p_cs_definition_id
                and    cs_name          = l_key.cs_name
                and    attribute_1      = l_key.instance
                and    action_code      = 'I';
            end loop;

        else

            /* DWK Single stream: the CS_NAME column is ignored */
            for l_key in ( select distinct attribute_1 instance
                           from   msd_st_cs_data
                           where  cs_definition_id = p_cs_definition_id )
            loop
                update msd_cs_data
                set    action_code      = 'D',
                       last_refresh_num = p_new_refresh_num
                where  cs_definition_id = p_cs_definition_id
                and    attribute_1      = l_key.instance
                and    action_code      = 'I';
            end loop;

        end if;

    end if;

exception
    when others then
        show_line(sqlerrm);
        raise;

end;
1947
/*
   Process_1_Sub

   Row-by-row worker for collect/pull. Opens a ref cursor on p_sql_stmt,
   fetches every row into a msd_cs_dfn_utl.G_TYP_SOURCE_STREAM record and
   validates it with validate_record. A valid row is written
     - to the fact table through ins_row_fact when p_target_table is
       'MSD_CS_DATA' (the headers table is maintained through
       insert_update_Into_Headers whenever the designator/instance pair
       differs from the previous fact row), or
     - to staging through ins_row_staging for any other target,
   and is then logged with log_processed. An invalid row is logged with
   log_error.

   Parameters
     p_cs_rec           custom stream definition row handed to the helpers
     p_cs_name          not referenced inside this procedure
     p_source_view      passed through to log_processed / log_error
     p_target_table     'MSD_CS_DATA' selects the fact path, anything else
                        the staging path
     p_instance_id      instance to use; when null the instance carried by
                        the fetched row is used instead
     p_sql_stmt         text of the query to open
     p_new_refresh_num  refresh number passed to ins_row_fact and to
                        insert_update_Into_Headers

   Side effects
     Commits every C_BATCH_SIZE processed rows and once more after the loop
     (Bug# 4349618). Sets g_retcode to '1' and fills g_errbuf when at least
     one row failed validation or when no row was fetched at all.

   Errors
     Any exception is reported through show_line together with the
     statement text, the cursor is closed if it is open, and the exception
     is re-raised.
*/
Procedure Process_1_Sub (
    p_cs_rec          in out NOCOPY msd_cs_definitions_v1%rowtype,
    p_cs_name         in varchar2,
    p_source_view     in varchar2,
    p_target_table    in varchar2,
    p_instance_id     in number,
    p_sql_stmt        in varchar2,
    p_new_refresh_num IN NUMBER) is

    TYPE cur_type is REF CURSOR;
    l_cur  cur_type;
    l_rec  msd_cs_dfn_utl.G_TYP_SOURCE_STREAM;

    l_valid    boolean;
    l_err_msg  varchar2(1000);

    l_success_rows  number := 0;
    l_error_rows    number := 0;

    /* Bug# 4349618 To commit in Batches: rows handled so far (valid + invalid) */
    l_counter  number := 0;

    /* DWK: designator/instance of the last fact row that touched the headers */
    l_temp_designator   VARCHAR2(40) := NULL;
    l_temp_instance_id  NUMBER := NULL;

Begin
    debug_line('In Process_1_Sub');

    open l_cur for p_sql_stmt;
    LOOP
        fetch l_cur into l_rec;
        exit when l_cur%notfound;

        l_valid   := null;
        l_err_msg := null;

        debug_line('Validating ' || l_rec.pk_id);

        /* Validation prefers the row's own instance and falls back to
           p_instance_id; every other call below prefers p_instance_id. */
        l_valid := validate_record (l_rec, p_cs_rec, nvl(l_rec.instance,p_instance_id), l_err_msg);

        IF l_valid THEN
            /* IMP : Instance is p_instance in case of Collect
               and l_rec.instance in case of PULL, i.e. from the staging
               table */
            IF (p_target_table = 'MSD_CS_DATA') THEN
                ins_row_fact(l_rec, p_cs_rec, l_rec.designator,
                             nvl(p_instance_id, l_rec.instance),
                             p_new_refresh_num);

                /* Maintain the headers table only when the designator or
                   the instance changes from the previous fact row. */
                IF ( l_rec.designator <> nvl(l_temp_designator,'-99999999~!@') OR
                     nvl(p_instance_id,l_rec.instance) <> nvl(l_temp_instance_id,-99999999) ) THEN
                    l_temp_designator  := l_rec.designator;
                    l_temp_instance_id := nvl(p_instance_id,l_rec.instance);

                    /* DWK Populate MSD_CS_DATA_HEADERS table after inserting rows
                       into FACT table */
                    insert_update_Into_Headers ( p_cs_rec.cs_definition_id,
                                                 l_rec.designator,
                                                 nvl(p_instance_id,l_rec.instance),
                                                 p_new_refresh_num);
                END IF;
            ELSE
                ins_row_staging(l_rec, p_cs_rec, l_rec.designator, nvl(p_instance_id, l_rec.instance), null, null);
            END IF;

            /* Mark record Processed */
            log_processed(l_rec, p_cs_rec, l_rec.designator, nvl(p_instance_id, l_rec.instance), p_source_view, p_target_table);

            /* Count Success Rows */
            l_success_rows := l_success_rows + 1;

        ELSE /* not valid (validate_record returned FALSE or NULL) */

            /* Log Error */
            log_error(l_rec, p_cs_rec, l_rec.designator,
                      nvl(p_instance_id, l_rec.instance),
                      l_err_msg, p_source_view, p_target_table);

            /* Count Erroneous Rows */
            l_error_rows := l_error_rows + 1;

        END IF;

        /* Bug# 4349618 To commit in Batches.
           The batch test uses the PL/SQL mod function directly; no query
           against dual is needed for each row.
           NOTE(review): committing while l_cur is still being fetched is
           kept as designed by the bug fix above. */
        l_counter := l_counter + 1;

        IF mod(l_counter, C_BATCH_SIZE) = 0 THEN
            debug_line( 'Inside Process_1_Sub: commiting inside the loop.');
            commit;
        END IF;

    END LOOP;

    /* Bug# 4349618 To commit in Batches: flush the last partial batch */
    debug_line( 'Inside Process_1_Sub: commiting after the loop ends.');
    commit;

    if l_error_rows > 0 then
        g_retcode := '1';
        g_errbuf  := 'There were erroneous records in Collect/Pull.';
    end if;

    if l_success_rows = 0 and l_error_rows = 0 then
        g_retcode := '1';
        g_errbuf  := 'There were no rows fetched.';
    end if;

    /* Print Results */
    show_line('Valid Records : ' || l_success_rows);
    show_line('Invalid Records : ' || l_error_rows);

    close l_cur;

Exception
    When others then
        show_line(sqlerrm);
        show_line(p_sql_stmt);
        /* The cursor is not open when the OPEN itself failed; closing it
           unconditionally would raise INVALID_CURSOR and hide the original
           error. */
        if l_cur%isopen then
            close l_cur;
        end if;
        raise;
End Process_1_Sub;
2080
/*
   Build_Designator_Where_Clause

   Returns a filter fragment (no leading WHERE/AND) that restricts a query
   to one designator (cs_name), or NULL when no filter applies.

     - Process type C_STAGE_TO_FACT: filters on the cs_name column when
       p_cs_name is given, and always on cs_definition_id.
     - Any other process type: only for multiple-stream definitions with a
       p_cs_name, filters on the source view column that
       msd_cs_defn_column_dtls maps to CS_NAME. When no such mapping is
       found nothing is added and no error is raised.

   Single quotes inside p_cs_name are doubled before the value is embedded
   as a literal.
*/
Function Build_Designator_Where_Clause(
    p_cs_rec        in msd_cs_definitions_v1%rowtype,
    p_process_type  in varchar2,
    p_cs_name       in varchar2) return varchar2 is

    /* Source view column that feeds CS_NAME for this definition */
    Cursor c_designator_col is
        select source_view_column_name
          from msd_cs_defn_column_dtls
         where cs_definition_id = p_cs_rec.cs_definition_id
           and table_column = 'CS_NAME';

    l_filter      varchar2(500);
    l_source_col  varchar2(60);

    /* p_cs_name as a quoted SQL string literal */
    Function quoted_cs_name return varchar2 is
    Begin
        return '''' || replace(p_cs_name, '''', '''''') || '''';
    End quoted_cs_name;

Begin
    debug_Line('In Build_Designator_Where_Clause');

    if p_process_type = C_STAGE_TO_FACT then

        /* Staging rows: optional designator filter plus the default
           definition filter */
        if p_cs_name is not null then
            l_filter := 'cs_name = ' || quoted_cs_name;
            l_filter := l_filter || ' and cs_definition_id = ' || p_cs_rec.cs_definition_id;
        else
            l_filter := ' cs_definition_id = ' || p_cs_rec.cs_definition_id;
        end if;

    elsif nvl(p_cs_rec.multiple_stream_flag, 'N') = 'Y' and p_cs_name is not null then

        /* Source rows: take the first mapped column, if there is one */
        for r_col in c_designator_col loop
            l_source_col := r_col.source_view_column_name;
            exit;
        end loop;

        if l_source_col is not null then
            l_filter := l_source_col || ' = ' || quoted_cs_name;
        end if;

    end if;

    return l_filter;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End Build_Designator_Where_Clause;
2137
/* Writes p_sql through show_line, but only when the package constant
   C_DEBUG is 'Y'; otherwise does nothing. */
Procedure debug_line(p_sql in varchar2) is
Begin
    case c_debug
        when 'Y' then
            show_line(p_sql);
        else
            null;
    end case;
End debug_line;
2144
2145 /* DWK */
/*************************************************************************************************
PROCEDURE insert_update_Into_Headers

Keeps msd_cs_data_headers in step with the fact data for one
(cs_definition_id, cs_name, instance) combination:
  - when msd_cs_data_headers_v1 shows no row for the combination, a new
    header row is created through Insert_Data_Into_Headers;
  - otherwise last_refresh_num of the matching msd_cs_data_headers rows is
    set to p_refresh_num.
Any error is reported through show_line and re-raised.
**************************************************************************************************/
Procedure insert_update_Into_Headers ( p_cs_definition_id  in number,
                                       p_cs_name           in varchar2,
                                       p_instance_id       in number,
                                       p_refresh_num       in number) is

    l_header_rows  NUMBER := 0;

BEGIN

    SELECT count(*)
      INTO l_header_rows
      FROM msd_cs_data_headers_v1
     WHERE cs_definition_id = p_cs_definition_id
       AND cs_name = p_cs_name
       AND instance = p_instance_id;

    IF l_header_rows > 0 THEN

        /* Header already known: only stamp the new refresh number */
        update msd_cs_data_headers
           set last_refresh_num = p_refresh_num
         where cs_definition_id = p_cs_definition_id
           and instance = p_instance_id
           and cs_name = p_cs_name;

    ELSE

        /* First time this combination is seen */
        Insert_Data_Into_Headers (p_cs_definition_id => p_cs_definition_id,
                                  p_cs_name          => p_cs_name,
                                  p_instance_id      => p_instance_id,
                                  p_refresh_num      => p_refresh_num);

    END IF;

Exception
    When others then
        show_line(sqlerrm);
        raise;

END insert_update_Into_Headers;
2189
2190
/*************************************************************************************************
PROCEDURE Insert_Data_Into_Headers

Creates one row in msd_cs_data_headers for the given cs_definition_id,
cs_name and instance. The key comes from the sequence
msd_cs_data_headers_s, the refresh number from p_refresh_num, and the
created/updated columns from sysdate and fnd_global.
Any error is reported through show_line and re-raised.
**************************************************************************************************/
Procedure Insert_Data_Into_Headers ( p_cs_definition_id  in number,
                                     p_cs_name           in varchar2,
                                     p_instance_id       in number,
                                     p_refresh_num       in number) is

BEGIN

    INSERT INTO msd_cs_data_headers
        ( CS_DATA_HEADER_ID,
          CS_DEFINITION_ID,
          CS_NAME,
          INSTANCE,
          LAST_REFRESH_NUM,
          CREATION_DATE,
          CREATED_BY,
          LAST_UPDATE_DATE,
          LAST_UPDATED_BY,
          LAST_UPDATE_LOGIN
        )
    VALUES
        ( msd_cs_data_headers_s.nextval,
          p_cs_definition_id,
          p_cs_name,
          p_instance_id,
          p_refresh_num,
          sysdate,
          fnd_global.user_id,
          sysdate,
          fnd_global.user_id,
          fnd_global.login_id
        );

Exception
    When others then
        show_line('Error in inserting into MSD_CS_DATA_HEADERS');
        show_line(sqlerrm);
        raise;

END Insert_Data_Into_Headers;
2238
2239 End;