What is incremental_update?
incremental_update is a mechanism that reduces processing time by leveraging the results of the previous Unification process and applying stitching only to updated records.
How does incremental_update improve efficiency?
- When `incremental_columns: [time]` is specified, the Unification Algorithm combines the previous final graph with the graph generated from newly added records. This allows the process to start from a state where most of the stitching is already complete, leading to faster convergence.
- When `incremental_columns: [time]` is specified, the enrichment process applies the `canonical_id` only to the delta records. This significantly shortens processing time compared to enriching the `canonical_id` for all records.
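The first point can be illustrated with a small union-find sketch in Python. This is not Treasure Data's actual Unification Algorithm (which operates on a follower/leader graph in SQL); it only shows why seeding a run with the previous result lets the delta records converge almost immediately. All IDs below are illustrative.

```python
# Minimal union-find sketch: starting from the previous final graph means
# only the delta edges need to be stitched, instead of every edge ever seen.

class UnionFind:
    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # path halving
            x = self.parent[x]
        return x

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.parent[max(ra, rb)] = min(ra, rb)  # smaller ID becomes leader

# Full refresh: stitch every edge from all records.
all_edges = [("aaa_001", "3rd_001"), ("yyy_001", "3rd_001"),
             ("aaa_001", "3rd_002"), ("zzz_001", "3rd_002")]
full = UnionFind()
for a, b in all_edges:
    full.union(a, b)

# Incremental: reuse the previous final graph, stitch only the new edges.
incr = UnionFind()
incr.parent = dict(full.parent)            # previous final graph
delta_edges = [("aaa_004", "3rd_002")]     # newly added records only
for a, b in delta_edges:
    incr.union(a, b)

print(incr.find("aaa_004") == incr.find("zzz_001"))  # True: new ID joins the cluster
```

Only one `union` call is needed for the new record, because all prior merges are already encoded in the copied graph.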
Dataset
Using the table from Example1 as a base, we will insert records during the incremental_update process and observe how the update is applied.
Table Used in Example1
Assume the data from Example1 is stored in the `test_id_unification_ex5` database.
| month | day | site_aaa td_client_id | site_aaa td_global_id | site_xxx td_client_id | site_xxx td_global_id | site_yyy td_client_id | site_yyy td_global_id | site_zzz td_client_id | site_zzz td_global_id |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 5 | aaa_001 | 3rd_001 | | | yyy_001 | 3rd_001 | | |
| 1 | 15 | aaa_001 | 3rd_002 | | | | | zzz_001 | 3rd_002 |
| 1 | 25 | aaa_001 | 3rd_003 | | | | | | |
| 2 | 5 | aaa_001 | 3rd_004 | xxx_001 | 3rd_004 | | | | |
| 2 | 15 | | | xxx_001 | 3rd_005 | yyy_002 | 3rd_005 | | |
| 2 | 25 | | | | | yyy_002 | 3rd_006 | zzz_003 | 3rd_006 |
| 3 | 5 | | | | | | | zzz_003 | 3rd_007 |
| 3 | 15 | | | xxx_002 | 3rd_008 | | | zzz_003 | 3rd_008 |
| 3 | 25 | aaa_002 | 3rd_009 | xxx_002 | 3rd_009 | | | | |
| 4 | 5 | aaa_002 | 3rd_010 | | | yyy_003 | 3rd_010 | | |
| 4 | 15 | | | | | yyy_003 | 3rd_011 | zzz_004 | 3rd_011 |
| 4 | 25 | | | xxx_003 | 3rd_012 | | | zzz_004 | 3rd_012 |
| 5 | 5 | aaa_003 | 3rd_013 | xxx_003 | 3rd_013 | | | | |
| 5 | 15 | aaa_003 | 3rd_014 | | | | | | |
| 5 | 25 | aaa_003 | 3rd_015 | | | yyy_004 | 3rd_015 | zzz_005 | 3rd_015 |
| 6 | 5 | aaa_003 | 3rd_016 | xxx_004 | 3rd_016 | | | | |
| 6 | 15 | | | xxx_004 | 3rd_017 | | | zzz_005 | 3rd_017 |
| 6 | 25 | | | | | yyy_005 | 3rd_018 | zzz_005 | 3rd_018 |
Records to be Added Later
| month | day | site_aaa td_client_id | site_aaa td_global_id | site_xxx td_client_id | site_xxx td_global_id | site_yyy td_client_id | site_yyy td_global_id | site_zzz td_client_id | site_zzz td_global_id |
|---|---|---|---|---|---|---|---|---|---|
| 7 | 5 | aaa_004 | 3rd_017 | xxx_005 | 3rd_018 | yyy_006 | 3rd_019 | zzz_006 | 3rd_018 |
| 7 | 15 | aaa_004 | 3rd_018 | | | | | zzz_007 | 3rd_018 |
| 7 | 25 | | | | | | | | |
| 8 | 5 | aaa_005 | 3rd_018 | xxx_005 | 3rd_018 | | | | |
| 8 | 15 | | | xxx_006 | 3rd_019 | yyy_006 | 3rd_019 | zzz_008 | 3rd_019 |
| 8 | 25 | aaa_005 | 3rd_019 | | | yyy_007 | 3rd_019 | | |
We will add the above records midway and observe the incremental_update process.
Workflow example
id_unification_ex5.dig
```yaml
timezone: UTC # Asia/Tokyo

schedule:
  daily>: 09:00:00

+call_unification:
  http_call>: https://api-cdp.treasuredata.com/unifications/workflow_call
  headers:
    - authorization: ${secret:td.apikey}
  method: POST
  retry: true
  content_format: json
  content:
    run_canonical_ids: true
    run_enrichments: true
    run_master_tables: true
    full_refresh: false
    keep_debug_tables: true
    unification:
      !include : unification_ex5.yml
```
unification_ex5.yml
```yaml
name: test_id_unification_ex5

keys:
  - name: td_client_id
  - name: td_global_id

tables:
  - database: test_id_unification_ex5
    table: site_aaa
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
  - database: test_id_unification_ex5
    table: site_xxx
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
  - database: test_id_unification_ex5
    table: site_yyy
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
  - database: test_id_unification_ex5
    table: site_zzz
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}

canonical_ids:
  - name: unified_cookie_id
    merge_by_keys: [td_client_id, td_global_id]
    merge_iterations: 5
    incremental_merge_iterations: 3

master_tables:
  - name: master_table_ex5
    canonical_id: unified_cookie_id
    attributes:
      - name: td_client_id
        array_elements: 5
        source_columns:
          - {table: site_aaa, order: last, order_by: time, priority: 1}
          - {table: site_xxx, order: last, order_by: time, priority: 1}
          - {table: site_yyy, order: last, order_by: time, priority: 1}
          - {table: site_zzz, order: last, order_by: time, priority: 1}
      - name: td_global_id
        array_elements: 5
        source_columns:
          - {table: site_aaa, order: last, order_by: time, priority: 1}
          - {table: site_xxx, order: last, order_by: time, priority: 1}
          - {table: site_yyy, order: last, order_by: time, priority: 1}
          - {table: site_zzz, order: last, order_by: time, priority: 1}
```
Settings for incremental_update
`full_refresh: false`

```yaml
content:
  run_canonical_ids: true
  run_enrichments: true
  run_master_tables: true
  full_refresh: false
  keep_debug_tables: true
  unification:
    !include : unification_ex5.yml
```

By setting `full_refresh: false` in the `content:` section of the dig file, you enable the configuration for incremental_update. Note, however, that incremental_update is not performed every day: a `full_refresh` (processing all records as usual) still runs once every three days.
incremental_columns:
Warning

Currently, any setting other than `incremental_columns: [time]` causes all processing to be executed as a `full_refresh`. Avoid other configurations.
```yaml
tables:
  - database: test_id_unification_ex5
    table: site_aaa
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
```
In the `tables:` section of the YAML file, you can configure `incremental_columns: [column1, column2, ...]` for each table. When the `+get_next_high_water_mark` task records where processing left off, the specified column order is used for sorting:

`ORDER BY column1 DESC, column2 DESC, ...`

The first row in this sorted order (the most recent record) has its column1, column2, ... values recorded as the high-water mark.
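The bookkeeping described above can be sketched in Python. This is a hypothetical illustration of the high-water-mark idea, not the actual `+get_next_high_water_mark` implementation; the record layout and timestamps are invented.

```python
# Sketch: record the newest row's incremental_columns values as the mark,
# then use the mark to isolate the delta on the next run.

incremental_columns = ["time"]

first_run = [
    {"time": 100, "td_client_id": "aaa_003"},
    {"time": 250, "td_client_id": "yyy_005"},
    {"time": 180, "td_client_id": "zzz_005"},
]

# Equivalent of ORDER BY time DESC LIMIT 1: keep the most recent row's values.
newest = max(first_run, key=lambda r: tuple(r[c] for c in incremental_columns))
high_water_mark = {c: newest[c] for c in incremental_columns}
print(high_water_mark)  # {'time': 250}

# Next run: only rows beyond the stored mark are treated as the delta.
second_run = first_run + [
    {"time": 300, "td_client_id": "aaa_004"},
    {"time": 320, "td_client_id": "zzz_007"},
]
delta = [
    r for r in second_run
    if tuple(r[c] for c in incremental_columns)
    > tuple(high_water_mark[c] for c in incremental_columns)
]
print([r["td_client_id"] for r in delta])  # ['aaa_004', 'zzz_007']
```

With multiple columns, the tuple comparison mirrors the multi-column `ORDER BY ... DESC` ordering.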
If this option is not set, the `+get_next_high_water_mark` task does not run. The Unification Algorithm then operates in a manner similar to `full_refresh`, and all records in the `enriched_` tables are replaced.
`incremental_columns: [time]`

When `incremental_columns: [time]` is specified (only the `time` column is defined), the process qualifies as an incremental_update. The Unification Algorithm considers only the newly added records, and instead of replacing the entire `enriched_` tables, enrichment is performed only on the newly added records and the results are appended to the table. At present, only the `[time]` setting enables efficient incremental updates, so keep this in mind.
incremental_merge_iterations:
```yaml
canonical_ids:
  - name: unified_cookie_id
    merge_by_keys: [td_client_id, td_global_id]
    merge_iterations: 5
    incremental_merge_iterations: 3
```

In the `canonical_ids:` configuration, the number of merge-loop iterations used during an incremental_update can be specified with `incremental_merge_iterations:`. If it is not set, the default value is 2.
How to Verify if incremental_update Was Performed
A `full_refresh` is performed once every three days, while incremental_update runs on the other days. On the second or third day of the scheduled execution, check the most recent sessions (history) of the workflow (WF) using the methods below to identify the sessions that ran as an incremental_update.
Method 1: Check the +extract_and_merge Task
```sql
drop table if exists "unified_cookie_id_graph_unify_loop_0";
create table "unified_cookie_id_graph_unify_loop_0" with (bucketed_on = array['follower_id'], bucket_count = 512) as
-- incremental extraction and merge to the previous graph
select
  coalesce(prev.follower_id, next.follower_id) as follower_id,
  coalesce(prev.follower_ns, next.follower_ns) as follower_ns,
```
If the comment just above the `select` statement reads:

`-- incremental extraction and merge to the previous graph`

then the WF session performed an incremental_update. Conversely, for a `full_refresh`, the comment reads:

`-- full extraction`
Method 2: Check if the +source_key_stats Task Was Executed
In the TIMELINE or TASKS section of the WF session, if the `+source_key_stats:` task was executed, the session was a `full_refresh`; if it was not executed, the session was an incremental_update.

On days when incremental_update runs, you can add new records using the SQL for Appending Records and rerun the WF to confirm how the newly added records are processed during the incremental_update.
Explanation of the Unification Algorithm
graph_unify_loop_0
In an incremental_update, the creation of the initial graph differs. The initial graph is formed by combining the following two graphs:
- The graph for newly added records
- The final graph from the previous execution
1. The Graph for Newly Added Records
2. The Final Graph from the Previous Execution
graph_unify_loop_0
The initial graph for incremental_update is the result of combining these two graphs.
Starting from this graph allows the algorithm to converge with fewer iterations. In most cases, the leader is replaced during the first loop and the graph converges to `aaa_001` (see Example 2).
graph_unify_loop_1 (and Similar Graphs for 2, etc.)
In practice, it is confirmed that the graph converges in the first iteration.
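The leader-replacement loop can be sketched in Python as follows. This is a simplification for intuition only, not the actual SQL that the workflow generates; the IDs are borrowed loosely from the example, and the iteration cap merely echoes the role of `incremental_merge_iterations`.

```python
# Simplified leader replacement over a follower -> leader mapping:
# each follower adopts its leader's leader until nothing changes.

def unify_loop(graph, max_iterations=3):  # cap echoes incremental_merge_iterations
    iterations = 0
    changed = True
    while changed and iterations < max_iterations:
        changed = False
        iterations += 1
        for follower, leader in list(graph.items()):
            root = graph.get(leader, leader)  # the leader's own leader, if any
            if root != leader:
                graph[follower] = root        # replace the leader
                changed = True
    return iterations

# Previous final graph: every follower already points at the leader aaa_001.
prev = {"3rd_018": "aaa_001", "zzz_005": "aaa_001", "yyy_005": "aaa_001"}
# Graph for the newly added records: new IDs point at an already-known ID.
delta = {"aaa_004": "3rd_018", "zzz_006": "3rd_018"}

graph = {**prev, **delta}   # combined initial graph (graph_unify_loop_0)
n = unify_loop(graph)
print(graph["aaa_004"], n)  # aaa_001 2  (one merging pass + one confirming pass)
```

Because the previous graph already points at the final leader, the new IDs need only a single hop, which is why convergence in the first iteration is typical.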
Output
master_table
Since the newly added records also belong to the same individual, the `master_table` consists of a single record. The `canonical_id` is generated based on `aaa_001`, so its value remains unchanged from the previous execution.
Result Example
unified_cookie_id | td_client_id | td_global_id |
---|---|---|
Su-bHvUu9NN_ | ["yyy_007", "aaa_005", "xxx_006", "yyy_006", "zzz_008"] | ["3rd_019", "3rd_019", "3rd_019", "3rd_019", "3rd_019"] |
enriched_ Table
Under the configuration `incremental_columns: [time]`, when an incremental_update is performed, the `canonical_id` is not assigned to all records in the `enriched_` table via a replace operation. Instead, the `canonical_id` is assigned only to the updated records, which are appended to the existing `enriched_` table. This drastically reduces the time required for the enrichment task.
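The replace-versus-append difference can be sketched as follows. The `enrich` helper, the lookup, and the record layout are all illustrative stand-ins, not the actual enrichment task or schema.

```python
# Contrast: full_refresh re-enriches every record and replaces the table,
# while incremental_update enriches only the delta and appends it.

def enrich(records, canonical_lookup):
    # Attach a canonical_id to each record using the unification result.
    return [{**r, "canonical_id": canonical_lookup[r["td_client_id"]]}
            for r in records]

lookup = {"aaa_005": "Su-bHvUu9NN_", "yyy_007": "Su-bHvUu9NN_"}

# Output of the previous run (already enriched rows).
enriched_table = enrich([{"td_client_id": "aaa_005"}], lookup)

# incremental_update: only the newly added records are enriched and
# appended; the existing rows are neither recomputed nor replaced.
delta = [{"td_client_id": "yyy_007"}]
enriched_table += enrich(delta, lookup)

print(len(enriched_table))                  # 2
print(enriched_table[-1]["canonical_id"])   # Su-bHvUu9NN_
```

The cost of the append path scales with the size of the delta, not with the size of the full table, which is the source of the speedup.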
Limitation for incremental_update
When Records from the Past are Deleted or Updated
In an incremental_update, deletions of or updates to past records are not immediately reflected in the ID unification, which requires careful attention. However, since a `full_refresh` is performed once every three days, those changes will be picked up during that process.