What is incremental_update?
incremental_update is a mechanism that reduces processing time by leveraging the results of the previous Unification process and applying stitching only to updated records.
How does incremental_update improve efficiency?
- When incremental_columns: [time] is specified, the Unification Algorithm combines the previous final graph with the graph generated from newly added records. The process therefore starts from a state where most of the stitching is already complete, so it converges faster.
- When incremental_columns: [time] is specified, the enrichment process applies the canonical_id only to the delta records. This significantly shortens processing time compared to enriching the canonical_id for all records.
Dataset
Using the table from Example 1 as a base, we insert records during the incremental_update process and observe how the update is applied.
Table Used in Example 1
Assume the data from Example 1 is stored in the test_id_unification_ex5 database.
| month | day | site_aaa td_client_id | site_aaa td_global_id | site_xxx td_client_id | site_xxx td_global_id | site_yyy td_client_id | site_yyy td_global_id | site_zzz td_client_id | site_zzz td_global_id |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 5 | aaa_001 | 3rd_001 | | | yyy_001 | 3rd_001 | | |
| 1 | 15 | aaa_001 | 3rd_002 | | | | | zzz_001 | 3rd_002 |
| 1 | 25 | aaa_001 | 3rd_003 | | | | | | |
| 2 | 5 | aaa_001 | 3rd_004 | xxx_001 | 3rd_004 | | | | |
| 2 | 15 | | | xxx_001 | 3rd_005 | yyy_002 | 3rd_005 | | |
| 2 | 25 | | | | | yyy_002 | 3rd_006 | zzz_003 | 3rd_006 |
| 3 | 5 | | | | | | | zzz_003 | 3rd_007 |
| 3 | 15 | | | xxx_002 | 3rd_008 | | | zzz_003 | 3rd_008 |
| 3 | 25 | aaa_002 | 3rd_009 | xxx_002 | 3rd_009 | | | | |
| 4 | 5 | aaa_002 | 3rd_010 | | | yyy_003 | 3rd_010 | | |
| 4 | 15 | | | | | yyy_003 | 3rd_011 | zzz_004 | 3rd_011 |
| 4 | 25 | | | xxx_003 | 3rd_012 | | | zzz_004 | 3rd_012 |
| 5 | 5 | aaa_003 | 3rd_013 | xxx_003 | 3rd_013 | | | | |
| 5 | 15 | aaa_003 | 3rd_014 | | | | | | |
| 5 | 25 | aaa_003 | 3rd_015 | | | yyy_004 | 3rd_015 | zzz_005 | 3rd_015 |
| 6 | 5 | aaa_003 | 3rd_016 | xxx_004 | 3rd_016 | | | | |
| 6 | 15 | | | xxx_004 | 3rd_017 | | | zzz_005 | 3rd_017 |
| 6 | 25 | | | | | yyy_005 | 3rd_018 | zzz_005 | 3rd_018 |
Records to be Added Later
| month | day | site_aaa td_client_id | site_aaa td_global_id | site_xxx td_client_id | site_xxx td_global_id | site_yyy td_client_id | site_yyy td_global_id | site_zzz td_client_id | site_zzz td_global_id |
|---|---|---|---|---|---|---|---|---|---|
| 7 | 5 | aaa_004 | 3rd_017 | xxx_005 | 3rd_018 | yyy_006 | 3rd_019 | zzz_006 | 3rd_018 |
| 7 | 15 | aaa_004 | 3rd_018 | | | | | zzz_007 | 3rd_018 |
| 7 | 25 | | | | | | | | |
| 8 | 5 | aaa_005 | 3rd_018 | xxx_005 | 3rd_018 | | | | |
| 8 | 15 | | | xxx_006 | 3rd_019 | yyy_006 | 3rd_019 | zzz_008 | 3rd_019 |
| 8 | 25 | aaa_005 | 3rd_019 | | | yyy_007 | 3rd_019 | | |
We will add the above records partway through the schedule and observe the incremental_update process.
Workflow example
id_unification_ex5.dig
timezone: UTC # Asia/Tokyo
schedule:
  daily>: 09:00:00
+call_unification:
  http_call>: https://api-cdp.treasuredata.com/unifications/workflow_call  
  headers:
    - authorization: ${secret:td.apikey}
  method: POST
  retry: true
  content_format: json
  content:
    run_canonical_ids: true
    run_enrichments: true
    run_master_tables: true
    full_refresh: false
    keep_debug_tables: true
    unification:
      !include : unification_ex5.yml
unification_ex5.yml
name: test_id_unification_ex5
keys:
  - name: td_client_id
  - name: td_global_id
tables:
  - database: test_id_unification_ex5
    table: site_aaa
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
  - database: test_id_unification_ex5
    table: site_xxx
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
  - database: test_id_unification_ex5
    table: site_yyy
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
  - database: test_id_unification_ex5
    table: site_zzz
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
canonical_ids:
  - name: unified_cookie_id
    merge_by_keys: [td_client_id, td_global_id]
    merge_iterations: 5
    incremental_merge_iterations: 3
master_tables:
  - name: master_table_ex5
    canonical_id: unified_cookie_id
    attributes:
    - name: td_client_id
      array_elements: 5
      source_columns:
        - {table: site_aaa, order: last, order_by: time, priority: 1}
        - {table: site_xxx, order: last, order_by: time, priority: 1}
        - {table: site_yyy, order: last, order_by: time, priority: 1}
        - {table: site_zzz, order: last, order_by: time, priority: 1}                    
    - name: td_global_id
      array_elements: 5
      source_columns:
        - {table: site_aaa, order: last, order_by: time, priority: 1}
        - {table: site_xxx, order: last, order_by: time, priority: 1}
        - {table: site_yyy, order: last, order_by: time, priority: 1}
        - {table: site_zzz, order: last, order_by: time, priority: 1}
Settings for incremental_update
full_refresh: false
  content:
    run_canonical_ids: true
    run_enrichments: true
    run_master_tables: true
    full_refresh: false
    keep_debug_tables: true
    unification:
      !include : unification_ex5.yml
Setting full_refresh: false in the content: section of the dig file enables incremental_update. Note, however, that incremental_update does not run on every execution: a full_refresh (processing all records as usual) is still performed once every three days.
incremental_columns:
Warning
Currently, any setting other than incremental_columns: [time] will result in all processes being executed as full_refresh. Avoid using other configurations.
tables:
  - database: test_id_unification_ex5
    table: site_aaa
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
In the tables: section of the YAML file, you can configure incremental_columns: [column1, column2,...] for each table. When the +get_next_high_water_mark task records where processing left off, it sorts the records in the order specified by this option:
ORDER BY column1 DESC, column2 DESC,...
The column1, column2,... values of the first row in this sorted order (the most recent record) are recorded.
If this option is not set, the +get_next_high_water_mark task will not run. The Unification Algorithm will operate in a manner similar to full_refresh, and all records in the enriched_ tables will be replaced.
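The high-water-mark step described above can be sketched in Python. This is a minimal illustration, not the actual task implementation: the function names, the in-memory row format, and the small integer time values are all hypothetical, and the rule that rows with a time greater than the recorded mark count as new is an assumption.

```python
def next_high_water_mark(rows, incremental_columns):
    """Return the incremental column values of the most recent row, or None."""
    if not rows:
        return None
    # Sorting by column1 DESC, column2 DESC,... and taking the first row is
    # equivalent to taking the lexicographic maximum of the column tuples.
    newest = max(rows, key=lambda row: tuple(row[c] for c in incremental_columns))
    return {c: newest[c] for c in incremental_columns}


def delta_rows(rows, high_water_mark):
    """Assumption: rows whose time exceeds the recorded mark are the new records."""
    if high_water_mark is None:
        return list(rows)
    return [row for row in rows if row["time"] > high_water_mark["time"]]


# Hypothetical contents of site_aaa; the time values are placeholders.
site_aaa = [
    {"time": 100, "td_client_id": "aaa_001", "td_global_id": "3rd_001"},
    {"time": 200, "td_client_id": "aaa_001", "td_global_id": "3rd_002"},
    {"time": 300, "td_client_id": "aaa_001", "td_global_id": "3rd_003"},
]
mark = next_high_water_mark(site_aaa, ["time"])

# A record appended after the mark was taken is the only delta row.
new_row = {"time": 400, "td_client_id": "aaa_004", "td_global_id": "3rd_017"}
delta = delta_rows(site_aaa + [new_row], mark)
```

With incremental_columns: [time], the recorded mark is simply the largest time value seen so far, which is why only records added after the previous run need to be processed.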
incremental_columns: [time]
When incremental_columns: [time] is specified (only the time column is defined), the process qualifies as incremental_update. The Unification Algorithm considers only newly added records, and instead of replacing the entire enriched_ tables, it enriches only the newly added records and appends them to the tables. At present, [time] is the only setting that enables efficient incremental updates.
incremental_merge_iterations:
canonical_ids:
  - name: unified_cookie_id
    merge_by_keys: [td_client_id, td_global_id]
    merge_iterations: 5
    incremental_merge_iterations: 3
In the canonical_ids: configuration, incremental_merge_iterations: specifies the number of loop iterations used during incremental_update. If not set, the default value is 2.
How to Verify if incremental_update Was Performed
A full_refresh is performed once every three days, while incremental_update is executed on the other days. On the second or third day of scheduled execution, check the most recent session in the workflow (WF) history using the methods below to identify sessions in which incremental_update was performed.
Method 1: Check the +extract_and_merge Task
drop table if exists "unified_cookie_id_graph_unify_loop_0";
create table "unified_cookie_id_graph_unify_loop_0" with (bucketed_on = array['follower_id'], bucket_count = 512) as
-- incremental extraction and merge to the previous graph
select
  coalesce(prev.follower_id, next.follower_id) as follower_id,
  coalesce(prev.follower_ns, next.follower_ns) as follower_ns,
If the comment just above the select statement reads:
-- incremental extraction and merge to the previous graph
then this WF session performed an incremental_update. Conversely, for full_refresh, the comment will read:
-- full extraction
Method 2: Check if the +source_key_stats Task Was Executed
In the TIMELINE or TASKS section of the WF session, if the +source_key_stats: task was executed, it indicates a full_refresh. If this task was not executed, it signifies an incremental_update.
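Both checks can be expressed as a small Python helper. This is only a sketch: it assumes you have already copied the +extract_and_merge query text and the list of executed task names out of the WF session by hand, and the function names are hypothetical. It does not call any Treasure Data API.

```python
INCREMENTAL_COMMENT = "-- incremental extraction and merge to the previous graph"
FULL_COMMENT = "-- full extraction"


def classify_by_query(sql_text):
    """Method 1: inspect the comment in the +extract_and_merge query text."""
    if INCREMENTAL_COMMENT in sql_text:
        return "incremental_update"
    if FULL_COMMENT in sql_text:
        return "full_refresh"
    return "unknown"


def classify_by_tasks(executed_task_names):
    """Method 2: +source_key_stats runs only in a full_refresh session."""
    ran_stats = any("+source_key_stats" in name for name in executed_task_names)
    return "full_refresh" if ran_stats else "incremental_update"


# Query text pasted from a session (shortened for the example).
query = """
create table "unified_cookie_id_graph_unify_loop_0" as
-- incremental extraction and merge to the previous graph
select coalesce(prev.follower_id, next.follower_id) as follower_id
"""
by_query = classify_by_query(query)

# Hypothetical task list copied from the TASKS view of the same session.
by_tasks = classify_by_tasks(["+extract_and_merge", "+get_next_high_water_mark"])
```

If the two methods disagree for the same session, recheck that the query text and the task list were taken from the same session.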
On days when incremental_update is executed, you can add new records using the SQL for Appending Records and rerun the WF to confirm the processing of newly added records during the incremental_update.
Explanation of the Unification Algorithm
graph_unify_loop_0
In an incremental_update, the initial graph is created differently. It is formed by combining the following two graphs:
- The graph for newly added records
- The final graph from the previous execution
1. The Graph for Newly Added Records
2. The Final Graph from the Previous Execution
graph_unify_loop_0
The initial graph for incremental_update is the result of combining these two graphs.
Starting from this graph allows the algorithm to converge in fewer iterations. In most cases, leaders are already replaced with aaa_001 during the first loop (see Example 2), which brings the graph close to convergence.
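To illustrate why seeding with the previous final graph helps, here is a toy leader-propagation sketch in Python. It is not the actual Unification Algorithm: the unify function, the (namespace priority, id) node encoding, and the rule that the smallest such tuple becomes the leader are assumptions made for the example. The edges are taken from the month 2 rows of the dataset.

```python
def unify(edges, previous_leaders=None, max_iterations=10):
    """Propagate the smallest node as leader; return (leaders, iterations used)."""
    previous_leaders = previous_leaders or {}
    leaders = dict(previous_leaders)
    links = list(edges)
    # The previous final graph contributes follower -> leader links.
    for follower, leader in previous_leaders.items():
        leaders.setdefault(leader, leader)
        if follower != leader:
            links.append((follower, leader))
    # Nodes seen for the first time start as their own leader.
    for a, b in edges:
        leaders.setdefault(a, a)
        leaders.setdefault(b, b)

    for iteration in range(1, max_iterations + 1):
        updated = dict(leaders)
        for a, b in links:
            best = min(leaders[a], leaders[b])
            updated[a] = min(updated[a], best)
            updated[b] = min(updated[b], best)
        if updated == leaders:
            return leaders, iteration - 1  # the last pass changed nothing
        leaders = updated
    return leaders, max_iterations


# Hypothetical encoding: (1, id) for td_client_id, (2, id) for td_global_id.
A, X1, Y2, Z3 = (1, "aaa_001"), (1, "xxx_001"), (1, "yyy_002"), (1, "zzz_003")
G4, G5, G6 = (2, "3rd_004"), (2, "3rd_005"), (2, "3rd_006")

old_edges = [(A, G4), (X1, G4), (X1, G5), (Y2, G5)]  # already processed
new_edges = [(Y2, G6), (Z3, G6)]                     # newly added records

previous, _ = unify(old_edges)
full_leaders, full_iterations = unify(old_edges + new_edges)
inc_leaders, inc_iterations = unify(new_edges, previous_leaders=previous)
```

In this toy model both runs end with every node pointing at aaa_001, but the incremental run only has to attach the new nodes to a graph that is already stitched, so it needs fewer passes than the run that starts from the raw edges.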
graph_unify_loop_1 (and Similar Graphs for 2, etc.)
In practice, the graph is confirmed to converge in the first iteration.
Output
master_table
Since the newly added records also belong to the same individual, the master_table consists of a single record. The canonical_id is generated based on aaa_001, so its value remains unchanged from the previous execution.
Result Example
| unified_cookie_id | td_client_id | td_global_id | 
|---|---|---|
| Su-bHvUu9NN_ | ["yyy_007", "aaa_005", "xxx_006", "yyy_006", "zzz_008"] | ["3rd_019", "3rd_019", "3rd_019", "3rd_019", "3rd_019"] | 
enriched_ Table
With incremental_columns: [time] configured, an incremental_update does not replace the enriched_ table by reassigning the canonical_id to every record. Instead, the canonical_id is assigned only to the updated records, which are then appended to the existing enriched_ table. This drastically reduces the time required for the enrichment task.
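The difference between the two enrichment modes can be sketched in Python as follows. The function names, the in-memory row format, and the lookup from td_client_id to canonical_id are hypothetical; the sketch only contrasts rebuilding the whole table with appending the delta.

```python
def enrich(rows, canonical_id_of):
    """Attach the canonical_id (unified_cookie_id) to each given row."""
    return [
        dict(row, unified_cookie_id=canonical_id_of[row["td_client_id"]])
        for row in rows
    ]


def enrich_full_refresh(all_source_rows, canonical_id_of):
    """Replace: every source row is enriched again and the table is rebuilt."""
    return enrich(all_source_rows, canonical_id_of)


def enrich_incremental(enriched_rows, delta_source_rows, canonical_id_of):
    """Append: only the delta rows are enriched; existing rows stay untouched."""
    return enriched_rows + enrich(delta_source_rows, canonical_id_of)


# Hypothetical lookup derived from the final graph of this example.
canonical_id_of = {"aaa_003": "Su-bHvUu9NN_", "aaa_004": "Su-bHvUu9NN_"}

existing = enrich_full_refresh(
    [{"td_client_id": "aaa_003", "td_global_id": "3rd_016"}], canonical_id_of
)
delta = [{"td_client_id": "aaa_004", "td_global_id": "3rd_017"}]
enriched = enrich_incremental(existing, delta, canonical_id_of)
```

The work done by enrich_incremental grows with the number of delta rows rather than with the size of the whole table, which is where the time saving comes from.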
Limitation for incremental_update
When Records from the Past are Deleted or Updated
In incremental_update, deletions of or updates to past records are not reflected in the ID unification immediately, so take care when modifying past data. However, because a full_refresh is performed once every three days, those changes are picked up during that run.