[Home] [Help]
PACKAGE BODY: APPS.MSD_CS_COLLECTION
Source
1 package body msd_cs_collection as
2 /* $Header: msdcsclb.pls 120.2.12010000.1 2008/05/15 07:09:28 lannapra ship $ */
/*
  Package-private constants and state.
*/

/* Values written to MSD_ST_CS_DATA.process_status when a staging row is
   marked as processed or as erroneous. */
C_LOG_PROCESSED       CONSTANT VARCHAR2(30) := 'PROCESSED';
C_LOG_ERROR           CONSTANT VARCHAR2(30) := 'ERROR';

/* Stream name used when the definition is not flagged as multi-stream. */
C_DEFAULT_STREAM_NAME CONSTANT VARCHAR2(30) := 'SINGLE_STREAM';

/* Accepted values of p_collection_type in Custom_Stream_Collection. */
C_COLLECT             CONSTANT VARCHAR2(10) := 'C';
C_PULL                CONSTANT VARCHAR2(10) := 'P';

/* Debug switch. NOTE(review): presumably read by debug_line - confirm. */
C_DEBUG               CONSTANT VARCHAR2(1)  := 'N';

/* Bug# 4349618: batch size used to commit in batches. */
C_BATCH_SIZE          CONSTANT NUMBER := 30000;

/* Process types: which source/target pair a run moves data between. */
C_SOURCE_TO_STAGE     CONSTANT NUMBER := 1;
C_SOURCE_TO_FACT      CONSTANT NUMBER := 2;
C_STAGE_TO_FACT       CONSTANT NUMBER := 3;

/* NOTE(review): looks like a sentinel returned when a level primary key
   cannot be resolved - confirm against get_level_pk. */
g_level_pk_not_found  VARCHAR2(30) := '%%^^)(::Error::%%^^)(';

/* Program status; Custom_Stream_Collection copies these into its
   retcode/errbuf OUT parameters just before committing. */
g_retcode             VARCHAR2(30);
g_errbuf              VARCHAR2(255);
27 /*
28 == Local Function/Procedures
29 */
30
/*
  Forward declarations of the package-private subprograms, so that they can
  be called before their bodies appear further down in this package body.
  Each heading must stay in step with the heading of its body.
*/

PROCEDURE insert_update_Into_Headers (
    p_cs_definition_id IN NUMBER,
    p_cs_name          IN VARCHAR2,
    p_instance_id      IN NUMBER,
    p_refresh_num      IN NUMBER);

PROCEDURE Insert_Data_Into_Headers (
    p_cs_definition_id IN NUMBER,
    p_cs_name          IN VARCHAR2,
    p_instance_id      IN NUMBER,
    p_refresh_num      IN NUMBER);

/* Called by Process_1 with the fully built SELECT statement. */
PROCEDURE Process_1_Sub (
    p_cs_rec          IN OUT NOCOPY msd_cs_definitions_v1%ROWTYPE,
    p_cs_name         IN VARCHAR2,
    p_source_view     IN VARCHAR2,
    p_target_table    IN VARCHAR2,
    p_instance_id     IN NUMBER,
    p_sql_stmt        IN VARCHAR2,
    p_new_refresh_num IN NUMBER);

FUNCTION Build_SQL_Source (
    p_cs_definition_id IN NUMBER,
    p_process_type     IN NUMBER,
    p_instance_id      IN VARCHAR2,
    p_cs_name          IN VARCHAR2) RETURN VARCHAR2;

/* Supplies the SELECT text to which Process_1 appends its WHERE clause. */
FUNCTION Build_SQL_FOR_COLLECT_AND_VAL (
    p_cs_definition_id IN NUMBER,
    p_process_type     IN NUMBER,
    p_source_view      IN VARCHAR2,
    p_db_link          IN VARCHAR2,
    p_cs_name          IN VARCHAR2) RETURN VARCHAR2;

/* Supplies the INSERT ... SELECT text used by Process_2. */
FUNCTION Build_SQL_INS_AS_SELECT (
    p_cs_definition_id IN NUMBER,
    p_instance_id      IN VARCHAR2,
    p_cs_name          IN VARCHAR2,
    p_source_view      IN VARCHAR2,
    p_db_link          IN VARCHAR2) RETURN VARCHAR2;

FUNCTION Build_Where_Clause (
    p_tokenized_where IN VARCHAR2,
    p_default_where   IN VARCHAR2,
    p_parameter1      IN VARCHAR2,
    p_parameter2      IN VARCHAR2,
    p_parameter3      IN VARCHAR2,
    p_parameter4      IN VARCHAR2,
    p_parameter5      IN VARCHAR2,
    p_parameter6      IN VARCHAR2,
    p_parameter7      IN VARCHAR2,
    p_parameter8      IN VARCHAR2,
    p_parameter9      IN VARCHAR2,
    p_parameter10     IN VARCHAR2,
    p_request_id      IN NUMBER) RETURN VARCHAR2;

PROCEDURE log_processed (
    crec_data      IN msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec       IN msd_cs_definitions_v1%ROWTYPE,
    p_cs_name      IN VARCHAR2,
    p_instance_id  IN VARCHAR2,
    p_source_view  IN VARCHAR2,
    p_target_table IN VARCHAR2);

PROCEDURE log_error (
    crec_data       IN msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec        IN msd_cs_definitions_v1%ROWTYPE,
    p_cs_name       IN VARCHAR2,
    p_instance_id   IN VARCHAR2,
    p_error_message IN VARCHAR2,
    p_source_view   IN VARCHAR2,
    p_target_table  IN VARCHAR2);

PROCEDURE upd_stage_error (
    p_pk_id          IN NUMBER,
    p_process_status IN VARCHAR2,
    p_error_mesg     IN VARCHAR2);

PROCEDURE Refresh_Target (
    p_process_type     IN VARCHAR2,
    p_cs_definition_id IN NUMBER,
    p_cs_name          IN VARCHAR2,
    p_comp_refresh     IN VARCHAR2,
    p_instance_id      IN NUMBER,
    p_new_refresh_num  IN NUMBER);

PROCEDURE ins_row_fact (
    crec_data         IN msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec          IN msd_cs_definitions_v1%ROWTYPE,
    p_cs_name         IN VARCHAR2,
    p_instance_id     IN VARCHAR2,
    p_new_refresh_num IN NUMBER);

PROCEDURE ins_row_staging (
    crec_data        IN msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec         IN msd_cs_definitions_v1%ROWTYPE,
    p_cs_name        IN VARCHAR2,
    p_instance_id    IN VARCHAR2,
    p_process_status IN VARCHAR2,
    p_error_message  IN VARCHAR2);

PROCEDURE cs_collect_post_process (
    p_cs_rec      IN msd_cs_definitions_v1%ROWTYPE,
    p_cs_name     IN VARCHAR2,
    p_instance_id IN VARCHAR2);

/* Row-by-row path: fetch, validate, then save or log each record. */
PROCEDURE Process_1 (
    p_cs_rec          IN OUT NOCOPY msd_cs_definitions_v1%ROWTYPE,
    p_cs_name         IN VARCHAR2,
    p_db_link         IN VARCHAR2,
    p_source_view     IN VARCHAR2,
    p_target_table    IN VARCHAR2,
    p_process_type    IN NUMBER,
    p_default_where   IN VARCHAR2,
    p_tokenized_where IN VARCHAR2,
    p_comp_refresh    IN VARCHAR2,
    p_instance_id     IN NUMBER,
    p_parameter1      IN VARCHAR2,
    p_parameter2      IN VARCHAR2,
    p_parameter3      IN VARCHAR2,
    p_parameter4      IN VARCHAR2,
    p_parameter5      IN VARCHAR2,
    p_parameter6      IN VARCHAR2,
    p_parameter7      IN VARCHAR2,
    p_parameter8      IN VARCHAR2,
    p_parameter9      IN VARCHAR2,
    p_parameter10     IN VARCHAR2,
    p_new_refresh_num IN NUMBER,
    p_request_id      IN NUMBER);

/* Bulk path: inserts from the source view into staging without validation. */
PROCEDURE Process_2 (
    p_cs_rec          IN OUT NOCOPY msd_cs_definitions_v1%ROWTYPE,
    p_cs_name         IN VARCHAR2,
    p_db_link         IN VARCHAR2,
    p_source_view     IN VARCHAR2,
    p_target_table    IN VARCHAR2,
    p_process_type    IN NUMBER,
    p_default_where   IN VARCHAR2,
    p_tokenized_where IN VARCHAR2,
    p_comp_refresh    IN VARCHAR2,
    p_instance_id     IN NUMBER,
    p_parameter1      IN VARCHAR2,
    p_parameter2      IN VARCHAR2,
    p_parameter3      IN VARCHAR2,
    p_parameter4      IN VARCHAR2,
    p_parameter5      IN VARCHAR2,
    p_parameter6      IN VARCHAR2,
    p_parameter7      IN VARCHAR2,
    p_parameter8      IN VARCHAR2,
    p_parameter9      IN VARCHAR2,
    p_parameter10     IN VARCHAR2,
    p_request_id      IN NUMBER);

FUNCTION get_level_pk (
    p_instance          IN VARCHAR2,
    p_level_id          IN NUMBER,
    p_sr_level_value_pk IN OUT NOCOPY VARCHAR2,
    p_level_value       IN OUT NOCOPY VARCHAR2,
    p_level_value_pk    IN OUT NOCOPY VARCHAR2) RETURN VARCHAR2;

/* Output helpers used throughout this package body for messages. */
PROCEDURE show_line (p_sql IN VARCHAR2);

PROCEDURE debug_line (p_sql IN VARCHAR2);

FUNCTION validate_record (
    crec_data     IN OUT NOCOPY msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec      IN OUT NOCOPY msd_cs_definitions_v1%ROWTYPE,
    p_instance_id IN VARCHAR2,
    p_err_mesg    OUT NOCOPY VARCHAR2) RETURN BOOLEAN;

FUNCTION Build_Designator_Where_Clause (
    p_cs_rec       IN msd_cs_definitions_v1%ROWTYPE,
    p_process_type IN VARCHAR2,
    p_cs_name      IN VARCHAR2) RETURN VARCHAR2;
