The py> operator runs a Python script using the python command. This feature is called "Custom Scripts" in Treasure Workflow.
See the Python API documents for details, including variable mappings to keyword arguments (a brief sketch of that mapping follows the example below).
+step1:
  py>: my_step1_method
+step2:
  py>: tasks.MyWorkflow.step2
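As a rough illustration of that mapping, here is a minimal sketch of what a tasks.py behind +step2 might look like; the expected file layout is described under the py> option below. The my_param and step2_done names are illustrative and not part of the example workflow, and digdag.env.store belongs to the Python API covered in the documents referenced above.

# tasks.py -- minimal sketch; my_param and step2_done are illustrative names
import digdag


class MyWorkflow(object):
    def step2(self, my_param="default"):
        # Workflow parameters whose names match keyword arguments
        # (here, my_param) are passed in automatically.
        print("step2 received: {}".format(my_param))

        # Store a value so that downstream tasks can reference it,
        # for example as ${step2_done}.
        digdag.env.store({"step2_done": True})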
If you don't know how to set secrets, please refer to Managing Workflow Secret.

aws.s3.region, aws.region
An optional explicit AWS Region in which to access S3. Default is us-east-1.
aws.s3.access_key_id, aws.access_key_id
The AWS Access Key ID to use when accessing S3. Not required when using s3_credential_provider: assume_role.
aws.s3.secret_access_key, aws.secret_access_key
The AWS Secret Access Key to use when accessing S3. Not required when using s3_credential_provider: assume_role.
py>: [PACKAGE.CLASS.]METHOD
Name of a method to run.
Examples:
# sample.dig
py>: tasks.MyWorkflow.my_task

This example assumes the following directory structure:

.
├── sample.dig
└── tasks
    └── __init__.py

You can write __init__.py like:

# __init__.py
class MyWorkflow(object):
    def my_task(self):
        print("awesome execution")

Or, you can put a Python script named tasks.py in the same directory as the dig file:

.
├── sample.dig
└── tasks.py

Here is an example of tasks.py:

# tasks.py
class MyWorkflow(object):
    def my_task(self):
        print("awesome execution")

You can also write a function without creating a class, as follows:

# simple_sample.dig
py>: simple_tasks.my_func

.
├── simple_sample.dig
└── simple_tasks.py

# simple_tasks.py
def my_func():
    print("simple execution")

You can pass arguments to the class for initialization by defining them under the py>: operation, as follows:

# sample.dig
+some_task:
  py>: tasks.MyWorkflow.my_task
  required1_1: awesome execution
  required1_2: "awesome execution"
  required2: {a: "a"}
  required3: 1
  required4: 1.0
  required5: [a, 1, 1.0, "a"]

Also, you can do the same thing using _export, as follows:

# sample.dig
+some_task:
  _export:
    required1_1: awesome execution
    required1_2: "awesome execution"
    required2: {a: "a"}
    required3: 1
    required4: 1.0
    required5: [a, 1, 1.0, "a"]
  py>: tasks.MyWorkflow.my_task

This example assumes the following Python script:

# tasks.py
from typing import Union


class MyWorkflow(object):
    def __init__(
        self,
        required1_1: str,
        required1_2: str,
        required2: dict[str, str],
        required3: int,
        required4: float,
        required5: list[Union[str, int, float]]
    ):
        print(f"{required1_1} same as {required1_2}")
        self.arg2 = required2
        print(f"{float(required3)} same as {required4}")
        self.arg5 = required5

    def my_task(self):
        pass

Or, you can pass arguments to a function, as follows:

# sample.dig
+some_task:
  py>: simple_tasks.my_func
  required1: simple execution
  required2: {a: "a"}

# simple_sample.dig
+some_task:
  _export:
    required1: simple execution
    required2: {a: "a"}
  py>: simple_tasks.my_func

# simple_tasks.py
def my_func(required1: str, required2: dict[str, str]):
    print(f"{required1}: {required2}")

Finally, you can pass a combination of class and method arguments (they must have different names) to the Python script, as follows:

# sample.dig
+some_task:
  py>: tasks.MyWorkflow.my_task
  required_class_arg: awesome execution
  required_method_arg: ["a", "b"]

# sample.dig
+some_task:
  _export:
    required_class_arg: awesome execution
    required_method_arg: ["a", "b"]
  py>: tasks.MyWorkflow.my_task

# tasks.py
class MyWorkflow:
    def __init__(self, required_class_arg: str):
        self.arg = required_class_arg

    def my_task(self, required_method_arg: list[str]):
        print(f"{self.arg}: {required_method_arg}")

python: PATH STRING or COMMAND ARGUMENTS LIST
The python command defaults to python. If an alternate python binary or additional options are desired, use the python option.

Examples:

python: /opt/conda/bin/python

python: ["python", "-v"]

It is also possible to configure this in the _export section.

Examples:

_export:
  py:
    python: /opt/conda/bin/python
The py> operator supports S3 file operations (s3_get and s3_put) with both access key and assume role credential providers.
s3_get: LIST
List of S3 objects to download before executing the Python script. Each item should specify from (an S3 path) and to (a local path). The recursive option can be used to download all files in a directory.

Examples:

py>: tasks.MyWorkflow.my_task
s3_get:
  - from: my-bucket/data/input-data.csv
    to: tmp/data/input.csv
  - from: my-bucket/data/config.json
    to: tmp/data/config.json
  - from: my-bucket/scripts/
    to: tmp/scripts/
    recursive: true

s3_put: LIST
List of local files to upload to S3 after executing the Python script. Each item should specify from (a local path) and to (an S3 path). The recursive option can be used to upload all files in a directory.

Examples:

py>: tasks.MyWorkflow.my_task
s3_put:
  - from: tmp/output/result.csv
    to: my-bucket/results/result.csv
  - from: tmp/logs/
    to: my-bucket/logs/
    recursive: true

s3_credential_provider: NAME
The credential provider to use for S3 operations. Supported values are access_key (the default) and assume_role.

Examples:

py>: tasks.MyWorkflow.my_task
s3_credential_provider: assume_role
s3_assume_role_authentication_id: ${auth_id}
s3_region: us-east-1
s3_get:
  ...
s3_put:
  ...

s3_assume_role_authentication_id: NUMBER
The authentication ID for assume role when using s3_credential_provider: assume_role. This corresponds to the TD Data Connector configuration. How to get the authentication_id is described in Reusing the existing Authentication.
s3_region: REGION
AWS region for S3 operations. Default is us-east-1.
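To tie the S3 options together, here is a minimal sketch of the Python side of the s3_get/s3_put examples above, assuming the local paths used there (tmp/data/input.csv and tmp/output/result.csv). Per the option descriptions, the download happens before the method runs and the upload happens after it returns; the CSV handling itself is illustrative.

# tasks.py -- minimal sketch; the CSV processing is illustrative
import csv
import os


class MyWorkflow(object):
    def my_task(self):
        # s3_get has already downloaded the object to this local path
        # before the method runs.
        with open("tmp/data/input.csv", newline="") as f:
            rows = list(csv.DictReader(f))

        # Write results to the local path that s3_put uploads after
        # the method returns.
        os.makedirs("tmp/output", exist_ok=True)
        with open("tmp/output/result.csv", "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["row_count"])
            writer.writerow([len(rows)])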