What is incremental_update?

incremental_update is a mechanism that reduces processing time by reusing the results of the previous Unification run and applying stitching only to the records added since that run.

How does incremental_update improve efficiency?

  1. When incremental_columns: [time] is specified, the Unification Algorithm combines the previous final graph with the graph generated from newly added records. This allows the process to start from a state where most of the stitching is already complete, leading to faster convergence.
  2. When incremental_columns: [time] is specified, the enrichment process applies the canonical_id only to the delta records. This significantly shortens processing time compared to enriching the canonical_id for all records.

Dataset

Using the table from Example 1 as a base, we will insert additional records and observe how incremental_update processes them.

Table Used in Example 1

Assume the data from Example 1 is stored in the test_id_unification_ex5 database.

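date        | site_aaa                    | site_xxx                    | site_yyy                    | site_zzz
month | day | td_client_id | td_global_id | td_client_id | td_global_id | td_client_id | td_global_id | td_client_id | td_global_id
1     | 5   | aaa_001      | 3rd_001      | -            | -            | yyy_001      | 3rd_001      | -            | -
1     | 15  | aaa_001      | 3rd_002      | -            | -            | -            | -            | zzz_001      | 3rd_002
1     | 25  | aaa_001      | 3rd_003      | -            | -            | -            | -            | -            | -
2     | 5   | aaa_001      | 3rd_004      | xxx_001      | 3rd_004      | -            | -            | -            | -
2     | 15  | -            | -            | xxx_001      | 3rd_005      | yyy_002      | 3rd_005      | -            | -
2     | 25  | -            | -            | -            | -            | yyy_002      | 3rd_006      | zzz_003      | 3rd_006
3     | 5   | -            | -            | -            | -            | -            | -            | zzz_003      | 3rd_007
3     | 15  | -            | -            | xxx_002      | 3rd_008      | -            | -            | zzz_003      | 3rd_008
3     | 25  | aaa_002      | 3rd_009      | xxx_002      | 3rd_009      | -            | -            | -            | -
4     | 5   | aaa_002      | 3rd_010      | -            | -            | yyy_003      | 3rd_010      | -            | -
4     | 15  | -            | -            | -            | -            | yyy_003      | 3rd_011      | zzz_004      | 3rd_011
4     | 25  | -            | -            | xxx_003      | 3rd_012      | -            | -            | zzz_004      | 3rd_012
5     | 5   | aaa_003      | 3rd_013      | xxx_003      | 3rd_013      | -            | -            | -            | -
5     | 15  | aaa_003      | 3rd_014      | -            | -            | -            | -            | -            | -
5     | 25  | aaa_003      | 3rd_015      | -            | -            | yyy_004      | 3rd_015      | zzz_005      | 3rd_015
6     | 5   | aaa_003      | 3rd_016      | xxx_004      | 3rd_016      | -            | -            | -            | -
6     | 15  | -            | -            | xxx_004      | 3rd_017      | -            | -            | zzz_005      | 3rd_017
6     | 25  | -            | -            | -            | -            | yyy_005      | 3rd_018      | zzz_005      | 3rd_018

A dash (-) means there is no record for that site on that date.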

Records to be Added Later

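date        | site_aaa                    | site_xxx                    | site_yyy                    | site_zzz
month | day | td_client_id | td_global_id | td_client_id | td_global_id | td_client_id | td_global_id | td_client_id | td_global_id
7     | 5   | aaa_004      | 3rd_017      | xxx_005      | 3rd_018      | yyy_006      | 3rd_019      | zzz_006      | 3rd_018
7     | 15  | aaa_004      | 3rd_018      | -            | -            | -            | -            | zzz_007      | 3rd_018
7     | 25  | -            | -            | -            | -            | -            | -            | -            | -
8     | 5   | aaa_005      | 3rd_018      | xxx_005      | 3rd_018      | -            | -            | -            | -
8     | 15  | -            | -            | xxx_006      | 3rd_019      | yyy_006      | 3rd_019      | zzz_008      | 3rd_019
8     | 25  | aaa_005      | 3rd_019      | -            | -            | yyy_007      | 3rd_019      | -            | -

A dash (-) means there is no record for that site on that date.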

We will add the above records partway through the schedule and observe how incremental_update handles them.

Workflow example

id_unification_ex5.dig

timezone: UTC # Asia/Tokyo
schedule:
  daily>: 09:00:00

+call_unification:
  http_call>: https://api-cdp.treasuredata.com/unifications/workflow_call  
  headers:
    - authorization: ${secret:td.apikey}
  method: POST
  retry: true
  content_format: json
  content:

    run_canonical_ids: true
    run_enrichments: true
    run_master_tables: true

    full_refresh: false
    keep_debug_tables: true

    unification:
      !include : unification_ex5.yml

unification_ex5.yml

name: test_id_unification_ex5

keys:
  - name: td_client_id
  - name: td_global_id

tables:
  - database: test_id_unification_ex5
    table: site_aaa
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}
  - database: test_id_unification_ex5
    table: site_xxx
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}

  - database: test_id_unification_ex5
    table: site_yyy
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}

  - database: test_id_unification_ex5
    table: site_zzz
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}

canonical_ids:
  - name: unified_cookie_id
    merge_by_keys: [td_client_id, td_global_id]
    merge_iterations: 5
    incremental_merge_iterations: 3

master_tables:
  - name: master_table_ex5
    canonical_id: unified_cookie_id
    attributes:
    - name: td_client_id
      array_elements: 5
      source_columns:
        - {table: site_aaa, order: last, order_by: time, priority: 1}
        - {table: site_xxx, order: last, order_by: time, priority: 1}
        - {table: site_yyy, order: last, order_by: time, priority: 1}
        - {table: site_zzz, order: last, order_by: time, priority: 1}                    
    - name: td_global_id
      array_elements: 5
      source_columns:
        - {table: site_aaa, order: last, order_by: time, priority: 1}
        - {table: site_xxx, order: last, order_by: time, priority: 1}
        - {table: site_yyy, order: last, order_by: time, priority: 1}
        - {table: site_zzz, order: last, order_by: time, priority: 1}                    

Settings for incremental_update

full_refresh: false

  content:

    run_canonical_ids: true
    run_enrichments: true
    run_master_tables: true

    full_refresh: false
    keep_debug_tables: true

    unification:
      !include : unification_ex5.yml

Setting full_refresh: false in the content: section of the dig file enables incremental_update. Note, however, that incremental_update does not run on every execution: a full_refresh (processing all records as usual) is still performed once every three days, and incremental_update runs on the other days.

incremental_columns:

Warning

Currently, any setting other than incremental_columns: [time] will result in all processes being executed as full_refresh. Avoid using other configurations.

tables:
  - database: test_id_unification_ex5
    table: site_aaa
    incremental_columns: [time]
    key_columns:
      - {column: td_client_id, key: td_client_id}
      - {column: td_global_id, key: td_global_id}

In the tables: section of the YAML file, you can configure incremental_columns: [column1, column2,...] for each table. When the +get_next_high_water_mark task records where processing left off, it sorts the table by the columns listed in this option, in the order given:

ORDER BY column1 DESC, column2 DESC,...

The column1, column2,... values of the first row in this sorted order (the most recent record) are recorded as the high-water mark.
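
The sorting rule above can be illustrated with a small Python sketch. The function name next_high_water_mark, the in-memory row format, and the seq column are assumptions made for this example; the actual task runs as SQL inside the workflow.

```python
from typing import Dict, List, Optional, Sequence, Tuple


def next_high_water_mark(
    rows: List[Dict[str, int]],
    incremental_columns: Sequence[str],
) -> Optional[Tuple[int, ...]]:
    """Return the incremental column values of the most recent row.

    Mirrors ORDER BY column1 DESC, column2 DESC, ... followed by taking
    the first row. Returns None for an empty table.
    """
    if not rows:
        return None
    keys = [tuple(row[col] for col in incremental_columns) for row in rows]
    # The first row of a descending sort is the maximum tuple.
    return max(keys)


# Hypothetical rows: time is a Unix timestamp, seq is an illustrative second column.
rows = [
    {"time": 1700000000, "seq": 2},
    {"time": 1700086400, "seq": 1},
    {"time": 1700086400, "seq": 3},
]
print(next_high_water_mark(rows, ["time"]))         # (1700086400,)
print(next_high_water_mark(rows, ["time", "seq"]))  # (1700086400, 3)
```

Comparing tuples element by element gives the same result as a multi-column descending sort, which is why max() is enough here.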

If this option is not set, the +get_next_high_water_mark task will not run. The Unification Algorithm will operate in a manner similar to full_refresh, and all records in the enriched_ tables will be replaced.

incremental_columns: [time]

When incremental_columns: [time] is specified (only the time column is defined), the process qualifies as incremental_update. The Unification Algorithm considers only newly added records, and instead of replacing the entire enriched_ tables, it enriches only the newly added records and appends them to the tables. Currently, [time] is the only setting that enables efficient incremental updates.
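
Because only the exact value [time] qualifies, it can be useful to check a configuration before running it. The following is a minimal sketch of such a check, assuming the tables: section has already been parsed into a list of dictionaries (for example with a YAML parser). The helper name tables_blocking_incremental and the second and third sample entries are hypothetical.

```python
from typing import Dict, List


def tables_blocking_incremental(tables: List[Dict]) -> List[str]:
    """Return the names of tables whose incremental_columns is not exactly [time]."""
    return [
        t["table"]
        for t in tables
        if t.get("incremental_columns") != ["time"]
    ]


# Hypothetical parsed form of the tables: section.
tables = [
    {"database": "test_id_unification_ex5", "table": "site_aaa", "incremental_columns": ["time"]},
    {"database": "test_id_unification_ex5", "table": "site_xxx", "incremental_columns": ["time", "id"]},
    {"database": "test_id_unification_ex5", "table": "site_yyy"},  # option not set
]
print(tables_blocking_incremental(tables))  # ['site_xxx', 'site_yyy']
```

An empty result means every table uses incremental_columns: [time].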

incremental_merge_iterations:

canonical_ids:
  - name: unified_cookie_id
    merge_by_keys: [td_client_id, td_global_id]
    merge_iterations: 5
    incremental_merge_iterations: 3

In the canonical_ids: configuration, incremental_merge_iterations: sets the number of merge loop iterations used during incremental_update. If it is not set, the default value is 2.
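
As a rough illustration of how the two iteration settings relate, the sketch below picks the loop count for a run. The function loop_count and the dictionary form of the configuration are assumptions for this example, not part of the product.

```python
DEFAULT_INCREMENTAL_MERGE_ITERATIONS = 2  # default described in the text


def loop_count(canonical_id: dict, incremental: bool) -> int:
    """Return the number of merge loops for a full or incremental run."""
    if incremental:
        return canonical_id.get(
            "incremental_merge_iterations",
            DEFAULT_INCREMENTAL_MERGE_ITERATIONS,
        )
    return canonical_id["merge_iterations"]


cfg = {
    "name": "unified_cookie_id",
    "merge_by_keys": ["td_client_id", "td_global_id"],
    "merge_iterations": 5,
    "incremental_merge_iterations": 3,
}
print(loop_count(cfg, incremental=False))  # 5
print(loop_count(cfg, incremental=True))   # 3
print(loop_count({"merge_iterations": 5}, incremental=True))  # 2
```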

How to Verify if incremental_update Was Performed

A full_refresh is performed once every three days, while incremental_update is executed on other days. On the second or third day of the schedule execution, check the most recent session (history) of the workflow (WF) using the methods below to identify sessions where incremental_update was processed.

Method 1: Check the +extract_and_merge Task

drop table if exists "unified_cookie_id_graph_unify_loop_0";
create table "unified_cookie_id_graph_unify_loop_0" with (bucketed_on = array['follower_id'], bucket_count = 512) as
-- incremental extraction and merge to the previous graph
select
  coalesce(prev.follower_id, next.follower_id) as follower_id,
  coalesce(prev.follower_ns, next.follower_ns) as follower_ns,

If the comment just above the select statement reads:

-- incremental extraction and merge to the previous graph

then this WF session performed an incremental_update. Conversely, for full_refresh, the comment will read:

-- full extraction
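
If you have the query text of the +extract_and_merge task at hand, the comment check can be automated. This is a minimal sketch that assumes the SQL is available as a string; how you obtain it (console, logs, or API) is outside this example, and classify_extract_query is a hypothetical helper.

```python
INCREMENTAL_MARKER = "-- incremental extraction and merge to the previous graph"
FULL_MARKER = "-- full extraction"


def classify_extract_query(sql: str) -> str:
    """Return 'incremental_update', 'full_refresh', or 'unknown'."""
    for line in sql.splitlines():
        stripped = line.strip()
        if stripped == INCREMENTAL_MARKER:
            return "incremental_update"
        if stripped == FULL_MARKER:
            return "full_refresh"
    return "unknown"


incremental_sql = """\
create table "unified_cookie_id_graph_unify_loop_0" with (bucketed_on = array['follower_id'], bucket_count = 512) as
-- incremental extraction and merge to the previous graph
select
"""
print(classify_extract_query(incremental_sql))                 # incremental_update
print(classify_extract_query("-- full extraction\nselect 1"))  # full_refresh
```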

Method 2: Check if the +source_key_stats Task Was Executed


In the TIMELINE or TASKS section of the WF session, if the +source_key_stats: task was executed, it indicates a full_refresh. If this task was not executed, it signifies an incremental_update.
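
The same rule can be expressed as a small check over the task names of a session. The sketch assumes you already have the list of executed task names; the sample lists and the helper classify_by_tasks are hypothetical.

```python
from typing import Iterable


def classify_by_tasks(executed_tasks: Iterable[str]) -> str:
    """Return 'full_refresh' if +source_key_stats ran, else 'incremental_update'."""
    # Keep only the last segment of each name, e.g. "+a+b+source_key_stats".
    names = {name.rstrip(":").split("+")[-1] for name in executed_tasks}
    return "full_refresh" if "source_key_stats" in names else "incremental_update"


# Hypothetical task lists for two sessions.
print(classify_by_tasks(["+get_next_high_water_mark", "+extract_and_merge", "+source_key_stats"]))
# full_refresh
print(classify_by_tasks(["+get_next_high_water_mark", "+extract_and_merge"]))
# incremental_update
```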

On days when incremental_update is executed, you can add new records using the SQL for Appending Records and rerun the WF to confirm the processing of newly added records during the incremental_update.

Explanation of the Unification Algorithm

graph_unify_loop_0

In an incremental_update, the creation of the initial graph differs. The initial graph is formed by combining the following two graphs:

  1. The graph for newly added records
  2. The final graph from the previous execution

1. The Graph for Newly Added Records

[Figure: graph built from the newly added records]

2. The Final Graph from the Previous Execution

[Figure: final graph from the previous execution]

graph_unify_loop_0

The initial graph for incremental_update is the result of combining these two graphs.

[Figure: initial graph obtained by combining the two graphs]

Starting from this graph allows the algorithm to converge in fewer iterations. In most cases, the leader replacement in the first loop already points the new nodes to aaa_001 (see Example 2), which brings the graph close to convergence.

graph_unify_loop_1 (and Similar Graphs for 2, etc.)

[Figure: graph after the first loop]

In this example, the graph does converge in the first iteration.
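
The idea of starting from the previous final graph can be sketched in Python. This is a simplified label-propagation toy, not the SQL that the workflow generates, and the number of loops it needs is not meant to match the real implementation. Ranking td_client_id ahead of td_global_id when choosing a leader is an assumption made so that the leader is aaa_001, and the previous graph is reduced to the few nodes that the new records touch.

```python
from typing import Dict, Iterable, List, Tuple

Node = Tuple[int, str]  # (key rank, id value); the smaller tuple wins as leader
Edge = Tuple[Node, Node]


def client(value: str) -> Node:
    return (0, value)  # td_client_id: ranked first (assumption)


def global_id(value: str) -> Node:
    return (1, value)  # td_global_id


def initial_graph(prev_leader: Dict[Node, Node], new_edges: Iterable[Edge]):
    """Combine the previous final graph with the graph of newly added records."""
    leader = dict(prev_leader)
    edges: List[Edge] = []
    for follower, lead in prev_leader.items():
        leader.setdefault(lead, lead)
        if follower != lead:
            edges.append((follower, lead))
    for a, b in new_edges:
        leader.setdefault(a, a)  # unseen nodes start as their own leader
        leader.setdefault(b, b)
        edges.append((a, b))
    return leader, edges


def unify(leader: Dict[Node, Node], edges: List[Edge], max_loops: int):
    """Propagate the smallest leader along edges until nothing changes."""
    for _ in range(max_loops):
        changed = False
        for a, b in edges:
            best = min(leader[a], leader[b])
            if leader[a] != best or leader[b] != best:
                leader[a] = leader[b] = best
                changed = True
        if not changed:
            break
    return leader


A = client("aaa_001")
# Subset of the previous final graph: every node already points to aaa_001.
prev = {A: A, global_id("3rd_017"): A, global_id("3rd_018"): A}
# Edges taken from the "Records to be Added Later" table.
new_edges = [
    (client("aaa_004"), global_id("3rd_017")),
    (client("xxx_005"), global_id("3rd_018")),
    (client("yyy_006"), global_id("3rd_019")),
    (client("zzz_006"), global_id("3rd_018")),
    (client("aaa_004"), global_id("3rd_018")),
    (client("zzz_007"), global_id("3rd_018")),
    (client("aaa_005"), global_id("3rd_018")),
    (client("xxx_006"), global_id("3rd_019")),
    (client("zzz_008"), global_id("3rd_019")),
    (client("aaa_005"), global_id("3rd_019")),
    (client("yyy_007"), global_id("3rd_019")),
]
leader, edges = initial_graph(prev, new_edges)
final = unify(leader, edges, max_loops=3)  # incremental_merge_iterations: 3
print(set(final.values()))  # {(0, 'aaa_001')}
```

Every new node ends up under the existing leader because the new records share 3rd_017 and 3rd_018 with the previous graph, so only the new part of the graph has to be resolved.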

Output

master_table

Since the newly added records also belong to the same individual, the master_table consists of a single record. The canonical_id is generated based on aaa_001, so its value remains unchanged from the previous execution.

Result Example

unified_cookie_id | td_client_id                                            | td_global_id
Su-bHvUu9NN_      | ["yyy_007", "aaa_005", "xxx_006", "yyy_006", "zzz_008"] | ["3rd_019", "3rd_019", "3rd_019", "3rd_019", "3rd_019"]

enriched_ Table

With incremental_columns: [time] configured, an incremental_update does not rebuild the enriched_ table by assigning canonical_id to every record and replacing the table. Instead, canonical_id is assigned only to the newly added records, which are appended to the existing enriched_ table. This greatly reduces the time required for the enrichment task.
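
The append behavior can be pictured with the following sketch. It assumes that the delta is the set of rows whose time is greater than the recorded high-water mark and that the canonical_id is looked up by td_client_id; both are simplifications for illustration. The time values and the helper enrich_delta are hypothetical.

```python
from typing import Dict, List

CID = "Su-bHvUu9NN_"  # canonical_id from the result example


def enrich_delta(
    source_rows: List[Dict],
    enriched_rows: List[Dict],
    high_water_mark: int,
    canonical_id_of: Dict[str, str],
) -> int:
    """Append enriched copies of rows newer than the high-water mark."""
    delta = [r for r in source_rows if r["time"] > high_water_mark]
    enriched_rows.extend(
        {**r, "unified_cookie_id": canonical_id_of[r["td_client_id"]]}
        for r in delta
    )
    return len(delta)


# site_aaa rows with placeholder time values 1, 2, 3.
source_rows = [
    {"time": 1, "td_client_id": "aaa_003", "td_global_id": "3rd_015"},
    {"time": 2, "td_client_id": "aaa_003", "td_global_id": "3rd_016"},
    {"time": 3, "td_client_id": "aaa_004", "td_global_id": "3rd_017"},
]
# Rows enriched by the previous run stay untouched.
enriched_rows = [{**r, "unified_cookie_id": CID} for r in source_rows[:2]]

appended = enrich_delta(
    source_rows,
    enriched_rows,
    high_water_mark=2,
    canonical_id_of={"aaa_003": CID, "aaa_004": CID},
)
print(appended, len(enriched_rows))  # 1 3
```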

Limitations of incremental_update

When Records from the Past are Deleted or Updated

In incremental_update, deletions of or updates to past records are not reflected in the ID unification immediately, so take care when modifying historical data. However, because a full_refresh is performed once every three days, those changes are picked up at the next full_refresh.