200
201 /* Logic Starts here */
/*
  Entry point for collecting or pulling a custom stream.

  errbuf / retcode   Completion message and status. They are initialised to
                     success ('0'), set to 2 with a message when a validation
                     fails, and on normal completion are overwritten with the
                     package globals g_errbuf / g_retcode.
  p_collection_type  C_COLLECT ('C') or C_PULL ('P'); any other value runs
                     neither leg.
  p_validate_data    'Y' or 'N'. Pull requires 'Y'.
  p_definition_id    Custom stream definition; must exist with valid_flag 'Y'.
  p_cs_name          Stream name; used only when the definition is flagged
                     multi-stream, otherwise C_DEFAULT_STREAM_NAME is used.
  p_comp_refresh     Refresh flag forwarded to Refresh_Target and the
                     Process_* procedures.
  p_instance_id      Source instance; mandatory for collection.
  p_parameter1..10   Forwarded unchanged to Process_1 / Process_2.
  p_request_id       Not referenced; fnd_global.conc_request_id is forwarded
                     as the request id instead.

  Commits on normal completion; on any unhandled exception rolls back and
  reports sqlerrm through errbuf with retcode 2.
*/
Procedure Custom_Stream_Collection (
    errbuf            OUT NOCOPY varchar2,
    retcode           OUT NOCOPY varchar2,
    p_collection_type in varchar2,
    p_validate_data   in varchar2,
    p_definition_id   in number,
    p_cs_name         in varchar2,
    p_comp_refresh    in varchar2,
    p_instance_id     in number,
    p_parameter1      in varchar2,
    p_parameter2      in varchar2,
    p_parameter3      in varchar2,
    p_parameter4      in varchar2,
    p_parameter5      in varchar2,
    p_parameter6      in varchar2,
    p_parameter7      in varchar2,
    p_parameter8      in varchar2,
    p_parameter9      in varchar2,
    p_parameter10     in varchar2,
    p_request_id      in number default 0) is

    cursor c_get_cs is
        select *
          from msd_cs_definitions_v1
         where cs_definition_id = p_definition_id
           and nvl(valid_flag, 'N') = 'Y';

    l_single_step_collection varchar2(30);
    /* Anchored to the column so a long description cannot overflow. */
    l_stream_desc     msd_cs_definitions.description%type;
    l_cs_rec          msd_cs_definitions_v1%rowtype;
    l_target          varchar2(60);
    l_source          varchar2(60);
    l_dblink          varchar2(60);
    /* Sized generously: the length of the generated clause is not bounded here. */
    l_default_where   varchar2(32767);
    l_cs_name         varchar2(255);
    l_retcode         varchar2(30);
    l_process_type    number;
    l_conc_request_id number;
    l_new_refresh_num number;

    /*
      Source -> staging leg shared by all collection variants.
      p_refresh   refresh flag handed to Refresh_Target and Process_*.
      p_validate  TRUE  => row-by-row with validation (Process_1),
                  FALSE => bulk insert without validation (Process_2).
      l_new_refresh_num has not been assigned on any path that reaches this
      helper, so NULL is passed where it is forwarded.
    */
    procedure stage_from_source (p_refresh in varchar2, p_validate in boolean) is
    begin
        l_target       := 'MSD_ST_CS_DATA';
        l_process_type := C_SOURCE_TO_STAGE;
        l_source       := l_cs_rec.source_view_name;
        l_default_where := Build_Designator_Where_Clause(l_cs_rec,
                                                         l_process_type,
                                                         p_cs_name);
        Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                       l_cs_name, p_refresh, p_instance_id,
                       l_new_refresh_num);

        if p_validate then
            Process_1(
                p_cs_rec          => l_cs_rec,
                p_cs_name         => l_cs_name,
                p_db_link         => l_dblink,
                p_source_view     => l_source,
                p_target_table    => l_target,
                p_process_type    => l_process_type,
                p_default_where   => l_default_where,
                p_tokenized_where => l_cs_rec.collect_addtl_where_clause,
                p_comp_refresh    => p_refresh,
                p_instance_id     => p_instance_id,
                p_parameter1      => p_parameter1,
                p_parameter2      => p_parameter2,
                p_parameter3      => p_parameter3,
                p_parameter4      => p_parameter4,
                p_parameter5      => p_parameter5,
                p_parameter6      => p_parameter6,
                p_parameter7      => p_parameter7,
                p_parameter8      => p_parameter8,
                p_parameter9      => p_parameter9,
                p_parameter10     => p_parameter10,
                p_new_refresh_num => l_new_refresh_num,
                p_request_id      => l_conc_request_id);
        else
            Process_2(
                p_cs_rec          => l_cs_rec,
                p_cs_name         => l_cs_name,
                p_db_link         => l_dblink,
                p_source_view     => l_source,
                p_target_table    => l_target,
                p_process_type    => l_process_type,
                p_default_where   => l_default_where,
                p_tokenized_where => l_cs_rec.collect_addtl_where_clause,
                p_comp_refresh    => p_refresh,
                p_instance_id     => p_instance_id,
                p_parameter1      => p_parameter1,
                p_parameter2      => p_parameter2,
                p_parameter3      => p_parameter3,
                p_parameter4      => p_parameter4,
                p_parameter5      => p_parameter5,
                p_parameter6      => p_parameter6,
                p_parameter7      => p_parameter7,
                p_parameter8      => p_parameter8,
                p_parameter9      => p_parameter9,
                p_parameter10     => p_parameter10,
                p_request_id      => l_conc_request_id);
        end if;

        /* Post-process once the data has been collected into staging. */
        cs_collect_post_process(l_cs_rec, l_cs_name, p_instance_id);
    end stage_from_source;

    /*
      Staging -> fact leg (pull). Always validates, draws a fresh refresh
      number from the sequence and uses the caller's p_comp_refresh.
    */
    procedure pull_stage_to_fact is
    begin
        l_target       := 'MSD_CS_DATA';
        l_source       := 'MSD_ST_CS_DATA';
        l_process_type := C_STAGE_TO_FACT;
        l_default_where := Build_Designator_Where_Clause(l_cs_rec,
                                                         l_process_type,
                                                         p_cs_name);

        /* Get a new sequence number for the pull part. */
        select msd.msd_last_refresh_number_s.nextval
          into l_new_refresh_num
          from dual;

        Refresh_Target(l_process_type, l_cs_rec.cs_definition_id,
                       l_cs_name, p_comp_refresh, p_instance_id,
                       l_new_refresh_num);

        Process_1(
            p_cs_rec          => l_cs_rec,
            p_cs_name         => l_cs_name,
            p_db_link         => l_dblink,
            p_source_view     => l_source,
            p_process_type    => C_STAGE_TO_FACT,
            p_target_table    => l_target,
            p_default_where   => l_default_where,
            p_tokenized_where => NULL,
            p_comp_refresh    => p_comp_refresh,
            p_instance_id     => p_instance_id,
            p_parameter1      => p_parameter1,
            p_parameter2      => p_parameter2,
            p_parameter3      => p_parameter3,
            p_parameter4      => p_parameter4,
            p_parameter5      => p_parameter5,
            p_parameter6      => p_parameter6,
            p_parameter7      => p_parameter7,
            p_parameter8      => p_parameter8,
            p_parameter9      => p_parameter9,
            p_parameter10     => p_parameter10,
            p_new_refresh_num => l_new_refresh_num,
            p_request_id      => l_conc_request_id);
    end pull_stage_to_fact;

Begin

    /* Fetch the description for the log. A missing definition is reported
       with the same message as the validation below instead of surfacing
       as a bare ORA-01403 from the generic handler. */
    begin
        select description
          into l_stream_desc
          from msd_cs_definitions
         where cs_definition_id = p_definition_id;
    exception
        when no_data_found then
            retcode := '2';
            errbuf  := 'Custom Definition Not Found : ' || p_definition_id;
            return;
    end;

    show_line('Stream Name : ' || l_stream_desc);

    debug_line('In Custom Stream Collection');
    /* Initialize */
    errbuf  := 'Program Completed with Success';
    retcode := '0';

    /* Get profile */
    l_single_step_collection := nvl(fnd_profile.value('MSD_ONE_STEP_COLLECTION'), 'N');

    l_conc_request_id := fnd_global.conc_request_id;
    debug_line('Conc Reuquest ID : ' || l_conc_request_id);

    /* Validate definition id: it must exist and be flagged valid. */
    open c_get_cs;
    fetch c_get_cs into l_cs_rec;
    if c_get_cs%notfound then
        retcode := '2';
        errbuf  := 'Custom Definition Not Found : ' || p_definition_id;
        close c_get_cs;
        return;
    end if;
    close c_get_cs;

    if p_collection_type = C_COLLECT and l_cs_rec.source_view_name is null then
        retcode := '2';
        errbuf  := 'Can not preform Collection - Source View is not specified.';
        return;
    end if;

    if nvl(l_cs_rec.multiple_stream_flag, 'N') <> 'Y' then
        l_cs_name := C_DEFAULT_STREAM_NAME;
    else
        l_cs_name := p_cs_name;
    end if;

    /* Instance must be specified for collection */
    if p_collection_type = C_COLLECT and p_instance_id is null then
        retcode := '2';
        errbuf  := 'Instance must be specified';
        return;
    end if;

    /* Fetch database link only if stream source type is 'SOURCE' */
    if l_cs_rec.cs_type in ('SOURCE') and p_collection_type = C_COLLECT then
        msd_common_utilities.get_db_link(p_instance_id, l_dblink, l_retcode);
        if (l_retcode = -1) then
            retcode := '2';
            errbuf  := 'Error while getting db_link';
            return;
        end if;
    end if;

    /*-------------- For Collection ----------------------------*/
    IF (p_collection_type = C_COLLECT) THEN

        /* Check and push setup parameters if it is not done so previously */
        MSD_PUSH_SETUP_DATA.chk_push_setup(errbuf, retcode, p_instance_id);
        IF (nvl(retcode, 0) <> 0) THEN
            return;
        END IF;

        IF (l_single_step_collection = 'Y' and p_validate_data = 'Y') THEN
            /* One-step collection is run internally as a two-step
               collection plus pull: collect into staging without
               validation (always as a complete refresh), then let the
               pull perform the validation. */
            stage_from_source('Y', false);
            pull_stage_to_fact;

        ELSIF (l_single_step_collection = 'N' and p_validate_data = 'Y') THEN
            /* Collect into staging with validation. */
            stage_from_source(p_comp_refresh, true);

        ELSIF (l_single_step_collection = 'Y' and p_validate_data = 'N') THEN
            /* Invalid Option. Raise Error */
            retcode := '2';
            errbuf  := 'Invalid option - Single Step Collection must perform Validation';
            return;

        ELSIF (l_single_step_collection = 'N' and p_validate_data = 'N') THEN
            /* Collect into staging without validation. */
            stage_from_source(p_comp_refresh, false);

        END IF;

    /*--------------------- For Pull ----------------------------*/
    ELSIF (p_collection_type = C_PULL) THEN
        IF p_validate_data = 'Y' THEN
            pull_stage_to_fact;
        ELSE
            /* Invalid Option. Raise Error */
            retcode := '2';
            errbuf  := 'Invalid option - Pull must perform Validation';
            return;
        END IF;
    END IF; /* End of Collect or Pull */

    IF (l_target = 'MSD_CS_DATA') THEN
        /* Delete cs fact rows that are not used by any demand plans */
        MSD_TRANSLATE_FACT_DATA.clean_fact_data(errbuf, retcode, l_target);
    END IF;

    /* NOTE(review): this overwrites whatever clean_fact_data reported and
       the success message set above with the package globals. Kept as-is
       because the code that sets g_retcode / g_errbuf is not visible here;
       confirm that discarding the clean_fact_data status is intended. */
    retcode := g_retcode;
    errbuf  := g_errbuf;

    commit;

Exception
    When others then
        retcode := '2';
        errbuf  := substr(sqlerrm, 1, 255);
        rollback;
End;
584
/*
  Row-by-row processing path.

  Builds the SELECT for the given source (source view or staging table),
  appends the WHERE clause and hands the statement to Process_1_Sub, which
  fetches and processes the rows. When the run is staging -> fact, the
  staging rows marked PROCESSED are deleted afterwards.

  Overall flow of the processing:
    1. Fetch data using a cursor (source/staging)
    2. Validate data
    3. On error, mark/save the erroneous data in staging;
       otherwise save in the target (fact/staging) and mark it processed.

  p_comp_refresh is accepted for signature compatibility but is not
  referenced in this procedure.
  Any exception is written out with show_line and re-raised.
*/
Procedure Process_1 (
    p_cs_rec          in out NOCOPY msd_cs_definitions_v1%rowtype,
    p_cs_name         in varchar2,
    p_db_link         in varchar2,
    p_source_view     in varchar2,
    p_target_table    in varchar2,
    p_process_type    in number,
    p_default_where   in varchar2,
    p_tokenized_where in varchar2,
    p_comp_refresh    in varchar2,
    p_instance_id     in number,
    p_parameter1      in varchar2,
    p_parameter2      in varchar2,
    p_parameter3      in varchar2,
    p_parameter4      in varchar2,
    p_parameter5      in varchar2,
    p_parameter6      in varchar2,
    p_parameter7      in varchar2,
    p_parameter8      in varchar2,
    p_parameter9      in varchar2,
    p_parameter10     in varchar2,
    p_new_refresh_num IN NUMBER,
    p_request_id      in number) is

    /* Maximum PL/SQL varchar2 size, matching the buffers used for generated
       SQL elsewhere in this package; the previous 5000/3000 limits could
       raise ORA-06502 for a long column list or filter. */
    l_sql_stmt varchar2(32767);
    l_where    varchar2(32767);

