Implementing Informatica Incremental Aggregation
Using incremental aggregation, we apply the captured changes in the source data (the CDC part) to aggregate calculations in a session. If the source changes incrementally and we can capture those changes, we can configure the session to process only the changes. This allows the Integration Service to update the target incrementally, rather than deleting the previous load, reprocessing the entire source and recalculating the same aggregations every time the session runs.
Incremental Aggregation
When the session runs with incremental aggregation enabled for the first time, say in the 1st week of January, we use the entire source. This allows the Integration Service to read and store the necessary aggregate data. In the 2nd week of January, when we run the session again, we filter the source down to the CDC records, i.e. the records loaded after the initial load. The Integration Service then processes only this new data and updates the target accordingly.
Use incremental aggregation when the changes do not significantly alter the target. If processing the incrementally changed source alters more than half of the existing target, the session may not benefit from incremental aggregation. In that case, drop and recreate the target from the entire source data, recalculating the aggregations from scratch.
Incremental aggregation can be helpful, for example, when we need to load a monthly fact table on a weekly basis.
Sample Mapping
Let us see a sample mapping to implement
incremental aggregation:

Look at the Source Qualifier query to fetch
the CDC part using a BATCH_LOAD_CONTROL table that saves the last successful
load date for the particular mapping.
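A minimal sketch of such an override query is given below. Only the BATCH_LOAD_CONTROL table comes from the mapping described here; the source table name, the control-table column names and the mapping name are assumptions for illustration.

SELECT CUSTOMER_KEY,
       INVOICE_KEY,
       AMOUNT,
       LOAD_DATE
FROM   CUSTOMER_INVOICE_SRC               -- assumed source table name
WHERE  LOAD_DATE > (SELECT LAST_LOAD_DATE -- assumed column holding the last successful load date
                    FROM   BATCH_LOAD_CONTROL
                    WHERE  MAPPING_NAME = 'm_cust_invoice_incr_agg')  -- assumed mapping name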

Look at the ports tab of Expression
transformation.
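As an assumption based on the MON_KEY column that appears in the target, the Expression transformation would typically add an output port deriving the month key from LOAD_DATE. An SQL equivalent of that derivation (names as in the sketch above) could be:

SELECT CUSTOMER_KEY,
       INVOICE_KEY,
       AMOUNT,
       TO_CHAR(LOAD_DATE, 'YYYYMM') AS MON_KEY  -- e.g. 01/01/2010 -> 201001
FROM   CUSTOMER_INVOICE_SRC                     -- assumed source table name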

Look at the ports tab of Aggregator
Transformation.
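In SQL terms, the Aggregator groups by CUSTOMER_KEY and MON_KEY and sums AMOUNT. The query below is only a sketch of that behaviour, since the Aggregator actually maintains the groups in its cache rather than issuing a query, and the input name is hypothetical.

SELECT CUSTOMER_KEY,
       MAX(INVOICE_KEY) AS INVOICE_KEY, -- approximation: in the Aggregator the non-aggregated
                                        -- port returns the last row's value for each group
       MON_KEY,
       SUM(AMOUNT)      AS AMOUNT
FROM   expression_output               -- hypothetical name for the Expression transformation output
GROUP  BY CUSTOMER_KEY, MON_KEY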

Now, the most important part: the session properties configuration that implements incremental aggregation.

If we want to reinitialize the aggregate cache, say during the first week of every month, we can configure the same session in a separate workflow with the Reinitialize aggregate cache property checked for that session.

Example with Data
Now have a look at the source table data:
CUSTOMER_KEY | INVOICE_KEY | AMOUNT | LOAD_DATE
1111         | 5001        | 100    | 01/01/2010
2222         | 5002        | 250    | 01/01/2010
3333         | 5003        | 300    | 01/01/2010
1111         | 6007        | 200    | 07/01/2010
1111         | 6008        | 150    | 07/01/2010
2222         | 6009        | 250    | 07/01/2010
4444         | 1234        | 350    | 07/01/2010
5555         | 6157        | 500    | 07/01/2010
After the first load in the 1st week of Jan 2010, the data in the target is as follows:
CUSTOMER_KEY | INVOICE_KEY | MON_KEY | AMOUNT
1111         | 5001        | 201001  | 100
2222         | 5002        | 201001  | 250
3333         | 5003        | 201001  | 300
During the 2nd week's load, the session processes only the incremental data in the source, i.e. the records with a load date greater than the last session run date. After this load, incremental aggregation of the new source data with the aggregate cache file data updates the target table with the following dataset:
CUSTOMER_KEY | INVOICE_KEY | MON_KEY | AMOUNT | Remarks/Operation
1111         | 6008        | 201001  | 450    | The cache file updated after aggregation
2222         | 6009        | 201001  | 500    | The cache file updated after aggregation
3333         | 5003        | 201001  | 300    | The cache file remains the same as before
4444         | 1234        | 201001  | 350    | New group row inserted in cache file
5555         | 6157        | 201001  | 500    | New group row inserted in cache file
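To trace the arithmetic for one group: customer 1111 had 100 in the cache from the first run (invoice 5001); the second run adds the incremental invoices 6007 (200) and 6008 (150), so the group total becomes 100 + 200 + 150 = 450, and INVOICE_KEY is updated to the last processed invoice, 6008. Likewise, customer 2222 becomes 250 + 250 = 500, while customer 3333 is unchanged because no incremental record exists for it.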
Understanding Incremental Aggregation Process
The first time we run an incremental
aggregation session, the Integration Service processes the entire source. At
the end of the session, the Integration Service stores aggregate data for that
session run in two files, the index file and the data file. The Integration
Service creates the files in the cache directory specified in the Aggregator
transformation properties.
Each subsequent time we run the session with incremental aggregation, we pass only the incremental source changes to the session. For each input record, the Integration Service checks the historical information in the index file for a corresponding group. If it finds a corresponding group, the Integration Service performs the aggregate operation incrementally, using the aggregate data for that group, and saves the incremental change. If it does not find a corresponding group, the Integration Service creates a new group and saves the record data.
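A conceptual SQL analogue of this group lookup and update, assuming a persisted AGG_CACHE table in place of the real index and data cache files (all table, column and mapping names below are illustrative), could look like:

MERGE INTO AGG_CACHE tgt
USING (SELECT CUSTOMER_KEY,
              TO_CHAR(LOAD_DATE, 'YYYYMM') AS MON_KEY,
              MAX(INVOICE_KEY)             AS INVOICE_KEY,
              SUM(AMOUNT)                  AS AMOUNT
       FROM   CUSTOMER_INVOICE_SRC
       WHERE  LOAD_DATE > (SELECT LAST_LOAD_DATE
                           FROM   BATCH_LOAD_CONTROL
                           WHERE  MAPPING_NAME = 'm_cust_invoice_incr_agg')
       GROUP  BY CUSTOMER_KEY, TO_CHAR(LOAD_DATE, 'YYYYMM')) src
ON    (tgt.CUSTOMER_KEY = src.CUSTOMER_KEY AND tgt.MON_KEY = src.MON_KEY)
WHEN MATCHED THEN
  UPDATE SET tgt.AMOUNT      = tgt.AMOUNT + src.AMOUNT,   -- existing group: aggregate incrementally
             tgt.INVOICE_KEY = src.INVOICE_KEY
WHEN NOT MATCHED THEN
  INSERT (CUSTOMER_KEY, MON_KEY, INVOICE_KEY, AMOUNT)     -- new group: insert the record data
  VALUES (src.CUSTOMER_KEY, src.MON_KEY, src.INVOICE_KEY, src.AMOUNT);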
When writing to the target, the Integration
Service applies the changes to the existing target. It saves modified aggregate
data in the index and data files to be used as historical data the next time
you run the session.
Each subsequent time we run a session with
incremental aggregation, the Integration Service creates a backup of the
incremental aggregation files. The cache directory for the Aggregator
transformation must contain enough disk space for two sets of the files.
The Integration Service creates new aggregate data, instead of using the historical data, when we configure the session to reinitialize the aggregate cache, when we delete the cache files, and so on.
When the Integration Service rebuilds
incremental aggregation files, the data in the previous files is lost.
Note: To protect the incremental aggregation files from file corruption or disk failure, periodically back up the files.