Bulk Import API Tutorial

This tutorial provides an example of how to bulk load rows into a table in a Treasure Data database.

About Bulk Data Import

Bulk data import loads large amounts of data into a TD database in a relatively short period of time, and it is the recommended method when you need to import a large dataset into Treasure Data. For example, a first-time upload of a large dataset from an external system would benefit from a bulk load process. For smaller datasets, such as daily imports, Treasure Data recommends using the Data Ingestion API.

With typical database loads, rows are inserted one at a time, but a bulk data import takes advantage of more efficient import methods. While these methods may bypass triggers and some integrity checks, the resulting import performance is much faster.

Bulk Import APIs Documentation

A complete list of endpoints for bulk_import APIs can be found at Treasure Data Bulk Import API.

About the Bulk Import Process

Generally, the process of bulk loading a file to a Treasure Data database is:

  1. Create a msgpack.gz file to upload — Each line of the file is a JSON-formatted description of a row to be inserted into the table. After creating this file, it must be serialized into the MessagePack binary format and gzip-compressed before it can be uploaded with the API.
  2. Create a bulk import session.
  3. Upload a msgpack.gz file.
  4. Create a job to perform the bulk import.
  5. Commit the import job when it is ready.

After the commit completes successfully, the new table rows can be viewed using either TD Console or TD APIs.
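
If you prefer to script these steps, the following is a minimal Python sketch of steps 2 through 5, using the same /v3/bulk_import endpoints that the cURL tutorial below calls. It is a sketch only: it assumes the requests library, an API key in a TD_API_KEY environment variable, and placeholder database, table, session, and file names, and it omits error handling and waiting for the perform job to finish.

import os

import requests

API = "https://api.treasuredata.com"  # use your region's API endpoint
HEADERS = {
    "Accept": "application/json",
    "Authorization": "TD1 " + os.environ["TD_API_KEY"],
}
SESSION, DB, TABLE = "my_bulk_load_session", "my_bulk_load_db", "my_bulk_load_table"

# 2. Create a bulk import session bound to an existing database and table.
requests.post(f"{API}/v3/bulk_import/create/{SESSION}/{DB}/{TABLE}", headers=HEADERS)

# 3. Upload the msgpack.gz file as a uniquely named part.
with open("sample.msgpack.gz", "rb") as f:
    requests.put(f"{API}/v3/bulk_import/upload_part/{SESSION}/part1",
                 headers=HEADERS, data=f)

# 4. Create the job that performs the bulk import.
requests.post(f"{API}/v3/bulk_import/perform/{SESSION}", headers=HEADERS)

# 5. After the job is ready, commit it (see the status checks in the tutorials below).
requests.post(f"{API}/v3/bulk_import/commit/{SESSION}", headers=HEADERS)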

Bulk Import Tutorials

Step-by-step tutorials are provided for bulk import using the TD Toolbelt CLI, a Postman collection, and cURL.

Prerequisites

To complete the steps described in these tutorials, you need:

  • A valid TD API key
  • A valid msgpack.gz file that contains the data to be bulk loaded
  • An existing table to import into

Creating msgpack.gz Files

The MessagePack file that is uploaded is the same in all of the tutorials. It is created by specifying table rows in JSON format, serializing them with MessagePack, and compressing the result with gzip.

For example, in the following file, sample.json, each line contains a JSON object that describes three columns: time, id, and email. Each line of the file represents a row that will be added to the table.

{"time":1669161014,"id":100,"email":"email100@example.com"}
{"time":1669161015,"id":101,"email":"email101@example.com"}
{"time":1669161016,"id":102,"email":"email102@example.com"}

An easy way to create the MessagePack file is to use the TD CLI bulk_import:prepare_parts command. Here is an example of how to issue this command:

$  td bulk_import:prepare_parts sample.json --format json --time-column time  --output outputdir

Processing sample.json...
  Preparing part "sample_0"...
  sample: 2022-11-22 23:50:14 UTC {"time":1669161014,"id":100,"email":"email100@example.com"}
  sample.json: 3 entries.

The resulting file that is placed in the outputdir directory is named sample_0.msgpack.gz and can be used to bulk load data.

You can also create the MessagePack file using any of the programming languages supported on the MessagePack website.
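
For example, the following is a minimal Python sketch that produces a msgpack.gz file equivalent to sample.json. It assumes the msgpack package (pip install msgpack); the row values and output file name are illustrative only.

import gzip

import msgpack

rows = [
    {"time": 1669161014, "id": 100, "email": "email100@example.com"},
    {"time": 1669161015, "id": 101, "email": "email101@example.com"},
    {"time": 1669161016, "id": 102, "email": "email102@example.com"},
]

# Serialize each row as a MessagePack map and write it into a gzip stream.
with gzip.open("sample.msgpack.gz", "wb") as out:
    for row in rows:
        # Every row must include a time column containing a UNIX timestamp.
        out.write(msgpack.packb(row))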

important

Each row must contain a time column whose value is specified as a UNIX timestamp. Rows with invalid time values are not imported.

Bulk Import Example using TD Toolbelt CLI

In this example, the TD Toolbelt CLI is used to perform the following steps:

  1. Create a database in Treasure Data.
  2. Create a table in the database.
  3. Create a bulk import session.
  4. Upload a msgpack.gz file.
  5. Create a job to perform the bulk import.
  6. Get the status of the job.
  7. After the job is ready, commit the job.

Verifying TD Toolbelt CLI Configuration

The Treasure Data instance that the TD CLI commands point to, and the API key that they use, are contained in the td.conf file. To see the contents of the td.conf file, enter the following command:

$ cat ~/.td/td.conf

[account]
  user = my.name@mycompany.com
  apikey = 123/abcdef•••••••••••••••••••••••0123456789
  endpoint = https://api.treasuredata.com

Modify the contents of this file as necessary for the CLI commands to run against the correct Treasure Data instance.

Creating a Database

Issue the td db:create command to create the database in Treasure Data. In this example, the name of the database is my_bulk_load_db_cli.

$ td db:create my_bulk_load_db_cli

Database 'my_bulk_load_db_cli' is created.
Use 'td table:create my_bulk_load_db_cli <table_name>' to create a table.

Creating a Table

Issue the td table:create command to create the table in the database you just created. In this example, the name of the table is my_bulk_load_table_cli.

$ td table:create my_bulk_load_db_cli my_bulk_load_table_cli

Table 'my_bulk_load_db_cli.my_bulk_load_table_cli' is created.

If desired, you can further define the schema of the table. You can bulk import into a table that is empty or into a table that already contains data. Additionally, the columns in the rows that you upload do not need to match the current schema of the table. If you upload columns that don't exist, the new columns are added to the table schema.

Creating a Bulk Import Session

Issue the td bulk_import:create command to create the session. The session name needs to be unique. In this example, the session name is my_bulk_load_session_cli.

$ td bulk_import:create my_bulk_load_session_cli my_bulk_load_db_cli my_bulk_load_table_cli

Bulk import session 'my_bulk_load_session_cli' is created.

Uploading the msgpack.gz File

Issue the td bulk_import:upload_parts command to upload the msgpack.gz file.

In this example, the file you are uploading is named sample.msgpack.gz.

$  td bulk_import:upload_parts my_bulk_load_session_cli sample.msgpack.gz

Uploading 'sample.msgpack.gz' -> 'sample'... (142 bytes)
done.

Creating a Job to Perform the Bulk Import

Issue the td bulk_import:perform command to create the job that performs the import.

$  td bulk_import:perform my_bulk_load_session_cli

Job 55302846 is queued.
Use 'td job:show [-w] 55302846' to show the status.

Checking the Status of the Bulk Import Job

Issue the td job:show command to see the status of the import job.

$ td job:show 55302846

JobID       : 55302846
Status      : success
Type        : bulk_import_perform
Database    : my_bulk_load_db_cli
Destination : -- LOAD DATA SESSION my_bulk_load_session_cli INTO TABLE my_bulk_load_table_cli
Use '-v' option to show detailed messages.

For additional information about whether the import was successful, you can use the -v (verbose) option. In this example, after Status shows success, valid.parts is set to 1 and valid.records is set to 10:

$ td job:show 55302846 -v | grep '^Status\|valid.'

Status      : success
        valid.parts=1
        valid.records=10

Committing the Bulk Import Job

Issue the td bulk_import:commit command to commit the results of the job.

$  td bulk_import:commit my_bulk_load_session_cli

Bulk import session 'my_bulk_load_session_cli' started to commit.

You can see the results of the import by using a command like td table:show. It shows the new row Count and the new Schema.

$  td table:show my_bulk_load_db_cli my_bulk_load_table_cli

Name        : my_bulk_load_db_cli.my_bulk_load_table_cli
Type        : log
Count       : 10
Schema      : (
    id:long
    email:string
)

Or you can inspect the table using the TD Console.


Bulk Import Example using Postman Collection

This example uses a curated Postman collection to execute the Treasure Data APIs that perform bulk data import. Before starting this tutorial, download the TD API Postman Collections from GitHub, and then configure the collection as documented in the README file.

The collection provides a set of Postman API requests that perform the following steps:

  1. Create a database in Treasure Data.
  2. Create a table in the database.
  3. Create a bulk import session.
  4. Upload msgpack.gz file.
  5. Create a job to perform the bulk import.
  6. Get the status of the job.
  7. After the job is ready, commit the job.

Creating a Database

Send the Bulk Data Import > Create database request to create the database in Treasure Data. By default, the database name is set to my_bulk_load_db.

The Create database request requires

  • a unique name for the database

A successful request returns a 200 OK response, and the response body will look something like this:

{
    "database": "my_bulk_load_db"
}

Optionally, you can verify the creation of the database and see additional information about it by sending the Bulk Data Import > Get database request. A successful response will look something like this:

{
    "id": "1697690",
    "name": "my_bulk_load_db",
    "created_at": "2023-03-06 18:35:27 UTC",
    "updated_at": "2023-03-06 18:35:27 UTC",
    "count": 0,
    "organization": null,
    "permission": "owner",
    "delete_protected": false,
    "datatank": false
}

Creating a Table

Send the Bulk Data Import > Create table request to create a table in the database. By default, the table name is set to my_bulk_load_table.

The Create table request requires

  • a database name
  • a unique name for the table

A successful request returns a 200 OK response, and the response body will look something like this:

{
    "database": "my_bulk_load_db",
    "table": "my_bulk_load_table",
    "type": "log"
}

Optionally, you can verify the creation of the table and see additional information about it by sending the Bulk Data Import > Get tables request. A successful request returns a 200 OK response, and the response body will look something like this:

{
    "id": 12040734,
    "name": "my_bulk_load_table",
    "estimated_storage_size": 0,
    "counter_updated_at": null,
    "last_log_timestamp": null,
    "delete_protected": false,
    "created_at": "2023-03-06 18:44:27 UTC",
    "updated_at": "2023-03-06 18:44:27 UTC",
    "type": "log",
    "include_v": true,
    "count": 0,
    "schema": "[]",
    "expire_days": null
}

If desired, you can further define the schema of the table. You can bulk import into a table that is empty or into a table that already contains data. Additionally, the columns in the rows that you upload do not need to match the current schema of the table. If you upload columns that don't exist, the new columns are added to the table schema.

Creating a Bulk Import Session

Send the Bulk Data Import > Create bulk import session request to create the session. The session name needs to be unique. In this example, the session name is my_bulk_load_session.

The Create bulk import session request requires

  • a database name
  • a table name
  • a unique name for the session

A successful request returns a 200 OK response, and the response body will look something like this:

{
    "bulk_import": "my_bulk_load_session",
    "name": "my_bulk_load_session"
}

Uploading the msgpack.gz File

Send the Bulk Data Import > Upload MessagePack file request to upload the msgpack.gz file. The last parameter of this request is a part name. This part name is arbitrary. However, it should be unique for every file you upload in the session.

The Upload MessagePack File request requires

  • a session name
  • a unique part name
  • a valid msgpack.gz file

A successful request returns a 200 OK response, and the response body will look something like this:

{
    "bulk_import": "my_bulk_load_session",
    "name": "my_bulk_load_session"
}

Getting the Status of the Bulk Import Job

Send the Bulk Data Import > Get status bulk import request to see the status of the import job.

The Get status bulk import request requires

  • a session name

In this first example, the output of the Get status bulk import request shows a status of performing. Also, error_parts and error_records are displayed as null.

{
  "job_id": 55469098,
  "database": "my_bulk_load_db",
  "error_parts": null,
  "error_records": null,
  "name": "my_bulk_load_session",
  "status": "performing",
  "table": "my_bulk_load_table",
  "upload_frozen": false,
  "valid_parts": null,
  "valid_records": null
}

In this second example you can see that, when a job is successful, the output changes as follows:

  • status changes to ready
  • valid_parts changes to 1
  • valid_records changes to 10
  • error_parts and error_records change to 0

{
    "job_id": 55469098,
    "database": "my_bulk_load_db",
    "error_parts": 0,
    "error_records": 0,
    "name": "my_bulk_load_session",
    "status": "ready",
    "table": "my_bulk_load_table",
    "upload_frozen": false,
    "valid_parts": 1,
    "valid_records": 10
}

Committing the Bulk Import Job

Send the Bulk Data Import > Commit bulk import job request to commit the results of the job.

The Commit bulk import job request requires

  • a session name

A successful request returns a 200 OK response, and the response body will look something like this:

{
    "bulk_import": "my_bulk_load_session",
    "name": "my_bulk_load_session"
}

You can see the results of the import by sending the Bulk Data Import > Get tables request. After a successful bulk import, the response may look something like this:

{
    "id": 12040734,
    "name": "my_bulk_load_table",
    "estimated_storage_size": 0,
    "counter_updated_at": null,
    "last_log_timestamp": null,
    "delete_protected": false,
    "created_at": "2023-03-06 18:44:27 UTC",
    "updated_at": "2023-03-06 19:32:35 UTC",
    "type": "log",
    "include_v": true,
    "count": 10,
    "schema": "[[\"id\",\"long\"],[\"email\",\"string\"]]",
    "expire_days": null
}

Or you can inspect the table using TD Console.


Bulk Import Example using cURL

In this example, cURL commands are used to perform the following steps:

  1. Create a database in Treasure Data.
  2. Create a table in the database.
  3. Create a bulk import session.
  4. Upload msgpack.gz file.
  5. Create a job to perform the bulk import.
  6. Get the status of the job.
  7. After the job is ready, commit the job.

Creating a Database

Call the /v3/database/create/ endpoint to create the database in Treasure Data. In this example, the name of the database is my_bulk_load_db_curl. Optionally, you can verify the creation of the database by calling the /v3/database/show/ endpoint, as shown in the second command.

$ curl --location --request POST 'https://api-development.treasuredata.com/v3/database/create/my_bulk_load_db_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789'

{"database":"my_bulk_load_db_curl"}%



$ curl --location  --request GET 'https://api-development.treasuredata.com/v3/database/show/my_bulk_load_db_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789' | jq

{
  "id": "1694339",
  "name": "my_bulk_load_db_curl",
  "created_at": "2023-03-03 20:20:45 UTC",
  "updated_at": "2023-03-03 20:20:45 UTC",
  "count": 0,
  "organization": null,
  "permission": "owner",
  "delete_protected": false,
  "datatank": false
}

Creating a Table

Call the /v3/table/create/ endpoint to create the table in the database you just created. In this example, the name of the table is my_bulk_load_table_curl. Optionally, you can verify the creation of the table by calling the /v3/table/show/ endpoint, as shown in the second command.

$ curl --location --request POST 'https://api-development.treasuredata.com/v3/table/create/my_bulk_load_db_curl/my_bulk_load_table_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789'

{"database":"my_bulk_load_db_curl","table":"my_bulk_load_table_curl","type":"log"}



$ curl --location --request GET 'https://api-development.treasuredata.com/v3/table/show/my_bulk_load_db_curl/my_bulk_load_table_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789' | jq

{
  "id": 11922793,
  "name": "my_bulk_load_table_curl",
  "estimated_storage_size": 0,
  "counter_updated_at": null,
  "last_log_timestamp": null,
  "delete_protected": false,
  "created_at": "2023-03-03 20:32:25 UTC",
  "updated_at": "2023-03-03 20:32:25 UTC",
  "type": "log",
  "include_v": true,
  "count": 0,
  "schema": "[]",
  "expire_days": null
}

If desired, you can further define the schema of the table. You can bulk import into a table that is empty or into a table that already contains data. Additionally, the columns in the rows that you upload do not need to match the current schema of the table. If you upload columns that don't exist, the new columns are added to the table schema.

Creating a Bulk Import Session

Call the /v3/bulk_import/create/ endpoint to create the session. The session name needs to be unique. In this example, the session name is my_bulk_load_session_curl.

$ curl --location --request POST 'https://api-development.treasuredata.com/v3/bulk_import/create/my_bulk_load_session_curl/my_bulk_load_db_curl/my_bulk_load_table_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789' | jq

{
  "bulk_import": "my_bulk_load_session_curl",
  "name": "my_bulk_load_session_curl"
}

Uploading the msgpack.gz File

Call the /v3/bulk_import/upload_part/ endpoint to upload the msgpack.gz file. The last parameter of this endpoint is a part name. This part name is arbitrary. However, it should be unique for every file you upload in the session.

In this example, the file you are uploading is named sample.msgpack.gz and the part name is part1.

$  curl --location --request PUT 'https://api-development.treasuredata.com/v3/bulk_import/upload_part/my_bulk_load_session_curl/part1' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789' --upload-file sample.msgpack.gz | jq

{
  "bulk_import": "my_bulk_load_session_curl",
  "name": "my_bulk_load_session_curl"
}
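
If your data is split across several msgpack.gz files, each file is uploaded to the session as its own part. The following is a hedged Python sketch of that loop, assuming the requests library and illustrative file names; the part names part0, part1, and so on are arbitrary, but must be unique within the session.

import os

import requests

API = "https://api-development.treasuredata.com"  # match your endpoint
HEADERS = {
    "Accept": "application/json",
    "Authorization": "TD1 " + os.environ["TD_API_KEY"],
}
SESSION = "my_bulk_load_session_curl"

# Illustrative file names; one PUT per file, each with a unique part name.
for i, path in enumerate(["sample_0.msgpack.gz", "sample_1.msgpack.gz"]):
    with open(path, "rb") as f:
        requests.put(f"{API}/v3/bulk_import/upload_part/{SESSION}/part{i}",
                     headers=HEADERS, data=f)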

Creating a Job to Perform the Bulk Import

Call the /v3/bulk_import/perform/ endpoint to create the job that performs the import.

$  curl --location --request POST 'https://api-development.treasuredata.com/v3/bulk_import/perform/my_bulk_load_session_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789'

{
   "job_id":55299453,
   "bulk_import":"my_bulk_load_session_curl",
   "name":"my_bulk_load_session_curl"
}

Checking the Status of the Bulk Import Job

Call the /v3/bulk_import/show/ endpoint to see the status of the import job.

In this first example, the output of the call to the /v3/bulk_import/show/ endpoint shows a status of performing. Also, error_parts and error_records are displayed as null.

$  curl --location 'https://api-development.treasuredata.com/v3/bulk_import/show/my_bulk_load_session_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789' | jq

{
  "job_id": 55299453,
  "database": "my_bulk_load_db_curl",
  "error_parts": null,
  "error_records": null,
  "name": "my_bulk_load_session_curl",
  "status": "performing",
  "table": "my_bulk_load_table_curl",
  "upload_frozen": false,
  "valid_parts": null,
  "valid_records": null
}

In this second example you can see that, when a job is successful, the output changes as follows:

  • status changes to ready
  • valid_parts changes to 1
  • valid_records changes to 10
  • error_parts and error_records change to 0

$  curl --location 'https://api-development.treasuredata.com/v3/bulk_import/show/my_bulk_load_session_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789' | jq

{
  "job_id": 55299453,
  "database": "my_bulk_load_db_curl",
  "error_parts": 0,
  "error_records": 0,
  "name": "my_bulk_load_session_curl",
  "status": "ready",
  "table": "my_bulk_load_table_curl",
  "upload_frozen": false,
  "valid_parts": 1,
  "valid_records": 10
}
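
Because the perform job runs asynchronously, a script typically polls the /v3/bulk_import/show/ endpoint until the status becomes ready before committing. The following is a minimal Python sketch of that pattern, assuming the requests library and an API key in a TD_API_KEY environment variable.

import os
import time

import requests

API = "https://api-development.treasuredata.com"  # match your endpoint
HEADERS = {
    "Accept": "application/json",
    "Authorization": "TD1 " + os.environ["TD_API_KEY"],
}
SESSION = "my_bulk_load_session_curl"

# Poll the session until the perform job has finished.
while True:
    info = requests.get(f"{API}/v3/bulk_import/show/{SESSION}", headers=HEADERS).json()
    if info["status"] == "ready":
        break
    time.sleep(10)  # still "performing"; wait and check again

# Commit the results once the session is ready.
requests.post(f"{API}/v3/bulk_import/commit/{SESSION}", headers=HEADERS)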

Committing the Bulk Import Job

Call the /v3/bulk_import/commit/ endpoint to commit the results of the job.

$  curl --location --request POST 'https://api-development.treasuredata.com/v3/bulk_import/commit/my_bulk_load_session_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789' | jq

{
   "bulk_import":"my_bulk_load_session_curl",
   "name":"my_bulk_load_session_curl"
}

You can see the results of the import by calling an API like /v3/table/show/. It shows the new row count and the new schema.

$  curl --location --request GET 'https://api-development.treasuredata.com/v3/table/show/my_bulk_load_db_curl/my_bulk_load_table_curl' --header 'Accept: application/json' --header 'Authorization: TD1 123/abcdef•••••••••••••••••••••••0123456789' | jq

{
  "id": 11922793,
  "name": "my_bulk_load_table_curl",
  "estimated_storage_size": 0,
  "counter_updated_at": null,
  "last_log_timestamp": null,
  "delete_protected": false,
  "created_at": "2023-03-03 20:32:25 UTC",
  "updated_at": "2023-03-03 22:35:28 UTC",
  "type": "log",
  "include_v": true,
  "count": 10,
  "schema": "[[\"id\",\"long\"],[\"email\",\"string\"]]",
  "expire_days": null
}

Or you can inspect the table using TD Console.