Begin
    debug_line('In Process_1');

    l_sql_stmt := Build_SQL_FOR_COLLECT_AND_VAL(
                      p_cs_rec.cs_definition_id, p_process_type,
                      p_source_view, p_db_link, p_cs_name);

    /* Neutral predicate so every further condition can be added with AND. */
    l_sql_stmt := l_sql_stmt || ' WHERE 1 = 1';

    l_where := build_where_clause(
                   p_tokenized_where,
                   p_default_where,
                   p_parameter1,
                   p_parameter2,
                   p_parameter3,
                   p_parameter4,
                   p_parameter5,
                   p_parameter6,
                   p_parameter7,
                   p_parameter8,
                   p_parameter9,
                   p_parameter10,
                   p_request_id);

    if l_where is not null then
        l_sql_stmt := l_sql_stmt || ' AND ' || l_where;
    end if;

    /* DWK. Do not include instance = 0 into fact table when we PULL data.
       Compared as a string, like the delete below: a numeric literal would
       force an implicit TO_NUMBER on the column and fail the whole query
       with ORA-01722 on the first non-numeric value. */
    IF (p_process_type = C_STAGE_TO_FACT) THEN
        l_sql_stmt := l_sql_stmt || ' AND ' || 'attribute_1 <> ''0''';
    END IF;

    debug_line('length for l_sql_stmt :' || length(l_sql_stmt));
    debug_line('length for l_where :' || length(l_where));
    debug_line('before debug line');
    debug_line(l_sql_stmt);
    debug_line('after debug line');

    /* Use Dynamic SQL to fetch and process rows */
    Process_1_Sub(
        p_cs_rec,
        p_cs_name,
        p_source_view,
        p_target_table,
        p_instance_id,
        l_sql_stmt,
        p_new_refresh_num);

    /* Delete successfully processed staging rows if the process was
       staging to fact. Rows with instance '0' are never deleted, and the
       delete is deliberately not restricted to p_cs_name. */
    IF p_process_type = C_STAGE_TO_FACT THEN
        delete from MSD_ST_CS_DATA
         where cs_definition_id = p_cs_rec.cs_definition_id
           and process_status = C_LOG_PROCESSED
           and attribute_1 <> '0';
    END IF;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
697
698
/*
  Post-processing run after a stream has been collected into staging
  (MSD_ST_CS_DATA).

  p_cs_rec       Stream definition; p_cs_rec.name selects the handling.
  p_cs_name      Stream name used to scope the sales-forecast clean-up.
  p_instance_id  Source instance.

  Sales forecast streams: if at least one staged row for this definition,
  stream and instance has attribute_49 = '1', the staged rows with
  attribute_49 = '2' are deleted.

  MSD_ONHAND_INVENTORY: current on-hand rows for the instance are copied
  from msd_curr_onhand_inventory_v into staging.

  Any exception is written out with show_line and re-raised.
*/
Procedure cs_collect_post_process (
    p_cs_rec      in msd_cs_definitions_v1%rowtype,
    p_cs_name     in varchar2,
    p_instance_id in varchar2 ) is

    /* Existence probe: returns at most one row. */
    cursor c1 is
        select 'Y'
          from msd_st_cs_data
         where cs_definition_id = p_cs_rec.cs_definition_id
           and cs_name = p_cs_name
           and attribute_1 = p_instance_id
           and attribute_49 = '1'
           and rownum < 2;

    l_exists varchar2(10) := 'N';

Begin

    /* Is this Sales Forecast Stream */
    if p_cs_rec.name in (
           'MSD_SALES_FCST_BESTCASE', 'MSD_SALES_FCST_PIPELINE',
           'MSD_SALES_FCST_REALISTIC', 'MSD_SALES_FCST_WGTPLINE',
           'MSD_SALES_FCST_WORSTCASE' ) then

        /* l_exists keeps its 'N' default when the fetch finds nothing. */
        open c1;
        fetch c1 into l_exists;
        close c1;

        if l_exists = 'Y' then
            delete from msd_st_cs_data
             where cs_definition_id = p_cs_rec.cs_definition_id
               and cs_name = p_cs_name
               and attribute_1 = p_instance_id
               and attribute_49 = '2';
        end if;

    end if;

    /* Collect Current On-Hand Inventory data from ODS table for SOP data stream */
    if p_cs_rec.name = 'MSD_ONHAND_INVENTORY' then

        insert into msd_st_cs_data (
            cs_st_data_id,
            cs_definition_id,
            cs_name,
            attribute_1,
            attribute_2,
            attribute_3,
            attribute_6,
            attribute_7,
            attribute_10,
            attribute_11,
            attribute_34,
            attribute_41,
            attribute_43,
            attribute_50,
            attribute_51,
            creation_date,
            created_by,
            last_update_date,
            last_updated_by,
            last_update_login
        )
        select msd_st_cs_data_s.nextval,
               p_cs_rec.cs_definition_id,
               C_DEFAULT_STREAM_NAME,
               to_char(inv.sr_instance_id),
               inv.prd_level_id,
               inv.prd_sr_level_pk,
               inv.geo_level_id,
               inv.geo_sr_level_pk,
               inv.org_level_id,
               inv.org_sr_level_pk,
               inv.time_level_id,
               to_char(inv.quantity),
               to_char(sysdate, 'YYYY/MM/DD'),
               inv.dcs_level_id,
               inv.dcs_sr_level_pk,
               /* WHO columns are supplied directly, as ins_row_staging does.
                  Routing them through to_char() made the stored date depend
                  on the session NLS_DATE_FORMAT and could drop the time part. */
               sysdate,
               fnd_global.user_id,
               sysdate,
               fnd_global.user_id,
               fnd_global.login_id
          from msd_curr_onhand_inventory_v inv
         where inv.sr_instance_id = p_instance_id;

    end if;

Exception
    When others then
        show_line(sqlerrm);
        raise;
End;
794
795
/*
  Records a row that failed processing. Where the error is recorded depends
  on the source and target of the run:
    - target is staging, or target is the fact table and the source is not
      the staging table: the row is inserted into staging with status ERROR;
    - otherwise (data pulled from staging to fact): the existing staging row
      identified by crec_data.pk_id is updated with status ERROR.
  Any exception is written out with show_line and re-raised.
*/
Procedure log_error (
    crec_data       in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec        in msd_cs_definitions_v1%rowtype,
    p_cs_name       in varchar2,
    p_instance_id   in varchar2,
    p_error_message in varchar2,
    p_source_view   in varchar2,
    p_target_table  in varchar2) is
Begin
    debug_line('In Log Error');

    /* No staging row exists yet for this record: create one flagged ERROR. */
    if (p_target_table = 'MSD_CS_DATA' and p_source_view <> 'MSD_ST_CS_DATA')
       or (p_target_table = 'MSD_ST_CS_DATA')
    then
        ins_row_staging(
            crec_data,
            p_cs_rec,
            p_cs_name,
            nvl(p_instance_id, crec_data.instance),
            C_LOG_ERROR,
            p_error_message);
        return;
    end if;

    /* The record came from staging: flag the existing staging row. */
    upd_stage_error(crec_data.pk_id, C_LOG_ERROR, p_error_message);

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
833
/*
  Sets the process status and error description of one staging row.
  p_pk_id           cs_st_data_id of the MSD_ST_CS_DATA row to update.
  p_process_status  new process_status value.
  p_error_mesg      new error_desc value (may be null).
  Any exception is written out with show_line and re-raised.
*/
Procedure upd_stage_error (
    p_pk_id          in number,
    p_process_status in varchar2,
    p_error_mesg     in varchar2) is
Begin
    debug_line('In upd_stage_error');

    update msd_st_cs_data
       set process_status = p_process_status,
           error_desc     = p_error_mesg
     where cs_st_data_id = p_pk_id;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
849
/*
  Marks a record as processed. Whether anything is recorded depends on the
  source and target of the run:
    - target is staging, or target is the fact table and the source is not
      the staging table: nothing is recorded;
    - otherwise (data pulled from staging to fact): the staging row
      identified by crec_data.pk_id is updated to status PROCESSED with its
      error description cleared.
  Any exception is written out with show_line and re-raised.
*/
Procedure log_processed (
    crec_data      in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec       in msd_cs_definitions_v1%rowtype,
    p_cs_name      in varchar2,
    p_instance_id  in varchar2,
    p_source_view  in varchar2,
    p_target_table in varchar2) is
Begin
    debug_line('In log_processed');

    /* There is no staging row to flag in these cases. */
    if (p_target_table = 'MSD_CS_DATA' and p_source_view <> 'MSD_ST_CS_DATA')
       or (p_target_table = 'MSD_ST_CS_DATA')
    then
        return;
    end if;

    /* Pulled from staging to fact: flag the staging row as PROCESSED. */
    upd_stage_error(crec_data.pk_id, C_LOG_PROCESSED, null);

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
887
/*
  Inserts one stream record into the staging table MSD_ST_CS_DATA.

  crec_data         Record to store; its designator is used as cs_name.
  p_cs_rec          Stream definition; supplies cs_definition_id.
  p_cs_name         Not referenced (crec_data.designator is stored instead).
  p_instance_id     Stored in attribute_1.
  p_process_status  Stored in process_status.
  p_error_message   Stored in error_desc.

  Any exception is written out with show_line and re-raised.
*/
Procedure ins_row_staging (
    crec_data        in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec         in msd_cs_definitions_v1%rowtype,
    p_cs_name        in varchar2,
    p_instance_id    in varchar2,
    p_process_status in varchar2,
    p_error_message  in varchar2) is
Begin
    insert into msd_st_cs_data (
        cs_st_data_id, cs_definition_id, cs_name,
        /* instance */
        attribute_1,
        /* prd */
        attribute_2, attribute_3, attribute_4, attribute_5,
        /* geo */
        attribute_6, attribute_7, attribute_8, attribute_9,
        /* org */
        attribute_10, attribute_11, attribute_12, attribute_13,
        /* prd_parent */
        attribute_14, attribute_15, attribute_16, attribute_17,
        /* rep */
        attribute_18, attribute_19, attribute_20, attribute_21,
        /* chn */
        attribute_22, attribute_23, attribute_24, attribute_25,
        /* ud1 */
        attribute_26, attribute_27, attribute_28, attribute_29,
        /* ud2 */
        attribute_30, attribute_31, attribute_32, attribute_33,
        /* tim level id, then pass-through attributes 35..49 */
        attribute_34,
        attribute_35, attribute_36, attribute_37, attribute_38, attribute_39,
        attribute_40, attribute_41, attribute_42, attribute_43, attribute_44,
        attribute_45, attribute_46, attribute_47, attribute_48, attribute_49,
        /* dcs */
        attribute_50, attribute_51, attribute_52, attribute_53,
        /* pass-through attributes 54..60 */
        attribute_54, attribute_55, attribute_56, attribute_57,
        attribute_58, attribute_59, attribute_60,
        process_status, error_desc,
        created_by, creation_date, last_update_date, last_updated_by, last_update_login
    )
    values (
        /* cs_name comes from crec_data.designator, not from p_cs_name */
        msd_st_cs_data_s.nextval, p_cs_rec.cs_definition_id, crec_data.designator,
        p_instance_id,
        crec_data.prd_level_id, crec_data.prd_sr_level_value_pk,
        crec_data.prd_level_value, crec_data.prd_level_value_pk,
        crec_data.geo_level_id, crec_data.geo_sr_level_value_pk,
        crec_data.geo_level_value, crec_data.geo_level_value_pk,
        crec_data.org_level_id, crec_data.org_sr_level_value_pk,
        crec_data.org_level_value, crec_data.org_level_value_pk,
        crec_data.prd_parent_level_id, crec_data.prd_parent_sr_level_value_pk,
        crec_data.prd_parent_level_value, crec_data.prd_parent_level_value_pk,
        crec_data.rep_level_id, crec_data.rep_sr_level_value_pk,
        crec_data.rep_level_value, crec_data.rep_level_value_pk,
        crec_data.chn_level_id, crec_data.chn_sr_level_value_pk,
        crec_data.chn_level_value, crec_data.chn_level_value_pk,
        crec_data.ud1_level_id, crec_data.ud1_sr_level_value_pk,
        crec_data.ud1_level_value, crec_data.ud1_level_value_pk,
        crec_data.ud2_level_id, crec_data.ud2_sr_level_value_pk,
        crec_data.ud2_level_value, crec_data.ud2_level_value_pk,
        crec_data.tim_level_id,
        crec_data.attribute_35, crec_data.attribute_36, crec_data.attribute_37,
        crec_data.attribute_38, crec_data.attribute_39, crec_data.attribute_40,
        crec_data.attribute_41, crec_data.attribute_42, crec_data.attribute_43,
        crec_data.attribute_44, crec_data.attribute_45, crec_data.attribute_46,
        crec_data.attribute_47, crec_data.attribute_48, crec_data.attribute_49,
        crec_data.dcs_level_id, crec_data.dcs_sr_level_value_pk,
        crec_data.dcs_level_value, crec_data.dcs_level_value_pk,
        crec_data.attribute_54, crec_data.attribute_55, crec_data.attribute_56,
        crec_data.attribute_57, crec_data.attribute_58, crec_data.attribute_59,
        crec_data.attribute_60,
        p_process_status, p_error_message,
        fnd_global.user_id, sysdate, sysdate, fnd_global.user_id, fnd_global.login_id
    );

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
945
/*
  Inserts one stream record into the fact table MSD_CS_DATA.

  crec_data          Record to store; its designator is used as cs_name.
  p_cs_rec           Stream definition; supplies cs_definition_id.
  p_cs_name          Not referenced (crec_data.designator is stored instead).
  p_instance_id      Stored in attribute_1.
  p_new_refresh_num  Stored in both created_by_refresh_num and
                     last_refresh_num.

  The row is written with action_code 'I'.
  Any exception is written out with show_line and re-raised.
*/
Procedure ins_row_fact (
    crec_data         in msd_cs_dfn_utl.g_typ_source_stream,
    p_cs_rec          in msd_cs_definitions_v1%rowtype,
    p_cs_name         in varchar2,
    p_instance_id     in varchar2,
    p_new_refresh_num in NUMBER) is
