Python Client for TD-API
The td-client for Python wraps the TD-API REST API, so its capabilities are limited to what the TD-API can do. The pytd library, by contrast, has direct access to Presto and the Plazma backend, and offers multiple data ingestion methods and a variety of utility functions.
This article explains how to use the Python bindings for the TD-API REST API.
Prerequisites
- Basic knowledge of Treasure Data, including the Toolbelt.
- Python 3.5+
Installation
The Python bindings are released on PyPI as td-client. You can install the package with pip or easy_install.
pip install td-client
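To confirm that the installation succeeded before running the examples, a check along these lines may help. This is a minimal sketch using only the standard library; the helper name is_installed is hypothetical, and it relies on the package being importable as tdclient, as in the examples in this article.

```python
import importlib.util


def is_installed(module_name):
    """Return True if a top-level module can be found in this environment.

    Hypothetical helper for illustration; not part of td-client.
    """
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # td-client is installed from PyPI as "td-client" but imported as "tdclient".
    print("tdclient installed:", is_installed("tdclient"))
```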
List Databases and Tables
The example below lists the databases and tables.
import os
import tdclient

apikey = os.getenv("TD_API_KEY")
with tdclient.Client(apikey) as client:
    for db in client.databases():
        for table in db.tables():
            print(table.db_name)
            print(table.table_name)
            print(table.count)
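os.getenv returns None when TD_API_KEY is not set, so it can be useful to fail early with a clear message. The following is a minimal sketch; require_api_key is a hypothetical helper, not part of td-client, and its env parameter exists only so the function can be exercised without touching the real environment.

```python
import os


def require_api_key(env=None, name="TD_API_KEY"):
    """Return the API key stored under `name`, or raise if it is missing or empty.

    Hypothetical helper for illustration; `env` defaults to os.environ.
    """
    env = os.environ if env is None else env
    apikey = env.get(name)
    if not apikey:
        raise RuntimeError("{} is not set".format(name))
    return apikey
```

The returned value can then be passed to tdclient.Client in place of the bare os.getenv call.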
Issue Queries
The example below issues a SQL query from a Python program. The query API is asynchronous: you can check for query completion by polling the job periodically (e.g. by calling job.finished()), or block until the job completes with job.wait(), as this example does.
import os
import tdclient

apikey = os.getenv("TD_API_KEY")
with tdclient.Client(apikey) as client:
    job = client.query("sample_datasets", "SELECT COUNT(1) FROM www_access")
    # block until the job finishes
    job.wait()
    for row in job.result():
        print(row)
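The example above blocks in job.wait(). The polling approach can be sketched as follows. This is an illustration only: wait_until_finished is a hypothetical helper, the interval and timeout values are assumptions, and the only thing it requires of the job object is a finished() method that returns a boolean.

```python
import time


def wait_until_finished(job, interval=2.0, timeout=None,
                        sleep=time.sleep, clock=time.monotonic):
    """Poll job.finished() until it returns True.

    Hypothetical helper for illustration. `interval` is the pause between
    polls in seconds; `timeout` (seconds) is optional. `sleep` and `clock`
    are injectable so the loop can be tested without real waiting.
    """
    deadline = None if timeout is None else clock() + timeout
    while not job.finished():
        if deadline is not None and clock() >= deadline:
            raise TimeoutError(
                "job did not finish within {} seconds".format(timeout))
        sleep(interval)
    return job
```

With a job returned by client.query(...), calling wait_until_finished(job, interval=5, timeout=600) would take the place of job.wait() and leave room to do other work between polls.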
job.result() does not load the entire job result into memory; it iterates through the rows in a streaming fashion.
If you would like to get the result's schema, use job.result_schema after the job has finished.
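One use of the schema is to pair column names with the values of each row. The sketch below assumes the schema is a list of [name, type] pairs (for example [["cnt", "bigint"]]) and that each row is a sequence of values in the same column order; verify both assumptions against the API reference before relying on them. rows_as_dicts is a hypothetical helper, not part of td-client.

```python
def rows_as_dicts(schema, rows):
    """Yield each row as a dict keyed by column name.

    Assumes `schema` is a list of [name, type] pairs and each row lists its
    values in schema order. Rows are processed lazily, one at a time.
    """
    names = [column[0] for column in schema]
    for row in rows:
        yield dict(zip(names, row))
```

Because the function is a generator, passing it job.result_schema and job.result() keeps the streaming behavior of job.result().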
List and Get the Status of Jobs
The example below lists and gets the status of jobs.
import os
import tdclient

apikey = os.getenv("TD_API_KEY")
job_id = "12345"  # placeholder: replace with the ID of an existing job

with tdclient.Client(apikey) as client:
    # recent 20 jobs
    len(client.jobs())

    # recent 127 jobs of specific status
    client.jobs(0, 127, "running")
    client.jobs(0, 127, "success")
    client.jobs(0, 127, "error")
    client.jobs(0, 127, "killed")

    # get job status
    client.job(job_id)

    # get job result
    client.job_result(job_id)
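The per-status calls above can be folded into a small summary. This is a minimal sketch: count_jobs_by_status is a hypothetical helper, and it assumes only that client.jobs(0, limit, status) returns a sized list, as the calls in the example suggest. The counts are bounded by the requested range, so they describe recent jobs rather than all jobs.

```python
def count_jobs_by_status(client,
                         statuses=("running", "success", "error", "killed"),
                         limit=127):
    """Return {status: number of recent jobs with that status}.

    Hypothetical helper for illustration; issues one client.jobs() call
    per status, using the same positional arguments as the example above.
    """
    return {status: len(client.jobs(0, limit, status)) for status in statuses}
```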
Importing Data
There are two ways to import data via the TD API: in a streaming manner, similar to fluentd, or in a batch manner via bulk import.
Streaming Import
Data imported in a streaming manner takes some time to become available for queries, because the schema update is applied with a delay.
import sys
import tdclient

with tdclient.Client() as td:
    # sys.argv[0] is the script name; the files to import start at index 1
    for file_name in sys.argv[1:]:
        td.import_file("mydb", "mytbl", "csv", file_name)
Bulk Import
The example below imports data into Treasure Data in a batch manner via bulk import.
import sys
import tdclient
import uuid
import warnings

if len(sys.argv) <= 1:
    sys.exit(0)

with tdclient.Client() as client:
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = client.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        # upload each file given on the command line as one part
        for file_name in sys.argv[1:]:
            part_name = "part-{}".format(file_name)
            bulk_import.upload_file(part_name, "json", file_name)
        bulk_import.freeze()
    except:
        # clean up the session if any upload fails, then re-raise
        bulk_import.delete()
        raise
    bulk_import.perform(wait=True)
    if 0 < bulk_import.error_records:
        warnings.warn("detected {} error records.".format(bulk_import.error_records))
    if 0 < bulk_import.valid_records:
        print("imported {} records.".format(bulk_import.valid_records))
    else:
        raise RuntimeError("no records have been imported: {}".format(bulk_import.name))
    bulk_import.commit(wait=True)
    bulk_import.delete()
API Reference
Both the Python td-client API reference and file import parameters documentation can be found at tdclient.readthedocs.io.
Further Reading
- GitHub Repo - Source code for the Python td-client.
- Python td-client API Reference - readthedocs library reference.
- TD API Reference - REST API this td-client library wraps.