Testing Distributed Components of Storage Engine

In this post we will cover a recent experience of testing distributed storage engine components in Treasure Data.

As developers, we usually write unit tests for our code, integration tests for components, and system tests. However, just having tests is not enough for distributed systems. To fully leverage the benefits of these test cases, it's crucial to run them in a CI/CD pipeline regularly. This also includes running system tests to ensure our distributed storage engine components operate properly at a system level.

What is a Distributed Storage Engine?

At Treasure Data, we use a proprietary distributed analytical database called PlazmaDB. Customer data is stored there and can be queried with Hive or Presto Query Engine. We currently store petabytes of data, and the storage size is growing every day. Therefore, it is essential to have a robust and reliable system capable of handling such amounts of data, providing high availability, and ensuring durability.

Challenges of Testing the TD Storage Engine

Testing distributed systems is not straightforward, and there are many challenges to overcome:

  • Eventual consistency : Although our storage engine core (Plazma) has a strong consistency model, there are some ingest components that process data ingest requests asynchronously.
  • Background processes : These outsourced processes take arbitrary time to finish and, depending on the testing environment, there might be delays in execution as we don't have strict requirements for execution time.
  • Simultaneous test execution : We need to ensure that tests are isolated and don't affect each other.
  • Environment setup : Setting up multiple components is not always easy.
  • Integration with CI/CD : We need to integrate tests with the CI/CD pipeline, checking commits to the code base and running them regularly.

It's also worth mentioning that running tests in parallel is its own unique challenge, especially in shared environments. At Treasure Data, we run tests in such environments, but we've achieved good test isolation using system specifics for isolating data. Thus we can run tests in parallel without any issues.

Below you can see a typical test setup for integrating system tests with a CI/CD pipeline.

Our desired pipeline

Our desired pipeline

This CI/CD setup is simple, having one job that spins tests while blocking the pipeline. From our experience, this job is flaky because it relies on many timings and consumes our CI/CD credits while blocking the pipeline.

Asynchronous Tests Executor Service

To mitigate issues with flaky tests and pipeline blocking, we developed a separate service that is able to spin multiple tests and individual test steps asynchronously in an orchestrated manner. It is also responsible for reporting test results back to the CI/CD pipeline.

System test service

System test service

We are using SQS queue to trigger test run and store each test profile (current test state) in DynamoDB and report test results to S3, from where we can easily get and evaluate results. The purpose of agent is to receive test requests via the SQS queue and pass it to the test executor. The agent also periodically locks up for test profiles in a running state and triggers the test executor for each profile to resume tests. The idea of this service is simple - we should care only about initiating tests with request to SQS queue and processing result in callback handler, everything else should be handled by service automatically. Service can receive test requests from any service or CI/CD, however at the moment it can notify on test finished via callback only CI/CD.

Test executor extension

As a test executor, we chose JUnit 5, which is a test framework for Java. Its components, JUnit Platform and JUnit Jupiter, offer great flexibility for extension, and we were confident that it is sufficient for our needs. Each test handled by this service can be ordered by our custom annotation, it is similar to @Order annotation from JUnit, except we don't wait for all ordered steps to complete in one go and rather run steps asynchronously. We achieve that by overriding default ordering method of JUnit5 with LauncherDiscoveryRequestBuilder class:

val request = LauncherDiscoveryRequestBuilder.request()
    .selectors(...) // pass list of runnable test steps

Test steps that are not yet ready to be executed become not discoverable for JUnit5. Now, we can launch tests steps that were discovered:

val launcher = LauncherFactory.create()

For us, it means that individual test steps can be delayed without clogging the service's test executor with unnecessary waiting and resumed after a specific time passes in a separate run. This allows us to have slim individual test steps with tiny execution times.

Timing issues

However, there is still a timing issue. Our new service addresses this issue by retrying test steps if the step failed due to storage engine is not yet pushed data down the stream. For that purpose we wrote a TestStateManager. It stored something we called a test profile in DynamoDB. It sees all test steps and can decide if test step needs to be retried. Now, test steps failing due to unreliable timing in the distributed system can be retried multiple times before failing the whole test. It also makes test steps resilient to temporary issues like temporary network failures.

Test Results

Our agent and test executor service are responsible for reporting test results back to the CI/CD pipeline and storing test reports on S3 bucket. Here is example of how we use TestExecutionListener for that purpose:

val resultListener = TestResultListener(...)

TestResultListener is a preparatory implementation of TestExecutionListener that collects detailed information of test run results and that information will be distributed further to appropriate handlers.

New CI/CD pipeline

As it might be noticed at this time if we integrate this service with CI/CD, we will have one point where we start test and another point where we get notified about test completion via callback. Here, we had to make a compromise and have a step in CI/CD that can be manually overridden. If we don't want to wait system test to complete, we just manually resume pipeline.

New CI/CD pipeline

New CI/CD pipeline


The new asynchronous test executor service has allowed us to reduce individual test flakiness to basically zero. We were able to seamlessly integrate system tests with multiple CI/CD pipelines. System tests have become assets rather than liabilities, and we can rely on them to catch issues before they hit production.

Latest from our blog

Upcoming Evolution of Treasure Data Query Engines

Journey to Containers in Core Services Worker Platform

Sharing our journey with container technology from Worker Platform that manages the fair scheduling of jobs and helps manage the state of individual jobs.

Automatic Customer Segmentation with Machine Learning

Making Your Auto-Segmentation Model Work For You