Begin
    debug_line('In ins_row_fact');

    insert into msd_cs_data (
        cs_data_id, cs_definition_id, cs_name,
        /* instance */
        attribute_1,
        /* prd */
        attribute_2, attribute_3, attribute_4, attribute_5,
        /* geo */
        attribute_6, attribute_7, attribute_8, attribute_9,
        /* org */
        attribute_10, attribute_11, attribute_12, attribute_13,
        /* prd_parent */
        attribute_14, attribute_15, attribute_16, attribute_17,
        /* rep */
        attribute_18, attribute_19, attribute_20, attribute_21,
        /* chn */
        attribute_22, attribute_23, attribute_24, attribute_25,
        /* ud1 */
        attribute_26, attribute_27, attribute_28, attribute_29,
        /* ud2 */
        attribute_30, attribute_31, attribute_32, attribute_33,
        /* tim level id, then pass-through attributes 35..49 */
        attribute_34,
        attribute_35, attribute_36, attribute_37, attribute_38, attribute_39,
        attribute_40, attribute_41, attribute_42, attribute_43, attribute_44,
        attribute_45, attribute_46, attribute_47, attribute_48, attribute_49,
        /* dcs */
        attribute_50, attribute_51, attribute_52, attribute_53,
        /* pass-through attributes 54..60 */
        attribute_54, attribute_55, attribute_56, attribute_57,
        attribute_58, attribute_59, attribute_60,
        created_by, creation_date, last_update_date, last_updated_by, last_update_login,
        created_by_refresh_num, last_refresh_num, action_code
    )
    values (
        /* cs_name comes from crec_data.designator, not from p_cs_name */
        msd_cs_data_s.nextval, p_cs_rec.cs_definition_id, crec_data.designator,
        p_instance_id,
        crec_data.prd_level_id, crec_data.prd_sr_level_value_pk,
        crec_data.prd_level_value, crec_data.prd_level_value_pk,
        crec_data.geo_level_id, crec_data.geo_sr_level_value_pk,
        crec_data.geo_level_value, crec_data.geo_level_value_pk,
        crec_data.org_level_id, crec_data.org_sr_level_value_pk,
        crec_data.org_level_value, crec_data.org_level_value_pk,
        crec_data.prd_parent_level_id, crec_data.prd_parent_sr_level_value_pk,
        crec_data.prd_parent_level_value, crec_data.prd_parent_level_value_pk,
        crec_data.rep_level_id, crec_data.rep_sr_level_value_pk,
        crec_data.rep_level_value, crec_data.rep_level_value_pk,
        crec_data.chn_level_id, crec_data.chn_sr_level_value_pk,
        crec_data.chn_level_value, crec_data.chn_level_value_pk,
        crec_data.ud1_level_id, crec_data.ud1_sr_level_value_pk,
        crec_data.ud1_level_value, crec_data.ud1_level_value_pk,
        crec_data.ud2_level_id, crec_data.ud2_sr_level_value_pk,
        crec_data.ud2_level_value, crec_data.ud2_level_value_pk,
        crec_data.tim_level_id,
        crec_data.attribute_35, crec_data.attribute_36, crec_data.attribute_37,
        crec_data.attribute_38, crec_data.attribute_39, crec_data.attribute_40,
        crec_data.attribute_41, crec_data.attribute_42, crec_data.attribute_43,
        crec_data.attribute_44, crec_data.attribute_45, crec_data.attribute_46,
        crec_data.attribute_47, crec_data.attribute_48, crec_data.attribute_49,
        crec_data.dcs_level_id, crec_data.dcs_sr_level_value_pk,
        crec_data.dcs_level_value, crec_data.dcs_level_value_pk,
        crec_data.attribute_54, crec_data.attribute_55, crec_data.attribute_56,
        crec_data.attribute_57, crec_data.attribute_58, crec_data.attribute_59,
        crec_data.attribute_60,
        fnd_global.user_id, sysdate, sysdate, fnd_global.user_id, fnd_global.login_id,
        p_new_refresh_num, p_new_refresh_num, 'I'
    );

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1001
/*
  Process_2
  Copies rows from the source view into the staging table with a single
  INSERT ... SELECT statement, without performing any validation.

  Parameters
    p_cs_rec            custom stream definition row; only cs_definition_id is read here
    p_cs_name           stream name handed to Build_SQL_INS_AS_SELECT
    p_db_link           text appended directly after the source view name
    p_source_view       source view to select from
    p_default_where     where clause text that is always applied (may be null)
    p_tokenized_where   where clause text containing parameter tokens (may be null)
    p_instance_id       instance id handed to Build_SQL_INS_AS_SELECT
    p_parameter1..10    values substituted into p_tokenized_where
    p_request_id        concurrent request id handed to Build_Where_Clause
    p_target_table, p_process_type, p_comp_refresh
                        part of the signature but not referenced in this procedure

  Any error is written to the log through show_line and re-raised.
*/
Procedure Process_2 (
    p_cs_rec          in out NOCOPY msd_cs_definitions_v1%rowtype,
    p_cs_name         in varchar2,
    p_db_link         in varchar2,
    p_source_view     in varchar2,
    p_target_table    in varchar2,
    p_process_type    in number,
    p_default_where   in varchar2,
    p_tokenized_where in varchar2,
    p_comp_refresh    in varchar2,
    p_instance_id     in number,
    p_parameter1      in varchar2,
    p_parameter2      in varchar2,
    p_parameter3      in varchar2,
    p_parameter4      in varchar2,
    p_parameter5      in varchar2,
    p_parameter6      in varchar2,
    p_parameter7      in varchar2,
    p_parameter8      in varchar2,
    p_parameter9      in varchar2,
    p_parameter10     in varchar2,
    p_request_id      in number) is

    l_ins_stmt varchar2(32767);
    /* Build_Where_Clause builds its result in a varchar2(3000) buffer, so the
       receiving variable must be at least that wide. The former varchar2(500)
       raised ORA-06502 for any valid where clause longer than 500 characters. */
    l_where    varchar2(32767);

Begin
    debug_line('In Process_2');

    /* INSERT ... SELECT text without a where clause */
    l_ins_stmt := Build_SQL_INS_AS_SELECT(
                      p_cs_definition_id => p_cs_rec.cs_definition_id,
                      p_instance_id      => p_instance_id,
                      p_cs_name          => p_cs_name,
                      p_source_view      => p_source_view,
                      p_db_link          => p_db_link);

    /* Default where clause combined with the parameter-substituted one */
    l_where := build_where_clause(
                      p_tokenized_where,
                      p_default_where,
                      p_parameter1,
                      p_parameter2,
                      p_parameter3,
                      p_parameter4,
                      p_parameter5,
                      p_parameter6,
                      p_parameter7,
                      p_parameter8,
                      p_parameter9,
                      p_parameter10,
                      p_request_id);

    if l_where is not null then
        l_ins_stmt := l_ins_stmt || ' where ' || l_where;
    end if;

    /* Execute SQL */
    debug_line(l_ins_stmt);

    Execute immediate l_ins_stmt;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1078
/*
  Build_SQL_FOR_COLLECT_AND_VAL
  Returns the SELECT text used when rows are fetched and validated one by one:
    Source  -> Staging (Collect, Single Step = 'N', Validation = 'Y')
    Source  -> Fact    (Collect, Single Step = 'Y', Validation = 'Y')
    Staging -> Fact    (Pull,    Validation = 'Y')

  The column list comes from Build_SQL_Source (called with a NULL instance id).
  When the source is MSD_ST_CS_DATA the staging key cs_st_data_id is selected
  as PK_ID and p_db_link is not appended; otherwise PK_ID is NULL and
  p_db_link is appended directly after the view name.
*/
Function Build_SQL_FOR_COLLECT_AND_VAL(
    p_cs_definition_id in number,
    p_process_type     in number,
    p_source_view      in varchar2,
    p_db_link          in varchar2,
    p_cs_name          in varchar2)
return varchar2 is

    l_columns  varchar2(32767);
    l_pk_expr  varchar2(32767);
    l_from_obj varchar2(32767);
    l_query    varchar2(32767);
Begin
    debug_line('In Build_SQL_FOR_COLLECT_AND_VAL');

    l_columns := Build_SQL_Source(p_cs_definition_id, p_process_type, NULL, p_cs_name);

    /* Pick the primary key expression and the FROM target for this source */
    if p_source_view = 'MSD_ST_CS_DATA' then
        l_pk_expr  := 'cs_st_data_id PK_ID';
        l_from_obj := p_source_view;
    else
        l_pk_expr  := 'null pk_id';
        l_from_obj := p_source_view || p_db_link;
    end if;

    l_query := 'Select ' || l_pk_expr || ', ' || l_columns || ' from ' || l_from_obj;

    return l_query;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1115
/*
  Build_SQL_INS_AS_SELECT
  Returns the text of an INSERT INTO MSD_ST_CS_DATA ... SELECT statement
  (no where clause) used for
    Source -> Staging (Collect, Single Step = 'N', Validation = 'N')

  The select list is msd_st_cs_Data_s.nextval, the definition id, and the
  61 expressions returned by Build_SQL_Source for C_SOURCE_TO_STAGE.
  cs_name is the last target column because Build_SQL_Source emits the
  CS_NAME expression last.
*/
Function Build_SQL_INS_AS_SELECT(
    p_cs_definition_id in number,
    p_instance_id      in varchar2,
    p_cs_name          in varchar2,
    p_source_view      in varchar2,
    p_db_link          in varchar2) return varchar2 is

    l_source_exprs varchar2(32767);
    l_target_cols  varchar2(32767);
    l_statement    varchar2(32767);
