Python Client for TD-API

td-client vs pytd

The td-client library for Python wraps the TD-API REST API, so its capabilities are limited to what TD-API provides. The pytd library, by contrast, has direct access to Presto and the Plazma backend, and it also offers multiple data ingestion methods and a variety of utility functions.

This article explains how to use the Python bindings for the TD-API REST API.

Prerequisites

  • Basic knowledge of Treasure Data, including the Toolbelt.
  • Python 3.5+

Installation

The Python bindings are released on PyPI as td-client. You can install the package with pip:

pip install td-client

List Databases and Tables

The example below lists every database and, for each of its tables, prints the database name, the table name, and the row count.

import os
import tdclient
apikey = os.getenv("TD_API_KEY")
with tdclient.Client(apikey) as client:
    for db in client.databases():
        for table in db.tables():
            print(table.db_name)
            print(table.table_name)
            print(table.count)
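If you need the listing as data rather than printed output, the same calls can build a nested dictionary. This is a minimal sketch: `table_counts` is a hypothetical helper, not part of td-client, and it relies only on the `databases()`, `tables()`, `db_name`, `table_name`, and `count` members used in the example above.

```python
from collections import defaultdict


def table_counts(client):
    """Return {database_name: {table_name: row_count}} for every table.

    `client` is expected to behave like tdclient.Client: databases() yields
    objects with a tables() method, and each table exposes db_name,
    table_name, and count.
    """
    result = defaultdict(dict)
    for db in client.databases():
        for table in db.tables():
            result[table.db_name][table.table_name] = table.count
    # convert to a plain dict so callers get ordinary KeyError behavior
    return dict(result)

# Usage (requires td-client and a valid TD_API_KEY):
#   with tdclient.Client(os.getenv("TD_API_KEY")) as client:
#       print(table_counts(client))
```

Because the helper only takes a client-like object, it can be exercised with stand-in objects in tests without calling the API.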

Issue Queries

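The example below issues a SQL query from a Python program. The query API is asynchronous: client.query() returns a job right away, and you can check for completion by polling the job periodically (that is, by calling job.finished()) or block until it completes with job.wait(), as this example does.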

import os
import tdclient
apikey = os.getenv("TD_API_KEY")
with tdclient.Client(apikey) as client:
    job = client.query("sample_datasets", "SELECT COUNT(1) FROM www_access")
    # block until the job finishes
    job.wait()
    for row in job.result():
        print(row)
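The polling approach can also be written out by hand, which is useful when you want your own timeout or want to do other work between checks. The sketch below loops on job.finished() until it returns True or a deadline passes; `wait_for_job` is a hypothetical helper, not part of td-client, and the injectable `sleep` and `clock` arguments exist only so the loop can be tested without real waiting.

```python
import time


def wait_for_job(job, poll_interval=5.0, timeout=600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll job.finished() until it is True or `timeout` seconds elapse.

    Returns the job so the call can be chained. Raises TimeoutError if the
    job has still not finished when the deadline is reached.
    """
    deadline = clock() + timeout
    while not job.finished():
        if clock() >= deadline:
            raise TimeoutError(
                "job did not finish within {} seconds".format(timeout))
        sleep(poll_interval)
    return job

# Usage (requires td-client and a valid TD_API_KEY):
#   job = client.query("sample_datasets", "SELECT COUNT(1) FROM www_access")
#   wait_for_job(job, poll_interval=2, timeout=300)
```

A finished job may have ended in the success, error, or killed state, so check job.status() before reading its result.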

Note

job.result() does not load the whole job result into memory; it iterates through the rows in a streaming fashion.

To get the result's schema, read job.result_schema after the job has finished.
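Combining the schema with the streamed rows gives you records keyed by column name without giving up streaming. This is a minimal sketch that assumes job.result_schema is a sequence of (column_name, column_type) pairs; check the shape your td-client version returns. `rows_as_dicts` is a hypothetical helper name.

```python
def rows_as_dicts(schema, rows):
    """Lazily pair each row with the column names taken from `schema`.

    Assumes `schema` is a sequence of (column_name, column_type) pairs.
    Rows are consumed one at a time, so a streaming iterator such as
    job.result() is never fully materialized in memory.
    """
    names = [column[0] for column in schema]
    for row in rows:
        yield dict(zip(names, row))

# Usage, after job.wait() has returned:
#   for record in rows_as_dicts(job.result_schema, job.result()):
#       print(record)
```

Using a generator keeps memory use proportional to one row, which matches the streaming behavior of job.result().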

List and Get the Status of Jobs

The example below lists recent jobs, optionally filtered by status, and gets the status and result of a specific job.

import os
import tdclient
apikey = os.getenv("TD_API_KEY")
with tdclient.Client(apikey) as client:
    # recent 20 jobs
    print(len(client.jobs()))

    # recent 127 jobs of a specific status
    client.jobs(0, 127, "running")
    client.jobs(0, 127, "success")
    client.jobs(0, 127, "error")
    client.jobs(0, 127, "killed")

    # placeholder: replace with the ID of one of your jobs
    job_id = "12345"

    # get the job and its status
    job = client.job(job_id)
    print(job.status())

    # get the job result
    client.job_result(job_id)
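The status-filtered calls above can be turned into a quick summary of recent activity. This sketch assumes the positional form client.jobs(from, to, status) shown in the example; `count_jobs_by_status` is a hypothetical helper, and because the index range caps each query, the counts describe recent jobs only, not the account's full history.

```python
def count_jobs_by_status(client, statuses=("running", "success", "error", "killed"),
                         limit=127):
    """Return {status: number_of_recent_jobs} for each status in `statuses`.

    Each status issues one client.jobs(0, limit, status) call, mirroring the
    example above, so the result is bounded by that index range.
    """
    return {status: len(client.jobs(0, limit, status)) for status in statuses}

# Usage (requires td-client and a valid TD_API_KEY):
#   with tdclient.Client(os.getenv("TD_API_KEY")) as client:
#       print(count_jobs_by_status(client))
```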

Importing Data

There are two ways to import data through TD-API: streaming import, which works similarly to Fluentd, and batch import through a bulk import session.

Streaming Import

Warning

Data imported in a streaming manner may take some time before it can be queried, because the table schema is updated with a delay.

import sys
import tdclient

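# with no argument, tdclient.Client() reads the API key from the TD_API_KEY environment variable
with tdclient.Client() as td:
    # sys.argv[0] is the script name, so the files to import start at index 1
    for file_name in sys.argv[1:]:
        td.import_file("mydb", "mytbl", "csv", file_name)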
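If your records are in memory rather than in files, one option is to write them to a temporary file and pass its path to import_file, as the example above does with command-line paths. This is a sketch under two assumptions: that the "json" format of import_file accepts one JSON object per line, and that filling in a missing "time" value with the current UNIX time suits your table. `import_records` is a hypothetical helper, not part of td-client.

```python
import json
import os
import tempfile
import time


def import_records(client, db_name, table_name, records):
    """Write `records` (dicts) as newline-delimited JSON and stream-import them.

    Assumption: records without a "time" key get the current UNIX time.
    The temporary file is removed afterwards, even if the import fails.
    """
    now = int(time.time())
    fd, path = tempfile.mkstemp(suffix=".json")
    try:
        with os.fdopen(fd, "w") as f:
            for record in records:
                row = dict(record)          # copy so the caller's dict is untouched
                row.setdefault("time", now)
                f.write(json.dumps(row) + "\n")
        client.import_file(db_name, table_name, "json", path)
    finally:
        os.remove(path)

# Usage (requires td-client and a valid TD_API_KEY):
#   with tdclient.Client() as td:
#       import_records(td, "mydb", "mytbl", [{"user": "a", "value": 1}])
```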

Bulk Import

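The example below imports data into Treasure Data in a batch manner: it creates a bulk import session, uploads each file as a part, freezes the session, performs the import, and commits the result.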

import sys
import tdclient
import uuid
import warnings

if len(sys.argv) <= 1:
    sys.exit(0)

with tdclient.Client() as client:
    # create a uniquely named bulk import session targeting mydb.mytbl
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = client.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        # upload each file as a separate part, then freeze the session
        # so that no more parts can be added
        for file_name in sys.argv[1:]:
            part_name = "part-{}".format(file_name)
            bulk_import.upload_file(part_name, "json", file_name)
        bulk_import.freeze()
    except:
        # clean up the session on any failure, then re-raise
        bulk_import.delete()
        raise
    # run the perform job over the uploaded parts and wait for it to finish
    bulk_import.perform(wait=True)
    if 0 < bulk_import.error_records:
        warnings.warn("detected {} error records.".format(bulk_import.error_records))
    if 0 < bulk_import.valid_records:
        print("imported {} records.".format(bulk_import.valid_records))
    else:
        raise RuntimeError("no records have been imported: {}".format(bulk_import.name))
    # commit the valid records into the table, then remove the session
    bulk_import.commit(wait=True)
    bulk_import.delete()
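The bulk import example builds each part name from the full command-line path, so a path such as data/a.json produces a part name containing a slash, and two files with the same name in different directories produce the same part name. A minimal sketch of a safer scheme, assuming that part names are best kept free of directory separators, prefixes each file's base name with its position; `part_names` is a hypothetical helper.

```python
import os


def part_names(file_names):
    """Map each input path to a unique, path-free part name.

    Only the base name is kept, and a zero-padded index keeps names unique
    when different directories contain files with the same base name.
    """
    return [
        "part-{:04d}-{}".format(index, os.path.basename(file_name))
        for index, file_name in enumerate(file_names)
    ]

# Usage inside the upload loop of the bulk import example:
#   files = sys.argv[1:]
#   for part_name, file_name in zip(part_names(files), files):
#       bulk_import.upload_file(part_name, "json", file_name)
```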

API Reference

Both the Python td-client API reference and file import parameters documentation can be found at tdclient.readthedocs.io.

Further Reading