Implement File Processing Workflows in Python using Cadence

Use cases such as data science and CI/CD commonly need workflows that process large files. In this post, I’ll show how such workflows can be implemented in Cadence using Python.

Imagine a workflow with the following activities:

  1. Download file
  2. Process downloaded file, producing processed file
  3. Upload processed file

Activities 2 and 3 each depend on the result of the previous activity. These results can be very large, so we cannot pass them around as activity return values (which are persisted in the workflow’s history).

One approach is to store the intermediate files on an object storage system such as S3 or MinIO (sketched below), but this introduces another point of failure.
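For contrast, here is a minimal sketch of what the object-storage route could look like, assuming boto3 and a pre-existing bucket (the bucket name and helper functions are made up for illustration):

import boto3

s3 = boto3.client("s3")
BUCKET = "file-processing-intermediates"  # hypothetical bucket name

def save_intermediate(local_path: str, key: str) -> str:
    # Persist an intermediate file so that any worker can fetch it later
    s3.upload_file(local_path, BUCKET, key)
    return key

def load_intermediate(key: str, local_path: str) -> str:
    # Fetch an intermediate file on whichever worker runs the next activity
    s3.download_file(BUCKET, key, local_path)
    return local_path

Activities would then pass around small keys instead of file contents, at the cost of making the workflow depend on the object store being available.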

A simpler approach is to store the results on the worker machine’s own filesystem, but to do so we need a way to route activities 2 and 3 to the same worker machine that ran activity 1.

The Cadence Java client has an ingenious solution to this problem: host-specific task lists. Each worker instance listens on two task lists, one common to all workers and one specific to its host.

When the workflow executes the first activity, “Download file”, the activity is scheduled on the common task list. Once the file is downloaded, the activity returns two values to the workflow: (a) the path to the downloaded file on the worker’s machine, and (b) the name of the worker’s host-specific task list.

The workflow then creates an activity stub that schedules activities on that host-specific task list. The workflow uses this stub to execute activities 2 and 3, thereby ensuring that they run on the same worker as activity 1.

One problem remains: activities 2 and 3 cannot simply be retried on another worker if their worker fails, because their input files exist only on that worker’s filesystem. Therefore, if activity 2 or 3 fails, we start again at activity 1 so that the whole sequence has an opportunity to run on a different worker.

Below is my port of this approach to the cadence-python library:

import os
import socket
import sys
from asyncio import CancelledError

from cadence.activity_method import activity_method, ActivityOptions
from cadence.workerfactory import WorkerFactory
from cadence.workflow import workflow_method, WorkflowClient, Workflow

DOMAIN = "sample"
TASK_LIST = "FileProcessing"
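# Task list name unique to this worker process, e.g. "FileProcessing-1234@myhost"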
HOST_TASK_LIST = f"{TASK_LIST}-{os.getpid()}@{socket.gethostname()}"


class StoreActivities:
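    # Activity interface: the decorated methods are stubs; the decorator
    # declares the timeouts and the default task list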
    @activity_method(schedule_to_close_timeout_seconds=60 * 3, task_list=TASK_LIST)
    async def download(self, url: str) -> dict:
        pass

    @activity_method(schedule_to_close_timeout_seconds=60 * 3, task_list=TASK_LIST)
    async def process(self, input_file_name: str) -> str:
        pass

    @activity_method(schedule_to_close_timeout_seconds=60 * 3, task_list=TASK_LIST)
    async def upload(self, local_file_name: str, url: str):
        pass


class StoreActivitiesImpl(StoreActivities):
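    # Concrete activity implementations; each worker process constructs one
    # with its own host-specific task list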
    def __init__(self, task_list: str):
        self.task_list = task_list

    def download(self, url: str) -> dict:
        print(f"Downloading {url}... to downloaded-file.txt")
        return {
            "file_name": "downloaded-file.txt",
            "host_task_list": self.task_list
        }

    def process(self, input_file_name: str) -> str:
        print(f"Processing {input_file_name}... and generating processed-file.txt")
        return "processed-file.txt"

    def upload(self, local_file_name: str, url: str):
        print(f"Uploading {local_file_name} to {url}...")


class FileProcessingWorkflow:
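    # Workflow interface: the decorator declares the execution timeout and
    # the task list that workflow tasks are dispatched on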
    @workflow_method(execution_start_to_close_timeout_seconds=60 * 30, task_list=TASK_LIST)
    async def process_file(self, source: str, destination: str) -> str:
        raise NotImplementedError


class FileProcessWorkflowImpl(FileProcessingWorkflow):
    def __init__(self):
        self.default_task_list_store: StoreActivities = Workflow.new_activity_stub(StoreActivities)
        self.logger = Workflow.get_logger("FileProcessWorkflowImpl")

    async def process_file(self, source: str, destination: str) -> str:
        RETRY_ATTEMPTS = 3
        last_error = None
        for _ in range(RETRY_ATTEMPTS):
            try:
                await self.process_file_impl(source, destination)
                return "done"
            except CancelledError:
                # Never swallow cancellation; let the workflow be cancelled
                raise
            except Exception as ex:
                # Retrying from the top gives the download a chance to land
                # on a different, healthy worker
                self.logger.error("Error: %s", str(ex))
                last_error = ex
        raise last_error

    async def process_file_impl(self, source: str, destination: str):
        # Activity 1 goes to the common task list; any worker may pick it up
        downloaded: dict = await self.default_task_list_store.download(source)
        # Bind a new stub to the host-specific task list returned by download(),
        # so that process() and upload() run on the same worker machine
        host_options = ActivityOptions(task_list=downloaded["host_task_list"])
        host_specific_store: StoreActivities = Workflow.new_activity_stub(
            StoreActivities, activity_options=host_options)
        processed: str = await host_specific_store.process(downloaded["file_name"])
        await host_specific_store.upload(processed, destination)


if __name__ == '__main__':
    factory = WorkerFactory("localhost", 7933, DOMAIN)

    # Worker for the task list shared by all workers
    task_list_worker = factory.new_worker(TASK_LIST)
    store_activities = StoreActivitiesImpl(task_list=HOST_TASK_LIST)
    task_list_worker.register_activities_implementation(store_activities, "StoreActivities")
    task_list_worker.register_workflow_implementation_type(FileProcessWorkflowImpl)

    # Worker for this process's host-specific task list
    host_task_list_worker = factory.new_worker(HOST_TASK_LIST)
    host_task_list_worker.register_activities_implementation(store_activities, "StoreActivities")
    factory.start()

    client = WorkflowClient.new_client(domain=DOMAIN)
    workflow: FileProcessingWorkflow = client.new_workflow_stub(FileProcessingWorkflow)
    result = workflow.process_file("http://example.com/source.txt", "http://example.com/dest.txt")
    print(f"Result: {result}")

    print("Stopping workers...")
    task_list_worker.stop()
    host_task_list_worker.stop()
    print("Workers stopped")
    sys.exit(0)
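To try this out, you need a Cadence server listening on localhost:7933 (the default frontend port) and the sample domain registered. Assuming the Cadence CLI is installed, the domain can be registered with:

cadence --do sample domain register --rd 1

Running several copies of this script and killing one mid-workflow is a good way to watch the retry loop reschedule the whole sequence, starting from the download, on a surviving worker.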