Begin
    debug_line('In Build_SQL_INS_AS_SELECT');

    l_source_exprs := Build_SQL_Source(p_cs_definition_id, C_SOURCE_TO_STAGE,
                                       p_instance_id, p_cs_name);

    /* Target column list of the staging table */
    l_target_cols :=
        'Insert into MSD_ST_CS_DATA (cs_st_data_id , cs_definition_id, ' ||
        'attribute_1, attribute_2, attribute_3, attribute_4, attribute_5, ' ||
        'attribute_6, attribute_7, attribute_8, attribute_9, attribute_10,' ||
        'attribute_11, attribute_12, attribute_13, attribute_14, attribute_15, ' ||
        'attribute_16, attribute_17, attribute_18, attribute_19, attribute_20, ' ||
        'attribute_21, attribute_22, attribute_23, attribute_24, attribute_25, ' ||
        'attribute_26, attribute_27, attribute_28, attribute_29, attribute_30,' ||
        'attribute_31, attribute_32, attribute_33, attribute_34, attribute_35, ' ||
        'attribute_36, attribute_37, attribute_38, attribute_39, attribute_40,' ||
        'attribute_41, attribute_42, attribute_43, attribute_44, attribute_45, ' ||
        'attribute_46, attribute_47, attribute_48, attribute_49, attribute_50, ' ||
        'attribute_51, attribute_52, attribute_53, attribute_54, attribute_55, ' ||
        'attribute_56, attribute_57, attribute_58, attribute_59, attribute_60,' ||
        'cs_name ) ';

    /* Select list followed by the source object */
    l_statement := l_target_cols ||
                   ' select ' || 'msd_st_cs_Data_s.nextval, ' || p_cs_definition_id ||
                   ', ' || l_source_exprs ||
                   ' from ' || p_source_view || p_db_link;

    return l_statement;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1160
/*
  Build_SQL_Source
  Returns a comma separated list of 61 select expressions, one per staging
  column ATTRIBUTE_1 .. ATTRIBUTE_60 followed by CS_NAME.

  Parameters
    p_cs_definition_id  definition whose column mappings are read from
                        msd_cs_defn_column_dtls_v
    p_process_type      C_SOURCE_TO_STAGE / C_SOURCE_TO_FACT: emit the mapped
                        source expressions; any other value: emit the
                        staging column names themselves
    p_instance_id       when not null, used as a quoted literal for columns
                        whose identifier_type is 'INSTANCE'
    p_cs_name           stream name used as a quoted literal for CS_NAME on a
                        multiple stream definition that has no CS_NAME mapping

  Unmapped attribute columns are selected as the literal NULL.
  Any error is written to the log through show_line and re-raised.
*/
Function Build_SQL_Source(
    p_cs_definition_id in number,
    p_process_type     in number,
    p_instance_id      in varchar2,
    p_cs_name          in varchar2) return varchar2 is

    /* srccol_name holds SQL expression fragments (to_char(...) wrappers and
       quoted literals), not only identifiers, so it is wider than a column name. */
    Type l_type_sql_struct is RECORD (
        tabcol_name varchar2(60),
        srccol_name varchar2(2000));
    Type l_type_sql_struct_array is TABLE of l_type_sql_struct;

    /* Only the three columns this function reads */
    CURSOR c1 IS
        select table_column, identifier_type, source_view_column_name
          from msd_cs_defn_column_dtls_v
         where cs_definition_id = p_cs_definition_id;

    CURSOR c_multi_stream IS
        SELECT multiple_stream_flag
          FROM msd_cs_definitions
         WHERE cs_definition_id = p_cs_definition_id;

    l_struct       l_type_sql_struct_array;
    l_sql_stmt     varchar2(32767);
    l_multi_stream VARCHAR2(30);

Begin
    debug_line('In Build_SQL_Source');

    /* One slot per staging column: 60 attributes plus CS_NAME.
       Attribute slots default to the literal NULL; the CS_NAME source
       expression stays unset until it is resolved further down. */
    l_struct := l_type_sql_struct_array();
    l_struct.extend(61);

    for i in 1..60 loop
        l_struct(i).tabcol_name := 'ATTRIBUTE_' || i;
        l_struct(i).srccol_name := 'NULL';
    end loop;
    l_struct(61).tabcol_name := 'CS_NAME';

    /* Fetch source column name from the mappings table and update the array */
    for c1_rec in c1 loop

        for i IN 1..61 loop
            if l_struct(i).tabcol_name = c1_rec.table_column then
                if c1_rec.identifier_type = 'INSTANCE' then
                    if p_instance_id is not null then
                        /* Embedded quotes are doubled, the same way p_cs_name is
                           handled below, so the value stays one SQL literal. */
                        l_struct(i).srccol_name :=
                            '''' || replace(p_instance_id, '''', '''''') || '''';
                    end if;
                elsif c1_rec.identifier_type = 'DATE' then
                    if c1_rec.source_view_column_name is not null then
                        l_struct(i).srccol_name :=
                            'to_char(' || c1_rec.source_view_column_name || ', ''YYYY/MM/DD'')';
                    end if;
                else
                    if c1_rec.source_view_column_name is not null then
                        l_struct(i).srccol_name := c1_rec.source_view_column_name;
                    end if;
                end if;
                exit; /* slot found, go to the next mapping row */
            end if;
        end loop;
    end loop;

    /* DWK If this stream is multiple stream and there is no column mapping
       for CS_NAME then, assume user will populate the CS_NAME
       from Collection */
    OPEN c_multi_stream;
    FETCH c_multi_stream INTO l_multi_stream;
    CLOSE c_multi_stream;

    IF nvl(l_multi_stream, 'N') = 'Y' THEN
        /* After column mapping, if source column for CS_NAME is still null
           then assume user will populate the CS_NAME from Collection */
        IF ( l_struct(61).srccol_name IS NULL ) THEN
            l_struct(61).srccol_name := '''' || replace(p_cs_name, '''', '''''') || '''';
        END IF;

    ELSE /* Single stream: always the default stream name, any mapping is overridden */
        l_struct(61).srccol_name := '''' || C_DEFAULT_STREAM_NAME || '''';
    END IF;

    /* Builds SQL stmt */
    for i in 1..61 loop

        if p_process_type in (C_SOURCE_TO_FACT, C_SOURCE_TO_STAGE) then
            /* reading from the source view: use the mapped expressions */
            if l_sql_stmt is null then
                l_sql_stmt := l_sql_stmt || l_struct(i).srccol_name;
            else
                l_sql_stmt := l_sql_stmt || ', ' || l_struct(i).srccol_name;
            end if;
        else
            /* staging to fact (assumption: always Validate = 'Yes'):
               select the staging table columns themselves */
            if l_sql_stmt is null then
                l_sql_stmt := l_sql_stmt || l_struct(i).tabcol_name;
            else
                l_sql_stmt := l_sql_stmt || ', ' || l_struct(i).tabcol_name;
            end if;
        end if;

    end loop;

    debug_line('l_sql_stmt : ' || l_sql_stmt);
    return l_sql_stmt;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1294
/*
  show_line
  Writes p_sql to the concurrent request log (fnd_file.log) in consecutive
  pieces of at most 255 characters. Nothing is written when p_sql is null.
*/
Procedure show_line(p_sql in varchar2) is
    l_total  number := nvl(length(p_sql), 0);
    l_offset number := 1;
Begin
    loop
        exit when l_offset > l_total;
        fnd_file.put_line(fnd_file.log, substr(p_sql, l_offset, 255));
        l_offset := l_offset + 255;
    end loop;
End;
1306
/*
  validate_record
  Validates one data record against its custom stream definition and
  returns TRUE when no problem was found, FALSE otherwise.

  Parameters
    crec_data      record being validated; passed in/out because get_level_pk
                   may fill in its level value fields and
                   prd_parent_level_id is cleared when no parent is supplied
    p_cs_rec       stream definition; passed in/out because, under strict
                   validation, level ids that are null on the definition are
                   taken from the record
    p_instance_id  instance handed to get_level_pk
    p_err_mesg     out: concatenated error tags, null when the record is valid

  Checks performed, in order
    1. For each dimension whose collect flag is 'Y', resolve the level value
       through get_level_pk; dimensions that come back as
       g_level_pk_not_found are listed under MSD_CS_DATALOAD_INVALID_DIM.
    2. strict_flag = 'Y': compare each record level id with the definition's
       level id; otherwise only check the time level id against
       fnd_lookup_values (lookup type MSD_PERIOD_TYPE). Failures are listed
       under MSD_CS_DATALOAD_INVALID_LVLID.
    3. attribute_43 must convert with to_date(..., 'YYYY/MM/DD').
    4. attribute_42 / attribute_41 / attribute_44 must convert to a number
       when p_cs_rec.measurement_type selects them.

  Unexpected errors are written to the log through show_line and re-raised.
*/
1307 Function validate_record (
1308 crec_data in out NOCOPY msd_cs_dfn_utl.g_typ_source_stream,
1309 p_cs_rec in out NOCOPY msd_Cs_definitions_v1%rowtype,
1310 p_instance_id in varchar2,
1311 p_err_mesg out NOCOPY varchar2) return boolean is
1312
/* l_comments1 collects level id mismatches, l_comments2 collects dimensions
   whose level value could not be resolved */
1313 l_comments1 varchar2(1000);
1314 l_comments2 varchar2(1000);
/* NOTE(review): this is a local initialised on every call, so the
   "first record" branch below is entered on every call, not only for the
   first record of a run */
1315 l_first_record boolean:=TRUE;
1316 l_dummy_date date;
1317 l_dummy_number number;
1318
/* Result of get_level_pk per dimension; stays null when the dimension is not
   collected. l_tim_found is declared but never assigned in this function. */
1319 l_prd_found varchar2(30);
1320 l_prd_parent_found varchar2(30);
1321 l_geo_found varchar2(30);
1322 l_org_found varchar2(30);
1323 l_chn_found varchar2(30);
1324 l_rep_found varchar2(30);
1325 l_ud1_found varchar2(30);
1326 l_ud2_found varchar2(30);
1327 l_tim_found varchar2(30);
1328 l_dcs_found varchar2(30);
1329
1330 l_count number(2) := 0;
1331
1332 Begin
1333
1334 /* crec_data -> actual record that you want to validate(ex, rows in staging table)
1335 p_cs_rec -> information in custom stream definition
1336 */
1337
1338 -- debug_line('In validate_record');
1339 /* Get Product LEVEL_PK */
1340 if nvl(p_cs_rec.prd_level_collect_flag, 'N') = 'Y' then
1341 l_prd_found := get_level_pk(p_instance_id, crec_data.prd_level_id,
1342 crec_data.prd_sr_level_value_pk, crec_data.prd_level_value, crec_data.prd_level_value_pk);
1343
/* The parent product is only resolved when at least one of its three
   identifying values is supplied */
1344 IF ( crec_data.prd_parent_sr_level_value_pk IS NOT NULL or
1345 crec_data.prd_parent_level_value IS NOT NULL or
1346 crec_data.prd_parent_level_value_pk IS NOT NULL ) THEN
1347 /* DWK Get Product Dimension's Parent LEVEL_PK */
1348 l_prd_parent_found := get_level_pk(p_instance_id, crec_data.prd_parent_level_id,
1349 crec_data.prd_parent_sr_level_value_pk,
1350 crec_data.prd_parent_level_value,
1351 crec_data.prd_parent_level_value_pk);
1352
1353 ELSE /* IF there is no parent item then make it null */
1354 crec_data.prd_parent_level_id := NULL;
1355 END IF;
1356 end if;
1357 /* Get ORG LEVEL_PK */
1358 if nvl(p_cs_rec.org_level_collect_flag, 'N') = 'Y' then
1359 l_org_found := get_level_pk(p_instance_id, crec_data.org_level_id,
1360 crec_data.org_sr_level_value_pk, crec_data.org_level_value, crec_data.org_level_value_pk);
1361 end if;
1362 /* Get Geo LEVEL_PK */
1363 if nvl(p_cs_rec.geo_level_collect_flag, 'N') = 'Y' then
1364 l_geo_found := get_level_pk(p_instance_id, crec_data.geo_level_id,
1365 crec_data.geo_sr_level_value_pk, crec_data.geo_level_value, crec_data.geo_level_value_pk);
1366 end if;
1367
1368 /* Get CHN LEVEL_PK */
1369 if nvl(p_cs_rec.chn_level_collect_flag, 'N') = 'Y' then
1370 l_chn_found := get_level_pk(p_instance_id, crec_data.chn_level_id,
1371 crec_data.chn_sr_level_value_pk, crec_data.chn_level_value, crec_data.chn_level_value_pk);
1372 end if;
1373 /* Get REP LEVEL_PK */
1374 if nvl(p_cs_rec.rep_level_collect_flag, 'N') = 'Y' then
1375 l_rep_found := get_level_pk(p_instance_id, crec_data.rep_level_id,
1376 crec_data.rep_sr_level_value_pk, crec_data.rep_level_value, crec_data.rep_level_value_pk);
1377 end if;
1378 /* Get UD1 LEVEL_PK */
1379 if nvl(p_cs_rec.ud1_level_collect_flag, 'N') = 'Y' then
1380 l_ud1_found := get_level_pk(p_instance_id, crec_data.ud1_level_id,
1381 crec_data.ud1_sr_level_value_pk, crec_data.ud1_level_value, crec_data.ud1_level_value_pk);
1382 end if;
1383 /* Get UD2 LEVEL_PK */
1384 if nvl(p_cs_rec.ud2_level_collect_flag, 'N') = 'Y' then
1385 l_ud2_found := get_level_pk(p_instance_id, crec_data.ud2_level_id,
1386 crec_data.ud2_sr_level_value_pk, crec_data.ud2_level_value, crec_data.ud2_level_value_pk);
1387 end if;
1388
1389 /* Get Demand Class LEVEL_PK */
1390 if nvl(p_cs_rec.dcs_level_collect_flag, 'N') = 'Y' then
1391 l_dcs_found := get_level_pk(p_instance_id, crec_data.dcs_level_id,
1392 crec_data.dcs_sr_level_value_pk, crec_data.dcs_level_value, crec_data.dcs_level_value_pk);
1393 end if;
1394
/* Build the list of dimension tags whose lookup returned the
   g_level_pk_not_found sentinel; the result is null when every lookup
   succeeded or was skipped */
