© Mohammed Firdaus

This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.

1. Getting Started

1.1. Hello World

Workflow and activity worker:

helloworld-worker.py
import asyncio
import logging
from datetime import timedelta

from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"


# Activities Interface
class GreetingActivities:
    @activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=60))
    async def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError


# Activities Implementation
class GreetingActivitiesImpl:
    async def compose_greeting(self, greeting: str, name: str) -> str:
        return greeting + " " + name


# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def get_greeting(self, name: str) -> str:
        raise NotImplementedError


# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):

    def __init__(self):
        self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

    async def get_greeting(self, name):
        return await self.greeting_activities.compose_greeting("Hello!", name)


async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
    worker.register_workflow_implementation_type(GreetingWorkflowImpl)
    factory.start()
    print("Worker started")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main())
    loop.run_forever()  # keep the event loop alive so the worker can keep polling
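
This example establishes the pattern used throughout this guide: an interface class whose decorated methods only declare signatures (hence the NotImplementedError bodies), a separate implementation class, and a worker that registers both against the task queue. factory.start() does not block, which is why the event loop is kept alive with run_forever().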

Start the worker:

% python helloworld-worker.py
Worker started
INFO:temporal.activity_loop:Activity task worker started: 53801@onepointzero.local
INFO:temporal.decision_loop:Decision task worker started: 53801@onepointzero.local

1.1.1. Invoking the workflow using tctl

% tctl --ns default workflow run --taskqueue HelloWorld --workflow_type 'GreetingWorkflow::get_greeting' --input '"Bob"' --execution_timeout 3000
Running execution:
  Workflow Id : 27a938ca-910b-480f-bf88-35b1470c2314
  Run Id      : 0433b954-f17f-4175-a750-e2766b623b11
  Type        : GreetingWorkflow::get_greeting
  Namespace   : default
  Task Queue  : HelloWorld
  Args        : [Bob]
Progress:
  1, 2021-02-27T19:54:56Z, WorkflowExecutionStarted
  2, 2021-02-27T19:54:56Z, WorkflowTaskScheduled
  3, 2021-02-27T19:54:56Z, WorkflowTaskStarted
  4, 2021-02-27T19:54:56Z, WorkflowTaskCompleted
  5, 2021-02-27T19:54:56Z, ActivityTaskScheduled
  6, 2021-02-27T19:54:56Z, ActivityTaskStarted
  7, 2021-02-27T19:54:56Z, ActivityTaskCompleted
  8, 2021-02-27T19:54:56Z, WorkflowTaskScheduled
  9, 2021-02-27T19:54:56Z, WorkflowTaskStarted
  10, 2021-02-27T19:54:56Z, WorkflowTaskCompleted
  11, 2021-02-27T19:54:56Z, WorkflowExecutionCompleted

Result:
  Run Time: 1 seconds
  Status: COMPLETED
  Output: [Hello! Bob]

1.1.2. Programmatically starting the workflow

helloworld-invoker.py
import asyncio
from temporal.workflow import workflow_method, WorkflowClient

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"


# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def get_greeting(self, name: str) -> str:
        raise NotImplementedError


async def main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
    result = await greeting_workflow.get_greeting("Bob")
    print("Workflow returned:", result)


if __name__ == '__main__':
    asyncio.run(main())

Running:

% python helloworld-invoker.py
Workflow returned: Hello! Bob

1.2. Signals and Queries

Signals and queries allow us to interact with a running workflow. Let’s modify the greeting example so that:

  • the name used in the greeting can be updated via a signal

  • the current greeting can be retrieved via a query

  • the workflow can be told to complete via a terminate signal

1.2.1. Defining signal and query methods in workflows

helloworld-signal-query-worker.py
import asyncio
import logging
from datetime import timedelta

from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient, signal_method, \
    query_method

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"

# Activities Interface
class GreetingActivities:
    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError

# Activities Implementation
class GreetingActivitiesImpl:
    async def compose_greeting(self, greeting: str, name: str) -> str:
        return greeting + " " + name

# Workflow Interface
class GreetingWorkflow:

    @signal_method
    async def terminate(self):
        raise NotImplementedError

    @signal_method
    async def update_greeting(self, name: str):
        raise NotImplementedError

    @query_method
    async def get_greeting(self):
        raise NotImplementedError

    @workflow_method(task_queue=TASK_QUEUE)
    async def greeting_workflow(self, name: str) -> str:
        raise NotImplementedError

# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
    def __init__(self):
        self.greeting_activities: GreetingActivities = \
            Workflow.new_activity_stub(GreetingActivities)
        self.greeting = ""
        self._terminate = False

    async def terminate(self):
        self._terminate = True

    async def update_greeting(self, name: str):
        self.greeting = await self.greeting_activities.compose_greeting("Hello!", name)

    async def get_greeting(self):
        return self.greeting

    async def greeting_workflow(self, name):
        self.greeting = await self.greeting_activities.compose_greeting("Hello!", name)
        await Workflow.await_till(lambda: self._terminate)

async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    worker.register_activities_implementation(GreetingActivitiesImpl(),
                                              "GreetingActivities")
    worker.register_workflow_implementation_type(GreetingWorkflowImpl)
    factory.start()
    print("Worker started")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main())
    loop.run_forever()
Run the worker
% python helloworld-signal-query-worker.py
Worker started
INFO:temporal.activity_loop:Activity task worker started: 56226@onepointzero.local
INFO:temporal.decision_loop:Decision task worker started: 56226@onepointzero.local
Invoking the workflow
% tctl --ns default workflow run --taskqueue HelloWorld --workflow_type 'GreetingWorkflow::greeting_workflow' --input '"Bob"' --execution_timeout 3000 --workflow_id "greeting-workflow-1"
Running execution:
  Workflow Id : greeting-workflow-1
  Run Id      : 518590b2-6e23-4f5b-8319-9565f6ed9178
  Type        : GreetingWorkflow::greeting_workflow
  Namespace   : default
  Task Queue  : HelloWorld
  Args        : [Bob]
Progress:
  1, 2021-02-28T09:20:15Z, WorkflowExecutionStarted
  2, 2021-02-28T09:20:15Z, WorkflowTaskScheduled
  3, 2021-02-28T09:20:15Z, WorkflowTaskStarted
  4, 2021-02-28T09:20:15Z, WorkflowTaskCompleted
  5, 2021-02-28T09:20:15Z, ActivityTaskScheduled
  6, 2021-02-28T09:20:15Z, ActivityTaskStarted
  7, 2021-02-28T09:20:15Z, ActivityTaskCompleted
  8, 2021-02-28T09:20:15Z, WorkflowTaskScheduled
  9, 2021-02-28T09:20:15Z, WorkflowTaskStarted
  10, 2021-02-28T09:20:15Z, WorkflowTaskCompleted

Time elapse: 8s

Unlike the previous workflow, this one doesn’t complete immediately: greeting_workflow blocks in Workflow.await_till until the terminate signal arrives. That is expected.

1.2.2. Invoking the query and signal via tctl

In another terminal window run the following:

Invoke the queries and signals of the workflow
% tctl --ns default workflow query --workflow_id "greeting-workflow-1" --query_type "GreetingWorkflow::get_greeting"
Query result:
[Hello! Bob]
% tctl --ns default workflow signal --workflow_id "greeting-workflow-1" --name "GreetingWorkflow::update_greeting" --input '"Alice"'
Signal workflow succeeded.
% tctl --ns default workflow query --workflow_id "greeting-workflow-1" --query_type "GreetingWorkflow::get_greeting"
Query result:
[Hello! Alice]
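
Finally, since greeting_workflow blocks until it receives the terminate signal, send it to let the execution complete:

% tctl --ns default workflow signal --workflow_id "greeting-workflow-1" --name "GreetingWorkflow::terminate"
Signal workflow succeeded.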

1.2.3. Invoking the query and signal programmatically

helloworld-signal-query-invoker.py
import asyncio
from temporal.workflow import workflow_method, WorkflowClient, signal_method, query_method

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"

# Workflow Interface
class GreetingWorkflow:

    @signal_method
    async def terminate(self):
        raise NotImplementedError

    @signal_method
    async def update_greeting(self, name: str):
        raise NotImplementedError

    @query_method
    async def get_greeting(self):
        raise NotImplementedError

    @workflow_method(task_queue=TASK_QUEUE)
    async def greeting_workflow(self, name: str) -> str:
        raise NotImplementedError

async def main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
    context = await WorkflowClient.start(greeting_workflow.greeting_workflow, "Bob")
    await asyncio.sleep(5)  # Give the workflow some time to run (1)
    print("Invoking query get_greeting():", await greeting_workflow.get_greeting())
    print("Invoking signal update_greeting('Alice')")
    await greeting_workflow.update_greeting('Alice')
    await asyncio.sleep(5)  # Give the signal some time to take effect (1)
    print("Invoking query get_greeting():", await greeting_workflow.get_greeting())
    await greeting_workflow.terminate()
    await client.wait_for_close(context)

if __name__ == '__main__':
    asyncio.run(main())
  1. Both the workflow method and the signal method invoke an activity. Because activity execution is asynchronous (delegated to a task queue), a query issued while the activity is still running would return a soon-to-be-stale greeting; the sleeps give the activities time to complete.
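
Rather than sleeping for fixed intervals, the invoker can poll the query until the expected value appears. A minimal sketch (wait_for_greeting is a hypothetical helper, not part of the SDK; it uses only the stub defined above):

async def wait_for_greeting(stub: GreetingWorkflow, expected: str, timeout: float = 30.0) -> str:
    # Re-issue the get_greeting query until it returns the expected value
    # or the timeout expires.
    async def poll():
        while await stub.get_greeting() != expected:
            await asyncio.sleep(0.5)
        return expected
    return await asyncio.wait_for(poll(), timeout)

For example, await wait_for_greeting(greeting_workflow, "Hello! Alice") could replace the second sleep in main().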

Output:

% python helloworld-signal-query-invoker.py
Invoking query get_greeting(): Hello! Bob
Invoking signal update_greeting('Alice')
Invoking query get_greeting(): Hello! Alice

1.3. Accessing External Resources

Activities are coroutines, so be sure to use asyncio clients for the resources that you’re accessing, or you risk blocking the worker’s event loop (a fallback for blocking-only clients is sketched after this list), e.g.:

  • MongoDB: motor

  • HTTP: aiohttp

  • Redis: aioredis

  • PostgreSQL: aiopg, asyncpg, etc.
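
When only a blocking client is available, a common fallback (plain asyncio, nothing SDK-specific; fetch_report_blocking below is a hypothetical stand-in for such a client call) is to push the call onto the default thread-pool executor so the worker's event loop keeps running:

import asyncio
import functools
import time

def fetch_report_blocking(report_id: str) -> str:
    # Stand-in for a call made with a blocking client.
    time.sleep(1)
    return f"report-{report_id}"

async def fetch_report(report_id: str) -> str:
    loop = asyncio.get_running_loop()
    # Run the blocking call on the default ThreadPoolExecutor instead of
    # blocking the event loop shared by all activities on this worker.
    return await loop.run_in_executor(
        None, functools.partial(fetch_report_blocking, report_id))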

For this example we are going to be using MongoDB.

Run MongoDB
% docker run -e MONGO_INITDB_ROOT_USERNAME=mongoadmin -e MONGO_INITDB_ROOT_PASSWORD=secret -p 27017:27017 mongo
Install asyncio compatible MongoDB library for Python
% pip install motor

1.3.1. Workflow and activity that utilizes an external resource (MongoDB)

helloworld-activities-external-resources-worker.py
import asyncio
import logging
from datetime import timedelta

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorDatabase
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient

import motor.motor_asyncio

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"

class GreetingActivities:
    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def create_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def compose_greeting(self, greeting_id) -> str:
        raise NotImplementedError

class GreetingActivitiesImpl:
    def __init__(self, db: AsyncIOMotorDatabase):
        self.db = db

    async def create_greeting(self, greeting: str, name: str) -> str:
        result = await self.db.greetings.insert_one({
            "greeting": greeting,
            "name": name
        })
        return str(result.inserted_id)

    async def compose_greeting(self, greeting_id) -> str:
        document = await self.db.greetings.find_one(ObjectId(greeting_id))
        return document["greeting"] + " "  + document["name"]

class GreetingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def create_and_retrieve_greeting(self, name) -> str:
        raise NotImplementedError

class GreetingWorkflowImpl(GreetingWorkflow):

    def __init__(self):
        self.greeting_activities: GreetingActivities = \
            Workflow.new_activity_stub(GreetingActivities)

    async def create_and_retrieve_greeting(self, name) -> str:
        greeting_id = await self.greeting_activities.create_greeting("Hello!", name)
        greeting_document = await self.greeting_activities.compose_greeting(greeting_id)
        return greeting_document

async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)

    motor_client = motor.motor_asyncio.AsyncIOMotorClient(
        'mongodb://mongoadmin:secret@localhost:27017/')
    db = motor_client.greeting_db
    activities_impl = GreetingActivitiesImpl(db)
    worker.register_activities_implementation(activities_impl, "GreetingActivities")

    worker.register_workflow_implementation_type(GreetingWorkflowImpl)
    factory.start()
    print("Worker started")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(worker_main())
    loop.run_forever()
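
Note that the Motor client is created inside worker_main, on the same event loop that the activities will run on, and the database handle is passed to the activities implementation through its constructor. This keeps connection setup out of the individual activities and lets one client (and its connection pool) be shared across invocations.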
Start the worker
% python helloworld-activities-external-resources-worker.py
Worker started
INFO:temporal.activity_loop:Activity task worker started: 61100@onepointzero.local
INFO:temporal.decision_loop:Decision task worker started: 61100@onepointzero.local
Running the workflow
% tctl --ns default workflow run --workflow_type "GreetingWorkflow::create_and_retrieve_greeting" --taskqueue "HelloWorld" --execution_timeout 300 --input '"Bob"'
Running execution:
  Workflow Id : 0e78ca2b-03e4-47ff-b93c-537721488690
  Run Id      : 115bf8a8-6b10-44fc-891d-8aebb99d8b2d
  Type        : GreetingWorkflow::create_and_retrieve_greeting
  Namespace   : default
  Task Queue  : HelloWorld
  Args        : [Bob]
Progress:
  1, 2021-02-28T16:21:31Z, WorkflowExecutionStarted
  2, 2021-02-28T16:21:31Z, WorkflowTaskScheduled
  3, 2021-02-28T16:21:31Z, WorkflowTaskStarted
  4, 2021-02-28T16:21:31Z, WorkflowTaskCompleted
  5, 2021-02-28T16:21:31Z, ActivityTaskScheduled
  6, 2021-02-28T16:21:31Z, ActivityTaskStarted
  7, 2021-02-28T16:21:31Z, ActivityTaskCompleted
  8, 2021-02-28T16:21:31Z, WorkflowTaskScheduled
  9, 2021-02-28T16:21:31Z, WorkflowTaskStarted
  10, 2021-02-28T16:21:31Z, WorkflowTaskCompleted
  11, 2021-02-28T16:21:31Z, ActivityTaskScheduled
  12, 2021-02-28T16:21:31Z, ActivityTaskStarted
  13, 2021-02-28T16:21:31Z, ActivityTaskCompleted
  14, 2021-02-28T16:21:31Z, WorkflowTaskScheduled
  15, 2021-02-28T16:21:31Z, WorkflowTaskStarted
  16, 2021-02-28T16:21:31Z, WorkflowTaskCompleted
  17, 2021-02-28T16:21:31Z, WorkflowExecutionCompleted

Result:
  Run Time: 1 seconds
  Status: COMPLETED
  Output: [Hello! Bob]

1.4. Invoking Workflows at Regular Intervals

Workflows can be triggered at regular intervals using cron workflows: pass a standard five-field cron expression when starting the workflow (e.g. "*/5 * * * *" runs every five minutes).

1.4.1. Cron workflow example

We’ll reuse the helloworld example from earlier:

import asyncio
import logging
from datetime import timedelta

from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"


# Activities Interface
class GreetingActivities:
    @activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=60))
    async def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError


# Activities Implementation
class GreetingActivitiesImpl:
    async def compose_greeting(self, greeting: str, name: str) -> str:
        return greeting + " " + name


# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def get_greeting(self, name: str) -> str:
        raise NotImplementedError


# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):

    def __init__(self):
        self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

    async def get_greeting(self, name):
        return await self.greeting_activities.compose_greeting("Hello!", name)


async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
    worker.register_workflow_implementation_type(GreetingWorkflowImpl)
    factory.start()
    print("Worker started")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main())
    loop.run_forever()

1.4.2. Starting the cron workflow from the command line

% tctl --ns default workflow start --taskqueue HelloWorld --workflow_type 'GreetingWorkflow::get_greeting' --input '"Bob"' --execution_timeout '315360000' --cron "*/5 * * * *" --workflow_id "greeting-cron-workflow"
Started Workflow Id: greeting-cron-workflow, run Id: 4c5cc4e3-62aa-40f0-bf0a-be4fbc5e3711
The cron workflow is now running and will be invoked every five minutes per the schedule.

1.4.3. Starting the cron workflow from Python

import asyncio
from datetime import timedelta

from temporal.api.enums.v1 import WorkflowIdReusePolicy
from temporal.workflow import workflow_method, WorkflowClient, WorkflowOptions

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"


# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def get_greeting(self, name: str) -> str:
        raise NotImplementedError


async def main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    workflow_options: WorkflowOptions = WorkflowOptions()
    # by default workflow_options.workflow_execution_timeout is set to 0 so the cron workflow
    # will run forever.
    workflow_options.cron_schedule = "*/5 * * * *"
    workflow_options.workflow_id = "greeting-cron-workflow"
    workflow_options.workflow_id_reuse_policy = WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
    greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow,
                                                                   workflow_options=workflow_options)
    await WorkflowClient.start(greeting_workflow.get_greeting, "Bob")
    print("Workflow started")


if __name__ == '__main__':
    asyncio.run(main())

1.4.4. Terminating the cron workflow

% tctl workflow terminate --workflow_id "greeting-cron-workflow"
Terminate workflow succeeded.

2. Advanced Activities

2.1. Handling Errors

2.1.1. Detecting activity timeouts
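
The activity below sleeps for 20 seconds but is declared with a schedule_to_close timeout of only 10 seconds, so it is guaranteed to time out. The workflow catches the resulting ActivityFailureException and inspects its cause to distinguish a timeout from other activity failures.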

activity-timeouts.py
import asyncio
import logging
from datetime import timedelta

from temporal.activity_method import activity_method
from temporal.exceptions import ActivityFailureException, ActivityTaskTimeoutException
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "ActivityTimeoutTaskQueue"
NAMESPACE = "default"

class Activities:
    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=10))
    async def sleep(self) -> None:
        await asyncio.sleep(20)

class MainWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def main(self) -> str:
        activities = Workflow.new_activity_stub(Activities)
        try:
            await activities.sleep()
        except ActivityFailureException as ex:
            cause = ex.get_cause()
            if isinstance(cause, ActivityTaskTimeoutException):
                return "timeout"
            else:
                raise ex
        return "did not timeout"

async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    worker.register_activities_implementation(Activities())
    worker.register_workflow_implementation_type(MainWorkflow)
    factory.start()
    print("Worker started")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(worker_main())
    loop.run_forever()
Invoking the workflow
% tctl workflow run --taskqueue "ActivityTimeoutTaskQueue" --workflow_type "MainWorkflow::main" --execution_timeout 600
Running execution:
  Workflow Id : 7e26c623-40fa-413e-9cae-05ea285b1b07
  Run Id      : 4669cf23-0088-4687-b50e-d14b98276a77
  Type        : MainWorkflow::main
  Namespace   : default
  Task Queue  : ActivityTimeoutTaskQueue
  Args        : []
Progress:
  1, 2021-03-16T19:02:11Z, WorkflowExecutionStarted
  2, 2021-03-16T19:02:11Z, WorkflowTaskScheduled
  3, 2021-03-16T19:02:11Z, WorkflowTaskStarted
  4, 2021-03-16T19:02:11Z, WorkflowTaskCompleted
  5, 2021-03-16T19:02:11Z, ActivityTaskScheduled
  6, 2021-03-16T19:02:11Z, ActivityTaskStarted
  7, 2021-03-16T19:02:21Z, ActivityTaskTimedOut
  8, 2021-03-16T19:02:21Z, WorkflowTaskScheduled
  9, 2021-03-16T19:02:21Z, WorkflowTaskStarted
  10, 2021-03-16T19:02:21Z, WorkflowTaskCompleted
  11, 2021-03-16T19:02:21Z, WorkflowExecutionCompleted

Result:
  Run Time: 11 seconds
  Status: COMPLETED
  Output: ["timeout"]

2.2. File Processing Workflows

2.2.1. Worker

file-processing-workflows.py
import asyncio
import logging
import os
import socket
from datetime import timedelta

from temporal.activity_method import activity_method, ActivityOptions
from temporal.exceptions import ActivityFailureException
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "FileProcessing"
HOST_TASK_QUEUE = TASK_QUEUE + "-%d@%s" % (os.getpid(), socket.gethostname())
RETRY_ATTEMPTS = 3
NAMESPACE = "default"

class StoreActivities:
    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def download(self, url: str) -> dict:
        raise NotImplementedError

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def process(self, input_file_name: str) -> str:
        raise NotImplementedError

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def upload(self, local_file_name: str, url: str):
        raise NotImplementedError

class StoreActivitiesImpl(StoreActivities):
    def __init__(self, task_queue: str):
        self.task_queue = task_queue

    async def download(self, url: str) -> dict:
        print(f"Downloading {url}... to downloaded-file.txt")
        # ....... actual code for downloading url goes here
        return {
            "file_name": "downloaded-file.txt",
            "host_task_queue": self.task_queue
        }

    async def process(self, input_file_name: str) -> str:
        print(f"Processing {input_file_name}... and generating processed-file.txt")
        # ....... actual code for processing input_file_name goes here
        return "processed-file.txt"

    async def upload(self, local_file_name: str, url: str):
        print(f"Uploading {local_file_name} to {url}...")
        # ...... actual code for uploading local_file_name goes here

class FileProcessingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def process_file(self, source: str, destination: str) -> str:
        raise NotImplementedError

class FileProcessWorkflowImpl(FileProcessingWorkflow):
    def __init__(self):
        self.default_task_queue_store: StoreActivities = Workflow.new_activity_stub(
            StoreActivities)
        self.logger = Workflow.get_logger("FileProcessWorkflowImpl")

    async def process_file(self, source: str, destination: str) -> str:
        exception = None
        for _ in range(RETRY_ATTEMPTS):
            try:
                exception = None
                await self.process_file_impl(source, destination)
                break
            except ActivityFailureException as ex:
                self.logger.error("Error: %s", str(ex))
                exception = ex
        if exception:
            raise exception
        return "done"

    async def process_file_impl(self, source: str, destination: str):
        downloaded: dict = await self.default_task_queue_store.download(source)
        host_specific_store: StoreActivities
        activity_options = ActivityOptions(task_queue=downloaded["host_task_queue"])
        host_specific_store = Workflow.new_activity_stub(StoreActivities,
                                                         activity_options=activity_options)
        processed: str = await host_specific_store.process(downloaded["file_name"])
        await host_specific_store.upload(processed, destination)

async def worker_main():
    store_activities = StoreActivitiesImpl(task_queue=HOST_TASK_QUEUE)
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)

    worker = factory.new_worker(TASK_QUEUE)
    worker.register_activities_implementation(store_activities, "StoreActivities")
    worker.register_workflow_implementation_type(FileProcessWorkflowImpl)

    worker = factory.new_worker(HOST_TASK_QUEUE)
    worker.register_activities_implementation(store_activities, "StoreActivities")

    factory.start()
    print("Worker started")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(worker_main())
    loop.run_forever()

2.2.2. Invoking the workflow

When you inspect the workflow history in Temporal Web, you should see that StoreActivities::download was invoked via the FileProcessing task queue, while the other two activities were invoked via the host-specific task queue (in my case, FileProcessing-15201@onepointzero.local).

% tctl workflow run --workflow_type "FileProcessingWorkflow::process_file" --taskqueue "FileProcessing" --execution_timeout 120 --input '"http://example.com/source.txt"' --input '"http://example.com/dest.txt"'
Running execution:
  Workflow Id : 9ea8e69c-b064-45fe-9152-c211d1f7f9c4
  Run Id      : 2eb957aa-51b9-4b08-ad3c-b533a1dd7a55
  Type        : FileProcessingWorkflow::process_file
  Namespace   : default
  Task Queue  : FileProcessing
  Args        : [http://example.com/source.txt,
              : http://example.com/dest.txt]
Progress:
  1, 2021-03-11T16:22:21Z, WorkflowExecutionStarted
  2, 2021-03-11T16:22:21Z, WorkflowTaskScheduled
  3, 2021-03-11T16:22:21Z, WorkflowTaskStarted
  4, 2021-03-11T16:22:21Z, WorkflowTaskCompleted
  5, 2021-03-11T16:22:21Z, ActivityTaskScheduled
  6, 2021-03-11T16:22:21Z, ActivityTaskStarted
  7, 2021-03-11T16:22:21Z, ActivityTaskCompleted
  8, 2021-03-11T16:22:21Z, WorkflowTaskScheduled
  9, 2021-03-11T16:22:21Z, WorkflowTaskStarted
  10, 2021-03-11T16:22:21Z, WorkflowTaskCompleted
  11, 2021-03-11T16:22:21Z, ActivityTaskScheduled
  12, 2021-03-11T16:22:21Z, ActivityTaskStarted
  13, 2021-03-11T16:22:21Z, ActivityTaskCompleted
  14, 2021-03-11T16:22:21Z, WorkflowTaskScheduled
  15, 2021-03-11T16:22:21Z, WorkflowTaskStarted
  16, 2021-03-11T16:22:21Z, WorkflowTaskCompleted
  17, 2021-03-11T16:22:21Z, ActivityTaskScheduled
  18, 2021-03-11T16:22:21Z, ActivityTaskStarted
  19, 2021-03-11T16:22:21Z, ActivityTaskCompleted
  20, 2021-03-11T16:22:21Z, WorkflowTaskScheduled
  21, 2021-03-11T16:22:21Z, WorkflowTaskStarted
  22, 2021-03-11T16:22:21Z, WorkflowTaskCompleted
  23, 2021-03-11T16:22:21Z, WorkflowExecutionCompleted

Result:
  Run Time: 1 seconds
  Status: COMPLETED
  Output: [done]

2.3. Untyped Activity Stubs

Untyped activity stubs allow activities to be invoked by name, without an activity interface definition.

invoke-activity-by-string.py
import asyncio
import logging
from datetime import timedelta
from typing import List

from temporal.activity_method import ActivityOptions
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient

logging.basicConfig(level=logging.INFO)

NAMESPACE = "default"
TASK_QUEUE = "invoke-random-activity-tq"

class InvokeRandomActivityWorkflow:

    @workflow_method(task_queue=TASK_QUEUE)
    async def invoke_activity(self, activity_name: str, args: List[object],
                              task_queue: str, timeout: int) -> object:
        options = ActivityOptions(task_queue=task_queue,
                                  start_to_close_timeout=(timedelta(seconds=timeout)))
        stub = Workflow.new_untyped_activity_stub(activity_options=options)
        return await stub.execute(activity_name, *args)

async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    worker.register_workflow_implementation_type(InvokeRandomActivityWorkflow)
    factory.start()
    print("Worker started")

    invoker: InvokeRandomActivityWorkflow = client.new_workflow_stub(InvokeRandomActivityWorkflow)
    result = await invoker.invoke_activity("ComposeGreeting", ["Bob"], "golang-activity-tq", 60)
    print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main())
    loop.run_forever()
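
Note that the workflow above invokes the ComposeGreeting activity on golang-activity-tq, the task queue served by the Golang activity worker of section 3.2, so no Python-side interface for the activity is needed. This is where untyped stubs shine: when the activity is implemented in another language, or when its name and task queue are only known at runtime.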

3. Compatibility with the Golang and Java SDKs

3.1. Invoking Go Workflows

3.1.1. Golang Workflows

worker/main.go
package main

import (
	"fmt"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
	"log"
)

type Person struct {
	Name string
	Age  int
}

func Greetings(ctx workflow.Context, value Person) error {
	fmt.Println(value)
	return nil
}

func main() {
	serviceClient, err := client.NewClient(client.Options{HostPort: "localhost:7233"})

	if err != nil {
		log.Fatalf("Unable to create client.  Error: %v", err)
	}

	w := worker.New(serviceClient, "golang-workflow-tq", worker.Options{})

	w.RegisterWorkflow(Greetings)

	err = w.Run(worker.InterruptCh())
	if err != nil {
		log.Fatalf("Unable to start worker.  Error: %v", err)

	}
}

3.1.2. Python Invoker

invoke-go.py
import asyncio
import dataclasses
from dataclasses import dataclass

from temporal.api.common.v1 import Payload
from temporal.converter import DefaultDataConverter
from temporal.workflow import workflow_method, WorkflowClient

TASK_QUEUE = "golang-workflow-tq"
NAMESPACE = "default"

@dataclass
class Person:
    name: str
    age: int

# Workflow Interface
class Greetings:
    @workflow_method(task_queue=TASK_QUEUE, name="Greetings")
    async def greetings(self, person: Person) -> str:
        raise NotImplementedError

class CustomDataConverter(DefaultDataConverter):

    def to_payload(self, arg: object) -> Payload:
        if isinstance(arg, Person):
            return super().to_payload(dataclasses.asdict(arg))
        return super().to_payload(arg)

async def main():
    client = WorkflowClient.new_client(namespace=NAMESPACE,
                                       data_converter=CustomDataConverter())
    greetings: Greetings = client.new_workflow_stub(Greetings)
    person = Person(name="Bob", age=30)
    result = await greetings.greetings(person)
    print(result)

if __name__ == '__main__':
    asyncio.run(main())
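
The custom data converter is what lets the Person dataclass cross the language boundary: it turns the dataclass into a plain dict before encoding, producing a JSON object that the Go worker can deserialize into its Person struct (Go's encoding/json matches field names case-insensitively, so name and age map onto Name and Age). Without it, the default converter would likely fail to serialize the dataclass.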

3.2. Invoke Golang Activity from Python Workflow

The main thing to note here is that the Golang and Python workers listen on different task queues: the Python worker hosts the workflow on python-workflow-tq, while the Golang worker hosts the activity on golang-activity-tq.

3.2.1. Golang Activity Worker

activityworker/main.go
package main

import (
	"context"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"log"
)

func ComposeGreeting(ctx context.Context, name string) (string, error) {
	return "Hello " + name, nil
}

func main() {
	serviceClient, err := client.NewClient(client.Options{HostPort: "localhost:7233"})

	if err != nil {
		log.Fatalf("Unable to create client.  Error: %v", err)
	}

	w := worker.New(serviceClient, "golang-activity-tq", worker.Options{})

	w.RegisterActivity(ComposeGreeting)

	err = w.Run(worker.InterruptCh())
	if err != nil {
		log.Fatalf("Unable to start worker.  Error: %v", err)

	}
}

3.2.2. Python Workflow that Invokes Golang Activity

invoke-go-activity.py
import asyncio
import logging
from datetime import timedelta

from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient

logging.basicConfig(level=logging.INFO)

NAMESPACE = "default"
TASK_QUEUE = "python-workflow-tq"
GOLANG_TASK_QUEUE = "golang-activity-tq"

class GolangActivity:
    @activity_method(task_queue=GOLANG_TASK_QUEUE, name="ComposeGreeting",
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def compose_greeting(self, name: str) -> str:
        raise NotImplementedError

class GreetingWorkflow:

    def __init__(self):
        self.activities: GolangActivity = Workflow.new_activity_stub(GolangActivity)

    @workflow_method(task_queue=TASK_QUEUE)
    async def get_greeting(self, name) -> str:
        return await self.activities.compose_greeting(name)

async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    worker.register_workflow_implementation_type(GreetingWorkflow)
    factory.start()
    print("Worker started")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main())
    loop.run_forever()

3.2.3. Running the Workflow

% tctl workflow run --taskqueue python-workflow-tq --workflow_type GreetingWorkflow::get_greeting --execution_timeout 60 --input '"Bob"'
Running execution:
  Workflow Id : 3598c2af-4337-40e1-ad5e-8ab58e782d66
  Run Id      : 94ab1892-3f8f-4878-b92e-8c90593f2c8d
  Type        : GreetingWorkflow::get_greeting
  Namespace   : default
  Task Queue  : python-workflow-tq
  Args        : ["Bob"]
Progress:
  1, 2021-04-05T01:46:21Z, WorkflowExecutionStarted
  2, 2021-04-05T01:46:21Z, WorkflowTaskScheduled
  3, 2021-04-05T01:46:21Z, WorkflowTaskStarted
  4, 2021-04-05T01:46:22Z, WorkflowTaskCompleted
  5, 2021-04-05T01:46:22Z, ActivityTaskScheduled
  6, 2021-04-05T01:46:22Z, ActivityTaskStarted
  7, 2021-04-05T01:46:22Z, ActivityTaskCompleted
  8, 2021-04-05T01:46:22Z, WorkflowTaskScheduled
  9, 2021-04-05T01:46:22Z, WorkflowTaskStarted
  10, 2021-04-05T01:46:22Z, WorkflowTaskCompleted
  11, 2021-04-05T01:46:22Z, WorkflowExecutionCompleted

Result:
  Run Time: 1 seconds
  Status: COMPLETED
  Output: ["Hello Bob"]

4. Migrating from Airflow

4.1. Example 1

In this section, we’ll be converting an Airflow data pipeline described in this article.

4.1.1. Setup

Install additional Python packages
% pip install paramiko
% pip install asyncpg
docker-compose.yaml for the software that we need
version: "3.5"
services:
  sftp:
    image: atmoz/sftp
    volumes:
      - ./upload/:/home/foo/
    ports:
      - "127.0.0.1:2222:22"
    command: foo:pass:1001
  db:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin123
      POSTGRES_DB: error_monitoring_job
    ports:
      - "127.0.0.1:5432:5432"
  mailhog:
    image: mailhog/mailhog
    ports:
      - "127.0.0.1:1025:1025"
      - "127.0.0.1:8025:8025"

4.1.2. Converting the individual DAG tasks to activities

4.1.2.1. File Retrieval Code (SFTPOperator)
Airflow code
op = SFTPOperator(task_id=f"download_{file}",
            ssh_conn_id="log_server",
            local_filepath=f"{base_folder}/{file}",
            remote_filepath=f"{remote_path}/{file}",
            operation=SFTPOperation.GET,
            create_intermediate_dirs=True,
            dag=dag)
Temporal code
    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def sftp_download(self, session: dict, remote_file: str):
        transport, sftp = None, None
        try:
            transport = paramiko.Transport(("localhost", 2222))
            transport.connect(None, "foo", "pass")
            sftp = paramiko.SFTPClient.from_transport(transport)
            local_file = os.path.join(session["working_directory"], remote_file)
            sftp.get(remote_file, local_file)
        finally:
            if sftp:
                sftp.close()
            if transport:
                transport.close()
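
Note that paramiko is a blocking client, so this activity will stall the worker's event loop for the duration of the transfer. In a real worker you would wrap these calls with run_in_executor (see Accessing External Resources) or use an asyncio SFTP library such as asyncssh; the same caveat applies to smtplib in send_email below.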
4.1.2.2. grep_exception (BashOperator)
Airflow code
bash_command = """
    grep -E 'Exception' --include=\\*.log -rnw '{{ params.base_folder }}' > {{ params.base_folder }}/errors.txt
    ls -l {{ params.base_folder }}/errors.txt && cat {{ params.base_folder }}/errors.txt
"""
grep_exception = BashOperator(task_id="grep_exception",
                        bash_command=bash_command,
                        params={'base_folder': base_folder},
                        dag=dag)
Temporal code
    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def grep_exception(self, session: dict):
        bash_command = """
            grep -E 'Exception' --include=\\*.log -rnw '{0}' > '{0}/errors.txt'
            ls -l {0}/errors.txt && cat {0}/errors.txt
        """.format(session["working_directory"])
        process = await asyncio.create_subprocess_shell(bash_command)
        await process.wait()
        # TODO: error handling
4.1.2.3. create_table (PostgresOperator)
Airflow code
create_table = PostgresOperator(task_id='create_table',
                        sql='''DROP TABLE IF EXISTS {0};
                                CREATE TABLE {0} (
                                id SERIAL PRIMARY KEY,
                                filename VARCHAR (100) NOT NULL,
                                line integer NOT NULL,
                                date VARCHAR (15) NOT NULL,
                                time VARCHAR (15) NOT NULL,
                                session VARCHAR (50),
                                app VARCHAR (50),
                                module VARCHAR (100),
                                error VARCHAR(512)
                            );'''.format(table_name),
                        dag=dag)
Temporal code
    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def create_table(self, session: dict):
        sql = '''DROP TABLE IF EXISTS {0};
                                CREATE TABLE {0} (
                                id SERIAL PRIMARY KEY,
                                filename VARCHAR (100) NOT NULL,
                                line integer NOT NULL,
                                date VARCHAR (15) NOT NULL,
                                time VARCHAR (15) NOT NULL,
                                session VARCHAR (50),
                                app VARCHAR (50),
                                module VARCHAR (100),
                                error VARCHAR(512)
                            );'''.format(self.get_table_name())
        async with db_pool.acquire() as conn:
            await conn.execute(sql)
4.1.2.4. parse_log (PythonOperator)
Airflow code
def save_to_database(tablename, records):
    ....

def parse_log(logString):
    ....

def parse_log_file(filepath, tablename):
    ....

parse_log = PythonOperator(task_id='parse_log',
                        python_callable=parse_log_file,
                        op_kwargs={'filepath': f'{base_folder}/errors.txt',
                                   'tablename': f'{table_name}'},
                        dag=dag)
Temporal code
    @staticmethod
    def parse_log(log_string):
        r = r".+\/(?P<file>.+):(?P<line>\d+):\[\[\]\] (?P<date>.+)/(?P<time>\d{2}:\d{2}:\d{2},\d{3}) ERROR ?(?:SessionId : )?(?P<session>.+)? \[(?P<app>\w+)\] .+ (?:Error \:|service Exception) (?P<module>(?=[\w\.-]+ : )[\w\.-]+)?(?: \: )?(?P<errMsg>.+)"
        group = re.match(r, log_string)
        return group.groups()

    @staticmethod
    def get_table_name():
        date_tag = date.today().strftime('%Y%m%d')
        table_name = f'error_logs_{date_tag}'
        return table_name

    @classmethod
    async def save_to_database(cls, records):
        if not records:
            print("Empty record!")
            return
        table_name = cls.get_table_name()
        sql = """INSERT INTO {} (filename, line, date, time, session, app, module, error)
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)""".format(table_name)
        records = [list(r) for r in records]
        for r in records:
            r[1] = int(r[1])
        async with db_pool.acquire() as conn:
            await conn.executemany(sql, records)
        print(f"  -> {len(records)} records are saved to table: {table_name}.")

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def parse_log_file(self, session: dict):
        filepath = os.path.join(session["working_directory"], "errors.txt")
        print(f'Opening {filepath} ...')
        with open(filepath) as fp:
            records = []
            for line in fp:
                records.append(self.parse_log(line))
            await self.save_to_database(records)
4.1.2.5. gen_reports (PythonOperator)
Airflow code
def gen_error_reports(statfile, logfile, tablename, **kwargs):
    ....

gen_reports = PythonOperator(task_id='gen_reports',
                        python_callable=gen_error_reports,
                        op_kwargs={'statfile': f'{base_folder}/error_stats.csv',
                                   'logfile': f'{base_folder}/error_logs.csv',
                                   'tablename': f'{table_name}'},
                        provide_context=True,
                        dag=dag)
Temporal code
    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def gen_reports(self, session: dict):
        table_name = self.get_table_name()
        log_file = os.path.join(session['working_directory'], "error_logs.csv")
        stat_file = os.path.join(session['working_directory'], "error_stats.csv")
        async with db_pool.acquire() as connection:
            await connection.copy_from_query(f"SELECT * from {table_name}", format="csv",
                                             header=True, output=log_file)
            sql = f"SELECT error, count(*) as occurrence FROM {table_name} group by error ORDER BY occurrence DESC"
            await connection.copy_from_query(sql, format="csv", header=True, output=stat_file)
            top_result = await connection.fetchrow(sql)
            return dict(top_result) if top_result else None
4.1.2.6. send_email (EmailOperator)
Airflow code
send_email = EmailOperator(task_id='send_email',
        to='tony.xu@airflow.com',
        subject='Daily report of error log generated',
        html_content=""" <h1>Here is the daily report of error log for {{ ds }}</h1> """,
        files=[f'{base_folder}/error_stats.csv', f'{base_folder}/error_logs.csv'],
        dag=dag)
Temporal code
    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=2))
    async def send_email(self, session: dict):
        with smtplib.SMTP(host="localhost", port=1025) as s:
            msg = MIMEMultipart()
            msg['From'] = "bob@example.com"
            msg['To'] = "alice@example.com"
            msg['Date'] = formatdate(localtime=True)
            today = date.today().isoformat()
            msg['Subject'] = "Errors for " + today
            msg.attach(MIMEText(f"<h1>Here is the daily report of error log for { today }</h1>", "html"))
            working_directory = session['working_directory']
            for f in ["error_logs.csv", "error_stats.csv"]:
                with open(os.path.join(working_directory, f), "rb") as fp:
                    part = MIMEApplication(fp.read(), Name=basename(f))
                part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
                msg.attach(part)
            s.sendmail("bob@example.com", "alice@example.com", msg.as_string())

4.1.3. Convert the DAG to a workflow method

Airflow code
check_threshold = BranchPythonOperator(task_id='check_threshold', python_callable=check_error_threshold, provide_context=True, dag=dag)
dummy_op = DummyOperator(task_id='dummy_op', dag=dag)
dl_tasks >> grep_exception >> create_table >> parse_log >> gen_reports >> check_threshold >> [send_email, dummy_op]
Temporal code
    @workflow_method(task_queue=TASK_QUEUE)
    async def monitor(self, error_threshold=3) -> None:
        self.error_threshold = error_threshold
        for _ in range(RETRY_ATTEMPTS):
            try:
                exception = None
                session = await self.activities.new_session()
                await self.with_session(session)
                break
            except ActivityFailureException as ex:
                # In the case of a timeout ex.get_cause() will be a
                # ActivityTaskTimeoutException
                self.logger.error("Error: %s", str(ex))
                exception = ex
        if exception:
            raise exception

    async def with_session(self, session):
        self.activities: MonitorErrorsActivities = \
            Workflow.new_activity_stub(MonitorErrorsActivities)
        activity_options = ActivityOptions(task_queue=session["host_task_queue"])
        retry_parameters = RetryParameters(maximum_attempts=1)
        session_activities = Workflow.new_activity_stub(MonitorErrorsActivities,
                                                        retry_parameters=retry_parameters,
                                                        activity_options=activity_options)
        futures = [
            Async.function(session_activities.sftp_download, session, "a.log"),
            Async.function(session_activities.sftp_download, session, "b.log"),
            Async.function(session_activities.sftp_download, session, "c.log"),
            Async.function(session_activities.sftp_download, session, "d.log"),
        ]
        await Async.all_of(futures)
        await session_activities.grep_exception(session)
        await session_activities.create_table(session)
        await session_activities.parse_log_file(session)
        top_result = await session_activities.gen_reports(session)
        if top_result and top_result["occurrence"] >= self.error_threshold:
            await session_activities.send_email(session)
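
Note how the pieces map: the four parallel download tasks become Async.function futures gathered with Async.all_of, the linear task chain becomes plain sequential awaits, and the BranchPythonOperator/DummyOperator branch collapses into an ordinary if statement at the end of with_session.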

4.1.4. Complete Source Code

import asyncio
import logging
import os
import re
import socket
import tempfile
from datetime import timedelta, date
import smtplib
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from os.path import basename

import asyncpg
import paramiko

from temporal.activity_method import activity_method, ActivityOptions, RetryParameters
from temporal.async_activity import Async
from temporal.exceptions import ActivityFailureException
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, WorkflowClient, Workflow

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "MonitorErrorsTaskQueue"
HOST_TASK_QUEUE = TASK_QUEUE + "-%d@%s" % (os.getpid(), socket.gethostname())
NAMESPACE = "default"
DSN = 'postgres://admin:admin123@127.0.0.1/error_monitoring_job'
RETRY_ATTEMPTS = 3
db_pool = None

class MonitorErrorsActivities:
    def __init__(self, task_queue):
        self.task_queue = task_queue

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def new_session(self) -> dict:
        working_directory = tempfile.mkdtemp()
        return {
            "working_directory": working_directory,
            "host_task_queue": HOST_TASK_QUEUE
        }

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def sftp_download(self, session: dict, remote_file: str):
        transport, sftp = None, None
        try:
            transport = paramiko.Transport(("localhost", 2222))
            transport.connect(None, "foo", "pass")
            sftp = paramiko.SFTPClient.from_transport(transport)
            local_file = os.path.join(session["working_directory"], remote_file)
            sftp.get(remote_file, local_file)
        finally:
            if sftp:
                sftp.close()
            if transport:
                transport.close()

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def grep_exception(self, session: dict):
        bash_command = """
            grep -E 'Exception' --include=\\*.log -rnw '{0}' > '{0}/errors.txt'
            ls -l {0}/errors.txt && cat {0}/errors.txt
        """.format(session["working_directory"])
        process = await asyncio.create_subprocess_shell(bash_command)
        await process.wait()
        # TODO: error handling

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def create_table(self, session: dict):
        sql = '''DROP TABLE IF EXISTS {0};
                                CREATE TABLE {0} (
                                id SERIAL PRIMARY KEY,
                                filename VARCHAR (100) NOT NULL,
                                line integer NOT NULL,
                                date VARCHAR (15) NOT NULL,
                                time VARCHAR (15) NOT NULL,
                                session VARCHAR (50),
                                app VARCHAR (50),
                                module VARCHAR (100),
                                error VARCHAR(512)
                            );'''.format(self.get_table_name())
        async with db_pool.acquire() as conn:
            await conn.execute(sql)

    @staticmethod
    def parse_log(log_string):
        r = r".+\/(?P<file>.+):(?P<line>\d+):\[\[\]\] (?P<date>.+)/(?P<time>\d{2}:\d{2}:\d{2},\d{3}) ERROR ?(?:SessionId : )?(?P<session>.+)? \[(?P<app>\w+)\] .+ (?:Error \:|service Exception) (?P<module>(?=[\w\.-]+ : )[\w\.-]+)?(?: \: )?(?P<errMsg>.+)"
        group = re.match(r, log_string)
        return group.groups()

    @staticmethod
    def get_table_name():
        date_tag = date.today().strftime('%Y%m%d')
        table_name = f'error_logs_{date_tag}'
        return table_name

    @classmethod
    async def save_to_database(cls, records):
        if not records:
            print("Empty record!")
            return
        table_name = cls.get_table_name()
        sql = """INSERT INTO {} (filename, line, date, time, session, app, module, error)
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)""".format(table_name)
        records = [list(r) for r in records]
        for r in records:
            r[1] = int(r[1])
        async with db_pool.acquire() as conn:
            await conn.executemany(sql, records)
        print(f"  -> {len(records)} records are saved to table: {table_name}.")

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def parse_log_file(self, session: dict):
        filepath = os.path.join(session["working_directory"], "errors.txt")
        print(f'Opening {filepath} ...')
        with open(filepath) as fp:
            records = []
            for line in fp:
                records.append(self.parse_log(line))
            await self.save_to_database(records)

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=60))
    async def gen_reports(self, session: dict):
        table_name = self.get_table_name()
        log_file = os.path.join(session['working_directory'], "error_logs.csv")
        stat_file = os.path.join(session['working_directory'], "error_stats.csv")
        async with db_pool.acquire() as connection:
            await connection.copy_from_query(f"SELECT * from {table_name}", format="csv",
                                             header=True, output=log_file)
            sql = f"SELECT error, count(*) as occurrence FROM {table_name} group by error ORDER BY occurrence DESC"
            await connection.copy_from_query(sql, format="csv", header=True, output=stat_file)
            top_result = await connection.fetchrow(sql)
            return dict(top_result) if top_result else None

    @activity_method(task_queue=TASK_QUEUE,
                     schedule_to_close_timeout=timedelta(seconds=2))
    async def send_email(self, session: dict):
        with smtplib.SMTP(host="localhost", port=1025) as s:
            msg = MIMEMultipart()
            msg['From'] = "bob@example.com"
            msg['To'] = "alice@example.com"
            msg['Date'] = formatdate(localtime=True)
            today = date.today().isoformat()
            msg['Subject'] = "Errors for " + today
            msg.attach(MIMEText(f"<h1>Here is the daily report of error log for { today }</h1>", "html"))
            working_directory = session['working_directory']
            for f in ["error_logs.csv", "error_stats.csv"]:
                with open(os.path.join(working_directory, f), "rb") as fp:
                    part = MIMEApplication(fp.read(), Name=basename(f))
                part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
                msg.attach(part)
            s.sendmail("bob@example.com", "alice@example.com", msg.as_string())

class MonitorErrorsWorkflow:
    def __init__(self):
        self.activities: MonitorErrorsActivities = \
            Workflow.new_activity_stub(MonitorErrorsActivities)
        self.error_threshold = 0
        self.logger = Workflow.get_logger("MonitorErrorsWorkflow")

    @workflow_method(task_queue=TASK_QUEUE)
    async def monitor(self, error_threshold=3) -> None:
        self.error_threshold = error_threshold
        for _ in range(RETRY_ATTEMPTS):
            try:
                exception = None
                session = await self.activities.new_session()
                await self.with_session(session)
                break
            except ActivityFailureException as ex:
                # In the case of a timeout ex.get_cause() will be a
                # ActivityTaskTimeoutException
                self.logger.error("Error: %s", str(ex))
                exception = ex
        if exception:
            raise exception

    async def with_session(self, session):
        self.activities: MonitorErrorsActivities = \
            Workflow.new_activity_stub(MonitorErrorsActivities)
        activity_options = ActivityOptions(task_queue=session["host_task_queue"])
        retry_parameters = RetryParameters(maximum_attempts=1)
        session_activities = Workflow.new_activity_stub(MonitorErrorsActivities,
                                                        retry_parameters=retry_parameters,
                                                        activity_options=activity_options)
        futures = [
            Async.function(session_activities.sftp_download, session, "a.log"),
            Async.function(session_activities.sftp_download, session, "b.log"),
            Async.function(session_activities.sftp_download, session, "c.log"),
            Async.function(session_activities.sftp_download, session, "d.log"),
        ]
        await Async.all_of(futures)
        await session_activities.grep_exception(session)
        await session_activities.create_table(session)
        await session_activities.parse_log_file(session)
        top_result = await session_activities.gen_reports(session)
        if top_result and top_result["occurrence"] >= self.error_threshold:
            await session_activities.send_email(session)

async def worker_main():
    global db_pool
    db_pool = await asyncpg.create_pool(DSN)

    monitor_error_activities = MonitorErrorsActivities(task_queue=HOST_TASK_QUEUE)
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)

    worker = factory.new_worker(TASK_QUEUE)
    worker.register_activities_implementation(monitor_error_activities)
    worker.register_workflow_implementation_type(MonitorErrorsWorkflow)

    worker = factory.new_worker(HOST_TASK_QUEUE)
    worker.num_activity_tasks = 1
    worker.register_activities_implementation(monitor_error_activities)

    factory.start()
    print("Worker started")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(worker_main())
    loop.run_forever()

4.1.5. Invoking the workflow

% tctl workflow run --workflow_type "MonitorErrorsWorkflow::monitor" --taskqueue "MonitorErrorsTaskQueue" --execution_timeout 600
Running execution:
  Workflow Id : eaf2c1ca-39cb-43e6-b491-5aa372873a2d
  Run Id      : a527fed0-dbeb-4ec9-91f1-1bec91a3f73b
  Type        : MonitorErrorsWorkflow::monitor
  Namespace   : default
  Task Queue  : MonitorErrorsTaskQueue
  Args        : []
Progress:
  1, 2021-03-14T19:10:48Z, WorkflowExecutionStarted
  2, 2021-03-14T19:10:48Z, WorkflowTaskScheduled
  3, 2021-03-14T19:10:48Z, WorkflowTaskStarted
  4, 2021-03-14T19:10:48Z, WorkflowTaskCompleted
  5, 2021-03-14T19:10:48Z, ActivityTaskScheduled
  6, 2021-03-14T19:10:48Z, ActivityTaskStarted
  7, 2021-03-14T19:10:48Z, ActivityTaskCompleted
  8, 2021-03-14T19:10:48Z, WorkflowTaskScheduled
  9, 2021-03-14T19:10:48Z, WorkflowTaskStarted
  10, 2021-03-14T19:10:48Z, WorkflowTaskCompleted
  11, 2021-03-14T19:10:48Z, ActivityTaskScheduled
  12, 2021-03-14T19:10:48Z, ActivityTaskScheduled
  13, 2021-03-14T19:10:48Z, ActivityTaskScheduled
  14, 2021-03-14T19:10:48Z, ActivityTaskScheduled
  15, 2021-03-14T19:10:49Z, ActivityTaskStarted
  16, 2021-03-14T19:10:49Z, ActivityTaskCompleted
  17, 2021-03-14T19:10:49Z, WorkflowTaskScheduled
  18, 2021-03-14T19:10:49Z, WorkflowTaskStarted
  19, 2021-03-14T19:10:49Z, WorkflowTaskCompleted
  20, 2021-03-14T19:10:49Z, ActivityTaskStarted
  21, 2021-03-14T19:10:49Z, ActivityTaskCompleted
  22, 2021-03-14T19:10:49Z, WorkflowTaskScheduled
  23, 2021-03-14T19:10:49Z, WorkflowTaskStarted
  24, 2021-03-14T19:10:49Z, WorkflowTaskCompleted
  25, 2021-03-14T19:10:50Z, ActivityTaskStarted
  26, 2021-03-14T19:10:51Z, ActivityTaskCompleted
  27, 2021-03-14T19:10:51Z, WorkflowTaskScheduled
  28, 2021-03-14T19:10:51Z, WorkflowTaskStarted
  29, 2021-03-14T19:10:51Z, WorkflowTaskCompleted
  30, 2021-03-14T19:10:51Z, ActivityTaskStarted
  31, 2021-03-14T19:10:51Z, ActivityTaskCompleted
  32, 2021-03-14T19:10:51Z, WorkflowTaskScheduled
  33, 2021-03-14T19:10:51Z, WorkflowTaskStarted
  34, 2021-03-14T19:10:51Z, WorkflowTaskCompleted
  35, 2021-03-14T19:10:51Z, ActivityTaskScheduled
  36, 2021-03-14T19:10:51Z, ActivityTaskStarted
  37, 2021-03-14T19:10:51Z, ActivityTaskCompleted
  38, 2021-03-14T19:10:51Z, WorkflowTaskScheduled
  39, 2021-03-14T19:10:52Z, WorkflowTaskStarted
  40, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
  41, 2021-03-14T19:10:52Z, ActivityTaskScheduled
  42, 2021-03-14T19:10:52Z, ActivityTaskStarted
  43, 2021-03-14T19:10:52Z, ActivityTaskCompleted
  44, 2021-03-14T19:10:52Z, WorkflowTaskScheduled
  45, 2021-03-14T19:10:52Z, WorkflowTaskStarted
  46, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
  47, 2021-03-14T19:10:52Z, ActivityTaskScheduled
  48, 2021-03-14T19:10:52Z, ActivityTaskStarted
  49, 2021-03-14T19:10:52Z, ActivityTaskCompleted
  50, 2021-03-14T19:10:52Z, WorkflowTaskScheduled
  51, 2021-03-14T19:10:52Z, WorkflowTaskStarted
  52, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
  53, 2021-03-14T19:10:52Z, ActivityTaskScheduled
  54, 2021-03-14T19:10:52Z, ActivityTaskStarted
  55, 2021-03-14T19:10:52Z, ActivityTaskCompleted
  56, 2021-03-14T19:10:52Z, WorkflowTaskScheduled
  57, 2021-03-14T19:10:52Z, WorkflowTaskStarted
  58, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
  59, 2021-03-14T19:10:52Z, ActivityTaskScheduled
  60, 2021-03-14T19:10:52Z, ActivityTaskStarted
  61, 2021-03-14T19:10:52Z, ActivityTaskCompleted
  62, 2021-03-14T19:10:52Z, WorkflowTaskScheduled
  63, 2021-03-14T19:10:52Z, WorkflowTaskStarted
  64, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
  65, 2021-03-14T19:10:52Z, WorkflowExecutionCompleted

Result:
  Run Time: 5 seconds
  Status: COMPLETED
  Output: [nil]

5. Others

5.1. Create a namespace

import asyncio
from datetime import timedelta

from temporal.api.workflowservice.v1 import RegisterNamespaceRequest
from temporal.workflow import WorkflowClient

async def main():
    client = WorkflowClient.new_client()
    request = RegisterNamespaceRequest()
    request.name = "the-new-namespace"
    # MaxWorkflowRetentionPeriod is 30 days
    request.workflow_execution_retention_period = timedelta(days=25)
    await client.service.register_namespace(request=request)
    client.close()

if __name__ == '__main__':
    asyncio.run(main())
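
For comparison, a namespace can also be registered from the command line; with recent tctl 1.x the retention is given in days:

% tctl --ns the-new-namespace namespace register --retention 25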

5.2. Querying Workflow History
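
Workflow histories are returned in pages. The script below keeps requesting pages until next_page_token comes back empty, then flattens all pages into a single list of events and prints each event's type.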

query_workflow_history.py
import asyncio
import itertools
from typing import List

from temporal.api.common.v1 import WorkflowExecution
from temporal.api.enums.v1 import EventType
from temporal.api.history.v1 import HistoryEvent
from temporal.api.workflowservice.v1 import GetWorkflowExecutionHistoryRequest
from temporal.workflow import WorkflowClient

NAMESPACE = "default"
WORKFLOW_ID = "querying-workflow-history"

async def main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    request = GetWorkflowExecutionHistoryRequest()
    request.execution = WorkflowExecution()
    request.namespace = NAMESPACE
    request.execution.workflow_id = WORKFLOW_ID
    responses = []
    while True:
        r = await client.service.get_workflow_execution_history(request=request)
        responses.append(r)
        if not r.next_page_token:
            break
        request.next_page_token = r.next_page_token
    events: List[HistoryEvent] = list(itertools.chain(*[e.history.events for e in responses]))
    print(f"Number of responses: {len(responses)}")
    print(f"Number of events: {len(events)}")
    for i, e in enumerate(events):
        print(i+1, EventType(e.event_type))
    client.close()

if __name__ == '__main__':
    asyncio.run(main())