1395 select
1396 decode(l_prd_found, g_level_pk_not_found, 'PRD ', null) ||
1397 /* DWK Check level pk of parent item for dependent demand data */
1398 decode(l_prd_parent_found, g_level_pk_not_found, 'PRD_PARENT ', null) ||
1399 decode(l_org_found, g_level_pk_not_found, 'ORG ', null) ||
1400 decode(l_geo_found, g_level_pk_not_found, 'GEO ', null) ||
1401 decode(l_chn_found, g_level_pk_not_found, 'CHN ', null) ||
1402 decode(l_rep_found, g_level_pk_not_found, 'REP ', null) ||
1403 decode(l_ud1_found, g_level_pk_not_found, 'UD1 ', null) ||
1404 decode(l_ud2_found, g_level_pk_not_found, 'UD2 ', null) ||
1405 decode(l_dcs_found, g_level_pk_not_found, 'DCS ', null)
1406 into
1407 l_comments2
1408 from
1409 dual;
1410
1411 /* Level validation */
1412
1413 if nvl(p_cs_rec.strict_flag, 'N') = 'Y' then
1414 /* if level_id is not defined at the definition level then the level_id
1415 of first record fetched will be used for validation
1416 */
/* Values copied into p_cs_rec here are visible to the caller because
   p_cs_rec is an in out parameter */
1417 if l_first_record then
1418
1419 l_first_record := FALSE;
1420 /* New */
1421 if p_cs_rec.prd_level_id is null and nvl(p_cs_rec.prd_level_collect_flag, 'N') = 'Y' then
1422 p_cs_rec.prd_level_id := crec_data.prd_level_id;
1423 end if;
1424
1425 if p_cs_rec.org_level_id is null and nvl(p_cs_rec.org_level_collect_flag, 'N') = 'Y' then
1426 p_cs_rec.org_level_id := crec_data.org_level_id;
1427 end if;
1428
1429 if p_cs_rec.geo_level_id is null and nvl(p_cs_rec.geo_level_collect_flag, 'N') = 'Y' then
1430 p_cs_rec.geo_level_id := crec_data.geo_level_id;
1431 end if;
1432
1433 if p_cs_rec.chn_level_id is null and nvl(p_cs_rec.chn_level_collect_flag, 'N') = 'Y' then
1434 p_cs_rec.chn_level_id := crec_data.chn_level_id;
1435 end if;
1436
1437 if p_cs_rec.rep_level_id is null and nvl(p_cs_rec.rep_level_collect_flag, 'N') = 'Y' then
1438 p_cs_rec.rep_level_id := crec_data.rep_level_id;
1439 end if;
1440
1441 if p_cs_rec.ud1_level_id is null and nvl(p_cs_rec.ud1_level_collect_flag, 'N') = 'Y' then
1442 p_cs_rec.ud1_level_id := crec_data.ud1_level_id;
1443 end if;
1444
1445 if p_cs_rec.ud2_level_id is null and nvl(p_cs_rec.ud2_level_collect_flag, 'N') = 'Y' then
1446 p_cs_rec.ud2_level_id := crec_data.ud2_level_id;
1447 end if;
1448
1449 if p_cs_rec.tim_level_id is null and nvl(p_cs_rec.tim_level_collect_flag, 'N') = 'Y' then
1450 /* Attribute_34 is tim_level_id */
1451 p_cs_rec.tim_level_id := crec_data.tim_level_id;
1452 end if;
1453
1454 if p_cs_rec.dcs_level_id is null and nvl(p_cs_rec.dcs_level_collect_flag, 'N') = 'Y' then
1455 p_cs_rec.dcs_level_id := crec_data.dcs_level_id;
1456 end if;
1457
1458 end if;
1459
/* Per dimension: a null record level id is an error only when the dimension
   is collected; a non-null one must equal the definition's level id.
   Each failing dimension contributes its tag to l_comments1. */
1460 Select
1461 decode(crec_data.prd_level_id,
1462 null, decode(p_cs_rec.prd_level_collect_flag,
1463 'Y', 'PRD ',
1464 null),
1465 p_cs_rec.prd_level_id, null,
1466 'PRD ') ||
1467 /* DWK IF dependent demand data are collected, its parents level id should be 1 */
1468 decode(nvl(crec_data.prd_parent_level_id, '1'), '1', null, 'PRD_PARENT ') ||
1469 decode(crec_data.org_level_id,
1470 null, decode(p_cs_rec.org_level_collect_flag,
1471 'Y', 'ORG ',
1472 null),
1473 p_cs_rec.org_level_id, null,
1474 'ORG ') ||
1475 decode(crec_data.geo_level_id,
1476 null, decode(p_cs_rec.geo_level_collect_flag,
1477 'Y', 'GEO ',
1478 null),
1479 p_cs_rec.geo_level_id, null,
1480 'GEO ') ||
1481 decode(crec_data.rep_level_id,
1482 null, decode(p_cs_rec.rep_level_collect_flag,
1483 'Y', 'REP ',
1484 null),
1485 p_cs_rec.rep_level_id, null,
1486 'REP ') ||
1487 decode(crec_data.chn_level_id,
1488 null, decode(p_cs_rec.chn_level_collect_flag,
1489 'Y', 'CHN ',
1490 null),
1491 p_cs_rec.chn_level_id, null,
1492 'CHN ') ||
1493 decode(crec_data.ud1_level_id,
1494 null, decode(p_cs_rec.ud1_level_collect_flag,
1495 'Y', 'UD1 ',
1496 null),
1497 p_cs_rec.ud1_level_id, null,
1498 'UD1 ') ||
1499 decode(crec_data.ud2_level_id,
1500 null, decode(p_cs_rec.ud2_level_collect_flag,
1501 'Y', 'UD2 ',
1502 null),
1503 p_cs_rec.ud2_level_id, null,
1504 'UD2 ') ||
1505 decode(crec_data.tim_level_id,
1506 null, decode(p_cs_rec.tim_level_collect_flag,
1507 'Y', 'TIM ',
1508 null),
1509 p_cs_rec.tim_level_id, null,
1510 'TIM ') ||
1511 decode(crec_data.dcs_level_id,
1512 null, decode(p_cs_rec.dcs_level_collect_flag,
1513 'Y', 'DCS ',
1514 null),
1515 p_cs_rec.dcs_level_id, null,
1516 'DCS ' )
1517 into
1518 l_comments1
1519 from dual;
1520 ELSE /* p_cs_rec.strict_flag = 'N' */
1521
1522 /* Check whether that time level id exists in fnd lookup or not */
1523
/* A null tim_level_id is compared as '999.99'; the record is flagged with
   'TIM' when no MSD_PERIOD_TYPE lookup row matches */
1524 IF ( nvl(p_cs_rec.tim_level_collect_flag, 'N') = 'Y') THEN
1525 select count(*) into l_count
1526 from fnd_lookup_values
1527 where lookup_type = 'MSD_PERIOD_TYPE' and
1528 nvl(crec_data.tim_level_id, '999.99') = lookup_code and
1529 rownum <= 1;
1530
1531 IF ( l_count < 1 ) THEN
1532 select 'TIM' into l_comments1 from dual;
1533 END IF;
1534 END IF;
1535
1536 END IF;
1537
1538
1539
1540 /* MSD_CS_DATALOAD_INVALID_LVLID - Invalid Level ID for Dimensions */
1541 /* MSD_CS_DATALOAD_INVALID_DIM - Invalid Dimensions */
1542 select decode(l_comments2, null, null, 'MSD_CS_DATALOAD_INVALID_DIM : ' || l_comments2) ||
1543 decode(l_comments1, null, null, 'MSD_CS_DATALOAD_INVALID_LVLID : ' || l_comments1)
1544 into p_err_mesg
1545 from dual;
1546
/* Each format check below runs in its own block so that a conversion error
   only appends a tag to p_err_mesg and the remaining checks still run */
1547 /* Validate Date Format */
1548 Begin
1549 select to_date(crec_data.attribute_43, 'YYYY/MM/DD')
1550 into l_dummy_date
1551 from dual;
1552 Exception
1553 When others then
1554 p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_DATE_FORMAT : ATTRIBUTE_43';
1555 End;
1556
1557 /* Validate Amount Number Format */
1558 Begin
1559 -- Check Amount
/* The assignment to a NUMBER variable is the conversion test */
1560 if (p_cs_rec.measurement_type in (1,3,4)) then
1561 l_dummy_number := crec_data.attribute_42;
1562 end if;
1563 Exception
1564 When others then
1565 p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_42';
1566 End;
1567
1568 /* Validate Quantity Number Format */
1569 Begin
1570 -- Check Quantity
1571 if (p_cs_rec.measurement_type in (2,4,5)) then
1572 l_dummy_number := crec_data.attribute_41;
1573 end if;
1574 Exception
1575 When others then
1576 p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_41';
1577 End;
1578
1579 /* Validate Price Number Format */
1580 Begin
1581 -- Check Price
1582 if (p_cs_rec.measurement_type in (3,5)) then
1583 l_dummy_number := crec_data.attribute_44;
1584 end if;
1585 Exception
1586 When others then
1587 p_err_mesg := p_err_mesg || ' MSD_CS_DATALOAD_INVALID_NUMBER_FORMAT : ATTRIBUTE_44';
1588 End;
1589
/* The record is valid only when no check added anything to p_err_mesg */
1590 if p_err_mesg is null then
1591 return TRUE;
1592 else
1593 return FALSE;
1594 end if;
1595
1596 Exception
1597 When others then
1598 show_line(sqlerrm);
1599 raise;
1600
1601 End;
1602
/*
  get_level_pk
  Resolves a level value in msd_level_values for the given instance and
  level id, using the first identifying value that is supplied, in this
  order: source key (p_sr_level_value_pk), level value (p_level_value),
  level pk (p_level_value_pk).

  Returns
    null                   when p_level_id or p_instance is null, or when none
                           of the three identifying values is supplied
    g_level_pk_not_found   when no matching row exists
    the level pk           otherwise

  In/out side effects on a successful lookup
    by source key  : p_level_value and p_level_value_pk are filled in
    by level value : p_level_value_pk is filled in
    by level pk    : p_sr_level_value_pk and p_level_value are filled in

  Any error is written to the log through show_line and re-raised.
*/
Function get_level_pk (
    p_instance          in varchar2,
    p_level_id          in number,
    p_sr_level_value_pk in OUT NOCOPY varchar2,
    p_level_value       in OUT NOCOPY varchar2,
    p_level_value_pk    in OUT NOCOPY varchar2) return varchar2 is

    Cursor c_by_source_key is
        select level_pk, level_value
          from msd_level_values
         where instance = p_instance
           and level_id = p_level_id
           and sr_level_pk = p_sr_level_value_pk;

    Cursor c_by_value is
        select level_pk
          from msd_level_values
         where instance = p_instance
           and level_id = p_level_id
           and level_value = p_level_value;

    Cursor c_by_level_pk is
        select sr_level_pk, level_value
          from msd_level_values
         where instance = p_instance
           and level_id = p_level_id
           and level_pk = p_level_value_pk;

    l_resolved_pk varchar2(255) := g_level_pk_not_found;
Begin

    /* No level id: no data collected for this dimension */
    if p_level_id is null then
        return null;
    end if;

    /* Insufficient parameters to look anything up */
    if p_instance is null
       or (p_sr_level_value_pk is null
           and p_level_value is null
           and p_level_value_pk is null) then
        return null;
    end if;

    if p_sr_level_value_pk is not null then
        open c_by_source_key;
        fetch c_by_source_key into l_resolved_pk, p_level_value;
        if c_by_source_key%notfound then
            l_resolved_pk := g_level_pk_not_found;
        end if;
        close c_by_source_key;

    elsif p_level_value is not null then
        open c_by_value;
        fetch c_by_value into l_resolved_pk;
        if c_by_value%notfound then
            l_resolved_pk := g_level_pk_not_found;
        end if;
        close c_by_value;

    else /* only p_level_value_pk was supplied */
        open c_by_level_pk;
        fetch c_by_level_pk into p_sr_level_value_pk, p_level_value;
        if c_by_level_pk%found then
            l_resolved_pk := p_level_value_pk;
        else
            l_resolved_pk := g_level_pk_not_found;
        end if;
        close c_by_level_pk;
    end if;

    /* A null pk is treated like "not found" here: it is only traced and
       p_level_value_pk is left untouched */
    if l_resolved_pk is null or l_resolved_pk = g_level_pk_not_found then
        debug_line(' p_instance ' || p_instance || ' p_level_id ' || p_level_id ||
                   ' p_sr_level_value_pk ' || p_sr_level_value_pk || ' p_level_value ' || p_level_value ||
                   ' p_level_value_pk ' || p_level_value_pk);
    else
        p_level_value_pk := l_resolved_pk;
    end if;

    return l_resolved_pk;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1696
/*
  Build_Where_Clause
  Returns the where clause text (without the WHERE keyword) for a collection.

  Parameters
    p_tokenized_where  condition text containing parameter tokens introduced
                       by '&&' (may be null)
    p_default_where    condition text that is always applied (may be null)
    p_parameter1..10   values substituted for the tokens, in order
    p_request_id       concurrent request id, used to locate multi-value
                       parameters in msd_cs_coll_parameters

  Result
    both inputs present : p_default_where || ' and ' || substituted text
    only one present    : that one
    neither             : null

  Any error is written to the log through show_line and re-raised.
*/
Function Build_Where_Clause (
    p_tokenized_where in varchar2,
    p_default_where   in varchar2,
    p_parameter1      in varchar2,
    p_parameter2      in varchar2,
    p_parameter3      in varchar2,
    p_parameter4      in varchar2,
    p_parameter5      in varchar2,
    p_parameter6      in varchar2,
    p_parameter7      in varchar2,
    p_parameter8      in varchar2,
    p_parameter9      in varchar2,
    p_parameter10     in varchar2,
    p_request_id      in number) return varchar2 is

    Type param_list_type is varray(10) of varchar2(255);

    l_para_list param_list_type;
    l_where     varchar2(3000);

    /*
      Replaces the first '&&' token found in p_where_cond with p_val.
      The token is expected to be wrapped in single quotes and to start with
      a type prefix (CHAR:, NUMBER: or DATE:), e.g.
      'CHAR:Prompt_Name:ValueSet_Name:Remote_Yes_No:Multi_Yes_NO:Default_Column_Name_For_Multi'
      p_para_num is the position of the parameter (1..10).
    */
    Procedure find_and_subst_param ( p_where_cond in out NOCOPY varchar2,
                                     p_val        in varchar2,
                                     p_request_id in number,
                                     p_para_num   in number) is

        start_pos number;
        end_pos   number;
        para_type varchar2(10);

        l_default_col varchar2(300) := NULL;
        l_count       number := 0;
        l_multi_flag  varchar2(30) := 'N';

    Begin
        debug_line('In find_and_subst_param');
        start_pos := instr(p_where_cond, '&&', 1);

        /* The caller invokes this once per parameter slot, so there is often
           no token left. Without this guard start_pos is 0 and the type
           prefix would be read from position 2 of the clause, which could
           match by accident and corrupt the text. */
        if nvl(start_pos, 0) = 0 then
            return;
        end if;

        para_type := substr(p_where_cond, start_pos + 2, 7);
        end_pos   := instr(p_where_cond, '''', start_pos);

        /* A token without a closing quote cannot be cut out reliably;
           leave the clause unchanged so the statement fails visibly. */
        if end_pos = 0 then
            return;
        end if;

        if substr(upper(para_type), 1, 5) = 'CHAR:' then /* Character type */
            /* presumably property 4 is the multi-value flag and property 5
               the default column name - verify against msd_cs_defn_utl2 */
            l_multi_flag :=
                nvl(upper(msd_cs_defn_utl2.get_char_property(p_where_cond, start_pos, end_pos, 4)), 'N');

            l_default_col := msd_cs_defn_utl2.get_char_property(p_where_cond, start_pos, end_pos, 5);

            IF l_multi_flag = 'Y' THEN
                /* Multi input parameter: check whether the user entered
                   any values for it */
                select count(1) into l_count
                  from msd_cs_coll_parameters
                 where conc_request_id = p_request_id and
                       parameter_number = p_para_num;

                /* If user hasn't entered any multi input parameters then
                   use user specified default column name */
                IF (l_count = 0 AND l_default_col IS NOT NULL) THEN
                    p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                                    l_default_col ||
                                    substr(p_where_cond, end_pos + 1);
                ELSE
                    p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                                    ' (SELECT parameter_code FROM msd_cs_coll_parameters ' ||
                                    ' WHERE conc_request_id = ' || p_request_id ||
                                    ' AND parameter_number = ' || p_para_num || ' ) ' ||
                                    substr(p_where_cond, end_pos + 1);
                END IF;
            ELSE
                /* Single value: keep the surrounding quotes and double any
                   quote inside the value */
                p_where_cond := substr(p_where_cond, 1, start_pos - 1) ||
                                replace(p_val, '''', '''''') ||
                                substr(p_where_cond, end_pos);
            END IF;
        elsif substr(upper(para_type), 1, 7) = 'NUMBER:' then /* Number type */
            /* NOTE(review): p_val is placed into the statement as raw text.
               Callers must make sure it really is a number. */
            p_where_cond := substr(p_where_cond, 1, start_pos - 2) || p_val ||
                            substr(p_where_cond, end_pos + 1);
        elsif substr(upper(para_type), 1, 5) = 'DATE:' then /* Date type */
            /* Quotes inside the value are doubled, as in the CHAR branch,
               so the value cannot terminate the literal early */
            p_where_cond := substr(p_where_cond, 1, start_pos - 2) ||
                            'to_date(''' || replace(p_val, '''', '''''') || ''', ''YYYYMMDD'')' ||
                            substr(p_where_cond, end_pos + 1);
        end if;

        debug_line(p_where_cond);

    End;

    /* Applies find_and_subst_param once for each of the 10 parameter slots */
    Procedure substitute_parameter (
        p_where_cond in out NOCOPY varchar2,
        p_param_list in param_list_type,
        p_request_id in number) is

        i number := 1;
    Begin
        debug_line('In substitute_parameter');
        /* DP-CRM Code changes by easwaran */
        while (i < 11 )
        loop
            find_and_subst_param( p_where_cond, p_param_list(i), p_request_id, i);
            i := i + 1;
        end loop;

    End;

    /* Packs the ten scalar parameters into a varray */
    Procedure make_para_list(
        p_parameter1  in varchar2,
        p_parameter2  in varchar2,
        p_parameter3  in varchar2,
        p_parameter4  in varchar2,
        p_parameter5  in varchar2,
        p_parameter6  in varchar2,
        p_parameter7  in varchar2,
        p_parameter8  in varchar2,
        p_parameter9  in varchar2,
        p_parameter10 in varchar2,
        p_para_list   in out NOCOPY param_list_type) is
    Begin
        debug_line('In make_para_list');
        p_para_list := param_list_type (p_parameter1, p_parameter2, p_parameter3, p_parameter4, p_parameter5,
                                        p_parameter6, p_parameter7, p_parameter8, p_parameter9, p_parameter10);

    End;

Begin
    debug_line('In Build_Where_Clause');
    if p_tokenized_where is not null then
        /* convert parameters into an array */
        make_para_list( p_parameter1, p_parameter2, p_parameter3, p_parameter4, p_parameter5,
                        p_parameter6, p_parameter7, p_parameter8, p_parameter9, p_parameter10,
                        l_para_list);

        /* Build additional Where */
        l_where := p_tokenized_where;

        substitute_parameter ( l_where, l_para_list, p_request_id);

    end if;

    if l_where is not null then
        if p_default_where is not null then
            l_where := p_default_where || ' and ' || l_where;
        end if;
    else
        l_where := p_default_where;
    end if;

    return l_where;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1854
/*
  Refresh_Target
  Clears or marks existing target data before a load.

  Source -> Staging (p_process_type = C_SOURCE_TO_STAGE)
    Matching rows are deleted from msd_st_cs_data whether or not complete
    refresh was requested, so that re-running a collection does not double
    count. A null p_cs_name or p_instance_id matches every value.

  Staging -> Fact (p_process_type = C_STAGE_TO_FACT), only when
  p_comp_refresh = 'Y'
    For every instance (and, on a multiple stream definition, every stream
    name) present in staging for this definition, the fact rows in
    msd_cs_data with action_code 'I' are set to action_code 'D' and stamped
    with p_new_refresh_num. On a single stream definition the stream name
    is ignored.

  Any error is written to the log through show_line and re-raised.
*/
Procedure Refresh_Target(
    p_process_type     in varchar2,
    p_cs_definition_id in number,
    p_cs_name          in varchar2,
    p_comp_refresh     in varchar2,
    p_instance_id      in number,
    p_new_refresh_num  in NUMBER) is

    cursor c_stream_kind is
        select nvl(multiple_stream_flag,'N')
          from msd_cs_definitions
         where cs_definition_id = p_cs_definition_id;

    l_is_multi VARCHAR2(10);

Begin
    debug_line('In refresh_target');

    IF p_process_type = C_SOURCE_TO_STAGE THEN

        delete from msd_st_cs_data
         where cs_definition_id = p_cs_definition_id
           and cs_name = nvl(p_cs_name, cs_name)
           and attribute_1 = nvl(p_instance_id, attribute_1);

    ELSIF p_comp_refresh = 'Y' AND p_process_type = C_STAGE_TO_FACT THEN

        /* DWK For single stream, ignore the CS_NAME column for refresh */
        open c_stream_kind;
        fetch c_stream_kind into l_is_multi;
        close c_stream_kind;

        IF (l_is_multi = 'Y') THEN
            /* one pass per (instance, stream name) found in staging */
            FOR r_key IN (select distinct attribute_1 instance, cs_name
                            from msd_st_cs_data
                           where cs_definition_id = p_cs_definition_id and
                                 cs_name = nvl(p_cs_name, cs_name))
            LOOP
                UPDATE msd_cs_data
                   SET Action_code = 'D',
                       last_refresh_num = p_new_refresh_num
                 WHERE cs_definition_id = p_cs_definition_id and
                       cs_name = r_key.cs_name and
                       attribute_1 = r_key.instance and
                       action_code = 'I';
            END LOOP;
        ELSE
            /* one pass per instance found in staging */
            FOR r_key IN (select distinct attribute_1 instance
                            from msd_st_cs_data
                           where cs_definition_id = p_cs_definition_id)
            LOOP
                UPDATE msd_cs_data
                   SET Action_code = 'D',
                       last_refresh_num = p_new_refresh_num
                 WHERE cs_definition_id = p_cs_definition_id and
                       attribute_1 = r_key.instance and
                       action_code = 'I';
            END LOOP;
        END IF;

    END IF;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
1946
/*************************************************************************************************
   PROCEDURE Process_1_Sub

   Row-by-row Collect/Pull driver.  Opens a ref cursor on p_sql_stmt, fetches every row
   into a msd_cs_dfn_utl.G_TYP_SOURCE_STREAM record and runs validate_record on it.

     - Valid rows are written with ins_row_fact when p_target_table = 'MSD_CS_DATA'
       (and insert_update_Into_Headers is called whenever the designator/instance pair
       differs from the previous valid row); otherwise they are written with
       ins_row_staging.  The row is then logged with log_processed.
     - Invalid rows are logged with log_error.

   Work is committed every C_BATCH_SIZE fetched rows (Bug# 4349618) and once more after
   the loop.  g_retcode / g_errbuf are set to a warning when any row failed validation
   or when no rows were fetched at all.

   Parameters
     p_cs_rec           stream definition row, passed through to the helpers
     p_cs_name          not referenced in this procedure
     p_source_view      passed to log_processed / log_error
     p_target_table     'MSD_CS_DATA' selects the fact insert, anything else staging
     p_instance_id      instance to use; when null the instance on the fetched row is used
     p_sql_stmt         text of the query to open
     p_new_refresh_num  refresh number stamped on fact rows and headers

   Any exception is printed with show_line (together with p_sql_stmt) and re-raised.
**************************************************************************************************/
Procedure Process_1_Sub (
    p_cs_rec            in out NOCOPY msd_cs_definitions_v1%rowtype,
    p_cs_name           in varchar2,
    p_source_view       in varchar2,
    p_target_table      in varchar2,
    p_instance_id       in number,
    p_sql_stmt          in varchar2,
    p_new_refresh_num   IN NUMBER) is

    TYPE cur_type is REF CURSOR;
    l_cur               cur_type;
    l_rec               msd_cs_dfn_utl.G_TYP_SOURCE_STREAM;

    l_valid             boolean;
    l_err_msg           varchar2(1000);

    l_success_rows      number := 0;
    l_error_rows        number := 0;

    /* Bug# 4349618 To commit in Batches: rows handled so far (valid + invalid) */
    l_counter           number := 0;

    /* DWK: designator / instance of the last valid row for which the headers
       table was maintained */
    l_temp_designator   VARCHAR2(40) := NULL;
    l_temp_instance_id  NUMBER       := NULL;

Begin
    debug_line('In Process_1_Sub');

    open l_cur for p_sql_stmt;
    LOOP
        fetch l_cur into l_rec;
        exit when l_cur%notfound;

        l_valid   := null;
        l_err_msg := null;

        debug_line('Validating ' || l_rec.pk_id);

        /* NOTE(review): validation prefers l_rec.instance over p_instance_id, the
           reverse of every other call in this loop - confirm this is intended. */
        l_valid := validate_record (l_rec, p_cs_rec, nvl(l_rec.instance,p_instance_id), l_err_msg);

        IF l_valid THEN
            /* IMP : Instance is p_instance in case of Collect
               and l_rec.instance in case of PULL i.e. from the staging
               table */
            IF (p_target_table = 'MSD_CS_DATA') THEN
                ins_row_fact(l_rec, p_cs_rec, l_rec.designator,
                             nvl(p_instance_id, l_rec.instance),
                             p_new_refresh_num);

                /* Maintain the headers table only when the designator or the
                   instance differs from the previous valid row. */
                IF ( l_rec.designator <> nvl(l_temp_designator,'-99999999~!@') OR
                     nvl(p_instance_id,l_rec.instance) <> nvl(l_temp_instance_id,-99999999) ) THEN
                    l_temp_designator  := l_rec.designator;
                    l_temp_instance_id := nvl(p_instance_id,l_rec.instance);

                    /* DWK Populate MSD_CS_DATA_HEADERS table after inserting rows
                       into FACT table */
                    insert_update_Into_Headers ( p_cs_rec.cs_definition_id,
                                                 l_rec.designator,
                                                 nvl(p_instance_id,l_rec.instance), p_new_refresh_num);
                END IF;
            ELSE
                ins_row_staging(l_rec, p_cs_rec, l_rec.designator, nvl(p_instance_id, l_rec.instance), null, null);
            END IF;

            /* Mark record Processed */
            log_processed(l_rec, p_cs_rec, l_rec.designator, nvl(p_instance_id, l_rec.instance), p_source_view, p_target_table);

            /* Count Success Rows */
            l_success_rows := l_success_rows + 1;

        ELSE /* IF not Valid (FALSE or NULL) */

            /* Log Error */
            log_error(l_rec, p_cs_rec, l_rec.designator,
                      nvl(p_instance_id, l_rec.instance),
                      l_err_msg, p_source_view, p_target_table);

            /* Count Erroneous Rows */
            l_error_rows := l_error_rows + 1;

        END IF;

        /* Bug# 4349618 To commit in Batches.
           Every fetched row counts towards the batch, valid or not.  MOD is
           evaluated in PL/SQL directly; no round trip through DUAL per row. */
        l_counter := l_counter + 1;

        IF mod(l_counter, C_BATCH_SIZE) = 0 THEN
            debug_line( 'Inside Process_1_Sub: commiting inside the loop.');
            commit;
        END IF;

    END LOOP;

    /* Bug# 4349618 To commit in Batches: commit the last, partial batch */
    debug_line( 'Inside Process_1_Sub: commiting after the loop ends.');
    commit;

    if l_error_rows > 0 then
        g_retcode := '1';
        g_errbuf  := 'There were erroneous records in Collect/Pull.';
    end if;

    if l_success_rows = 0 and l_error_rows = 0 then
        g_retcode := '1';
        g_errbuf  := 'There were no rows fetched.';
    end if;

    /* Print Results */
    show_line('Valid Records : ' || l_success_rows);
    show_line('Invalid Records : ' || l_error_rows);

    close l_cur;

Exception
    When others then
        show_line(sqlerrm);
        show_line(p_sql_stmt);
        /* The cursor is not open when OPEN itself failed (e.g. bad statement
           text) or when it has already been closed; an unguarded CLOSE would
           raise INVALID_CURSOR and hide the original error. */
        if l_cur%isopen then
            close l_cur;
        end if;
        raise;
End;
2079
/*************************************************************************************************
   FUNCTION Build_Designator_Where_Clause

   Returns the predicate text (no WHERE keyword) that restricts a collection to one
   designator (cs_name), or NULL when no restriction applies.

     - p_process_type = C_STAGE_TO_FACT : filters on the cs_name column when p_cs_name is
       given and always filters on cs_definition_id.
     - any other process type           : only when the definition is multi-stream and
       p_cs_name is given; the column to filter on is the source_view_column_name mapped
       to table_column 'CS_NAME' in msd_cs_defn_column_dtls.  When no such mapping is
       found no filter is produced.

   Single quotes inside p_cs_name are doubled before the value is embedded as a literal.
   Any exception is printed with show_line and re-raised.
**************************************************************************************************/
Function Build_Designator_Where_Clause(
    p_cs_rec        in msd_cs_definitions_v1%rowtype,
    p_process_type  in varchar2,
    p_cs_name       in varchar2) return varchar2 is

    l_filter        varchar2(500);
    l_source_column varchar2(60);

Begin
    debug_Line('In Build_Designator_Where_Clause');

    if p_process_type = C_STAGE_TO_FACT then

        /* Staging data: filter on cs_name (optional) plus the default
           cs_definition_id condition. */
        if p_cs_name is not null then
            l_filter := 'cs_name = ' || '''' || replace(p_cs_name, '''', '''''') || ''''
                        || ' and cs_definition_id = ' || p_cs_rec.cs_definition_id;
        else
            l_filter := ' cs_definition_id = ' || p_cs_rec.cs_definition_id;
        end if;

    elsif nvl(p_cs_rec.multiple_stream_flag, 'N') = 'Y' and p_cs_name is not null then

        /* Source data: look up which source view column carries the designator.
           Only the first row returned is used. */
        for l_mapping in (select source_view_column_name
                            from msd_cs_defn_column_dtls
                           where cs_definition_id = p_cs_rec.cs_definition_id
                             and table_column = 'CS_NAME')
        loop
            l_source_column := l_mapping.source_view_column_name;
            exit;
        end loop;

        /* No mapped column: leave the filter empty (no error is raised). */
        if l_source_column is not null then
            l_filter := l_source_column || ' = ' || '''' || replace(p_cs_name, '''', '''''') || '''';
        end if;

    end if;

    return l_filter;

Exception
    When others then
        show_line(sqlerrm);
        raise;

End;
2136
/*
   Procedure debug_line
   Prints p_sql through show_line, but only when the package constant
   C_DEBUG is 'Y'; otherwise does nothing.
*/
PROCEDURE debug_line (p_sql IN VARCHAR2) IS
BEGIN
    IF C_DEBUG = 'Y' THEN
        show_line(p_sql);
    END IF;
END debug_line;
2143
/* DWK */
/*************************************************************************************************
   PROCEDURE Insert_update_Into_Headers

   Keeps msd_cs_data_headers in step with the fact data for one
   (cs_definition_id, cs_name, instance) combination:

     - when msd_cs_data_headers_v1 already shows the combination, last_refresh_num of the
       matching msd_cs_data_headers row(s) is set to p_refresh_num;
     - otherwise a new header row is created through Insert_Data_Into_Headers.

   The comparisons use plain equality, so a NULL p_cs_name or p_instance_id never
   matches an existing row.  Any exception is printed with show_line and re-raised.
**************************************************************************************************/
Procedure insert_update_Into_Headers ( p_cs_definition_id in number,
                                       p_cs_name          in varchar2,
                                       p_instance_id      in number,
                                       p_refresh_num      in number) is

    l_existing_headers NUMBER := 0;

BEGIN

    /* Existence check goes against the view, the write below against the table. */
    SELECT count(*)
      INTO l_existing_headers
      FROM msd_cs_data_headers_v1
     WHERE instance = p_instance_id
       AND cs_definition_id = p_cs_definition_id
       AND cs_name = p_cs_name;

    IF ( l_existing_headers > 0 ) THEN

        update msd_cs_data_headers
           set last_refresh_num = p_refresh_num
         where cs_definition_id = p_cs_definition_id
           and instance = p_instance_id
           and cs_name = p_cs_name;

    ELSE

        Insert_Data_Into_Headers (p_cs_definition_id,
                                  p_cs_name,
                                  p_instance_id,
                                  p_refresh_num);

    END IF;

Exception
    When others then
        show_line(sqlerrm);
        raise;

END insert_update_Into_Headers;
2188
2189
/*************************************************************************************************
   PROCEDURE Insert_Data_Into_Headers

   Adds one row to msd_cs_data_headers for the given cs_definition_id, cs_name and
   instance.  The key comes from sequence msd_cs_data_headers_s, the who-columns from
   sysdate and fnd_global, and last_refresh_num from p_refresh_num.

   Any exception is printed with show_line and re-raised.
**************************************************************************************************/
Procedure Insert_Data_Into_Headers ( p_cs_definition_id in number,
                                     p_cs_name          in varchar2,
                                     p_instance_id      in number,
                                     p_refresh_num      in number) is

    l_user_id   NUMBER;
    l_login_id  NUMBER;

BEGIN

    /* Who-column values, read once and used for both created/updated columns. */
    l_user_id  := fnd_global.user_id;
    l_login_id := fnd_global.login_id;

    INSERT INTO msd_cs_data_headers
        ( CS_DATA_HEADER_ID,
          INSTANCE,
          CS_DEFINITION_ID,
          CS_NAME,
          LAST_UPDATE_DATE,
          LAST_UPDATED_BY,
          CREATION_DATE,
          CREATED_BY,
          LAST_UPDATE_LOGIN,
          LAST_REFRESH_NUM )
    VALUES
        ( msd_cs_data_headers_s.nextval,
          p_instance_id,
          p_cs_definition_id,
          p_cs_name,
          sysdate,
          l_user_id,
          sysdate,
          l_user_id,
          l_login_id,
          p_refresh_num );

Exception
    When others then
        show_line('Error in inserting into MSD_CS_DATA_HEADERS');
        show_line(sqlerrm);
        raise;

END Insert_Data_Into_Headers;
2237
2238 End;