© Mohammed Firdaus
This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.
1. Getting Started
1.1. Hello World
Workflow and activity worker:
import asyncio
import logging
from datetime import timedelta
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Activities Interface
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=60))
async def compose_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
# Activities Implementation
class GreetingActivitiesImpl:
async def compose_greeting(self, greeting: str, name: str) -> str:
return greeting + " " + name
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)
async def get_greeting(self, name):
return await self.greeting_activities.compose_greeting("Hello!", name)
async def worker_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
print("Worker started")
if __name__ == '__main__':
loop = asyncio.get_event_loop() # (1)
asyncio.ensure_future(worker_main())
loop.run_forever()
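(1) The worker runs as tasks on the asyncio event loop: ensure_future() schedules worker_main(), and run_forever() keeps the loop, and with it the worker, alive.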
Start the worker:
% python helloworld.py
Worker started
INFO:temporal.activity_loop:Activity task worker started: 53801@onepointzero.local
INFO:temporal.decision_loop:Decision task worker started: 53801@onepointzero.local
1.1.1. Invoking the workflow using tctl
% tctl --ns default workflow run workflow --taskqueue HelloWorld --workflow_type 'GreetingWorkflow::get_greeting' --input '"Bob"' --execution_timeout 3000
Running execution:
Workflow Id : 27a938ca-910b-480f-bf88-35b1470c2314
Run Id : 0433b954-f17f-4175-a750-e2766b623b11
Type : GreetingWorkflow::get_greeting
Namespace : default
Task Queue : HelloWorld
Args : [Bob]
Progress:
1, 2021-02-27T19:54:56Z, WorkflowExecutionStarted
2, 2021-02-27T19:54:56Z, WorkflowTaskScheduled
3, 2021-02-27T19:54:56Z, WorkflowTaskStarted
4, 2021-02-27T19:54:56Z, WorkflowTaskCompleted
5, 2021-02-27T19:54:56Z, ActivityTaskScheduled
6, 2021-02-27T19:54:56Z, ActivityTaskStarted
7, 2021-02-27T19:54:56Z, ActivityTaskCompleted
8, 2021-02-27T19:54:56Z, WorkflowTaskScheduled
9, 2021-02-27T19:54:56Z, WorkflowTaskStarted
10, 2021-02-27T19:54:56Z, WorkflowTaskCompleted
11, 2021-02-27T19:54:56Z, WorkflowExecutionCompleted
Result:
Run Time: 1 seconds
Status: COMPLETED
Output: [Hello! Bob]
1.1.2. Programmatically starting the workflow
import asyncio
from temporal.workflow import workflow_method, WorkflowClient
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
async def main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
result = await greeting_workflow.get_greeting("Bob")
print("Workflow returned:", result)
if __name__ == '__main__':
asyncio.run(main())
Running:
% python helloworld-invoker.py
Workflow returned: Hello! Bob
1.2. Signals and Queries
Signals and queries allow us to interact with a running workflow. Let's modify the greeting example so that:
- we can update the greeting via a signal
- we can retrieve the current greeting via a query
1.2.1. Defining signal and query methods in workflows
import asyncio
import logging
from datetime import timedelta
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient, signal_method, \
query_method
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Activities Interface
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def compose_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
# Activities Implementation
class GreetingActivitiesImpl:
async def compose_greeting(self, greeting: str, name: str) -> str:
return greeting + " " + name
# Workflow Interface
class GreetingWorkflow:
@signal_method
async def terminate(self):
raise NotImplementedError
@signal_method
    async def update_greeting(self, name: str):
raise NotImplementedError
@query_method
async def get_greeting(self):
raise NotImplementedError
@workflow_method(task_queue=TASK_QUEUE)
async def greeting_workflow(self, name: str) -> str:
raise NotImplementedError
# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = \
Workflow.new_activity_stub(GreetingActivities)
self.greeting = ""
self._terminate = False
async def terminate(self):
self._terminate = True
async def update_greeting(self, name: str):
self.greeting = await self.greeting_activities.compose_greeting("Hello!", name)
async def get_greeting(self):
return self.greeting
async def greeting_workflow(self, name):
self.greeting = await self.greeting_activities.compose_greeting("Hello!", name)
await Workflow.await_till(lambda: self._terminate)
async def worker_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(GreetingActivitiesImpl(),
"GreetingActivities")
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
print("Worker started")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main())
loop.run_forever()
% python helloworld-signal-query-worker.py
Worker started
INFO:temporal.activity_loop:Activity task worker started: 56226@onepointzero.local
INFO:temporal.decision_loop:Decision task worker started: 56226@onepointzero.local
% tctl --ns default workflow run workflow --taskqueue HelloWorld --workflow_type 'GreetingWorkflow::greeting_workflow' --input '"Bob"' --execution_timeout 3000 --workflow_id "greeting-workflow-1"
Running execution:
Workflow Id : greeting-workflow-1
Run Id : 518590b2-6e23-4f5b-8319-9565f6ed9178
Type : GreetingWorkflow::greeting_workflow
Namespace : default
Task Queue : HelloWorld
Args : [Bob]
Progress:
1, 2021-02-28T09:20:15Z, WorkflowExecutionStarted
2, 2021-02-28T09:20:15Z, WorkflowTaskScheduled
3, 2021-02-28T09:20:15Z, WorkflowTaskStarted
4, 2021-02-28T09:20:15Z, WorkflowTaskCompleted
5, 2021-02-28T09:20:15Z, ActivityTaskScheduled
6, 2021-02-28T09:20:15Z, ActivityTaskStarted
7, 2021-02-28T09:20:15Z, ActivityTaskCompleted
8, 2021-02-28T09:20:15Z, WorkflowTaskScheduled
9, 2021-02-28T09:20:15Z, WorkflowTaskStarted
10, 2021-02-28T09:20:15Z, WorkflowTaskCompleted
Time elapse: 8s
Unlike the previous workflow, this one doesn't exit immediately: greeting_workflow blocks in Workflow.await_till until the terminate signal sets self._terminate. That is expected.
1.2.2. Invoking the query and signal via tctl
In another terminal window run the following:
% tctl --ns default workflow query --workflow_id "greeting-workflow-1" --query_type "GreetingWorkflow::get_greeting"
Query result:
[Hello! Bob]
% tctl --ns default workflow signal --workflow_id "greeting-workflow-1" --name "GreetingWorkflow::update_greeting" --input '"Alice"'
Signal workflow succeeded.
% tctl --ns default workflow query --workflow_id "greeting-workflow-1" --query_type "GreetingWorkflow::get_greeting"
Query result:
[Hello! Alice]
1.2.3. Invoking the query and signal programmatically
import asyncio
from temporal.workflow import workflow_method, WorkflowClient, signal_method, query_method
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Workflow Interface
class GreetingWorkflow:
@signal_method
async def terminate(self):
raise NotImplementedError
@signal_method
    async def update_greeting(self, name: str):
raise NotImplementedError
@query_method
async def get_greeting(self):
raise NotImplementedError
@workflow_method(task_queue=TASK_QUEUE)
async def greeting_workflow(self, name: str) -> str:
raise NotImplementedError
async def main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
context = await WorkflowClient.start(greeting_workflow.greeting_workflow, "Bob")
await asyncio.sleep(5) # Give workflow some time to run (1)
print("Invoking query get_greeting():", await greeting_workflow.get_greeting())
print("Invoking signal update_greeting('Alice')")
await greeting_workflow.update_greeting('Alice')
await asyncio.sleep(5) # Give signal some time to run (1)
print("Invoking query get_greeting():", await greeting_workflow.get_greeting())
await greeting_workflow.terminate()
await client.wait_for_close(context)
if __name__ == '__main__':
asyncio.run(main())
(1) Both the workflow method and the signal method invoke activities. Because activities execute asynchronously (they are delegated to a task queue), a query issued while the activity is still running would return a greeting that is about to become stale; the sleeps give the activities time to complete.
Output:
% python helloworld-signal-query-invoker.py
Invoking query get_greeting(): Hello! Bob
Invoking signal update_greeting('Alice')
Invoking query get_greeting(): Hello! Alice
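Rather than sleeping for a fixed interval, the invoker could poll the query until the updated value is visible. A minimal sketch (wait_for_greeting is a hypothetical helper, not part of the SDK):
import asyncio

async def wait_for_greeting(greeting_workflow, expected: str, attempts: int = 50) -> str:
    # Poll the get_greeting query until the signal's activity has finished
    # and the expected greeting becomes visible.
    for _ in range(attempts):
        greeting = await greeting_workflow.get_greeting()
        if greeting == expected:
            return greeting
        await asyncio.sleep(0.2)
    raise TimeoutError(f"greeting never became {expected!r}")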
1.3. Accessing External Resources
Activities are coroutines, so be sure to use asyncio clients for the resources you access, or you risk blocking the event loop, e.g.:
- MongoDB: motor
- HTTP: aiohttp
- Redis: aioredis
- PostgreSQL: aiopg, asyncpg, etc.
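For instance, an HTTP-fetching activity implementation might use aiohttp like this (a minimal sketch; FetchActivitiesImpl and its fetch method are illustrative, not part of the example that follows):
import aiohttp

class FetchActivitiesImpl:
    async def fetch(self, url: str) -> str:
        # aiohttp issues the request asynchronously, so the worker's event
        # loop can keep serving other activity and workflow tasks meanwhile.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()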
For this example we are going to be using MongoDB.
% docker run -e MONGO_INITDB_ROOT_USERNAME=mongoadmin -e MONGO_INITDB_ROOT_PASSWORD=secret -p 27017:27017 mongo
% pip install motor
1.3.1. Workflow and activity that utilizes an external resource (MongoDB)
import asyncio
import logging
from datetime import timedelta
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorDatabase
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
import motor.motor_asyncio
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def create_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def compose_greeting(self, greeting_id) -> str:
raise NotImplementedError
class GreetingActivitiesImpl:
def __init__(self, db):
self.db = db
async def create_greeting(self, greeting: str, name: str) -> str:
result = await self.db.greetings.insert_one({
"greeting": greeting,
"name": name
})
return str(result.inserted_id)
async def compose_greeting(self, greeting_id) -> str:
document = await self.db.greetings.find_one(ObjectId(greeting_id))
return document["greeting"] + " " + document["name"]
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def create_and_retrieve_greeting(self, name) -> str:
raise NotImplementedError
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = \
Workflow.new_activity_stub(GreetingActivities)
async def create_and_retrieve_greeting(self, name) -> str:
greeting_id = await self.greeting_activities.create_greeting("Hello!", name)
greeting_document = await self.greeting_activities.compose_greeting(greeting_id)
return greeting_document
async def worker_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
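    # Create the Motor client once at worker startup and hand the database
    # object to the activities implementation, so every activity invocation
    # shares the same connection pool.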
motor_client = motor.motor_asyncio.AsyncIOMotorClient(
'mongodb://mongoadmin:secret@localhost:27017/')
db = motor_client.greeting_db
activities_impl = GreetingActivitiesImpl(db)
worker.register_activities_implementation(activities_impl, "GreetingActivities")
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
print("Worker started")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(worker_main())
loop.run_forever()
% python helloworld-activities-external-resources-worker.py
Worker started
INFO:temporal.activity_loop:Activity task worker started: 61100@onepointzero.local
INFO:temporal.decision_loop:Decision task worker started: 61100@onepointzero.local
% tctl --ns default workflow run --workflow_type "GreetingWorkflow::create_and_retrieve_greeting" --taskqueue "HelloWorld" --execution_timeout 300 --input '"Bob"'
Running execution:
Workflow Id : 0e78ca2b-03e4-47ff-b93c-537721488690
Run Id : 115bf8a8-6b10-44fc-891d-8aebb99d8b2d
Type : GreetingWorkflow::create_and_retrieve_greeting
Namespace : default
Task Queue : HelloWorld
Args : [Bob]
Progress:
1, 2021-02-28T16:21:31Z, WorkflowExecutionStarted
2, 2021-02-28T16:21:31Z, WorkflowTaskScheduled
3, 2021-02-28T16:21:31Z, WorkflowTaskStarted
4, 2021-02-28T16:21:31Z, WorkflowTaskCompleted
5, 2021-02-28T16:21:31Z, ActivityTaskScheduled
6, 2021-02-28T16:21:31Z, ActivityTaskStarted
7, 2021-02-28T16:21:31Z, ActivityTaskCompleted
8, 2021-02-28T16:21:31Z, WorkflowTaskScheduled
9, 2021-02-28T16:21:31Z, WorkflowTaskStarted
10, 2021-02-28T16:21:31Z, WorkflowTaskCompleted
11, 2021-02-28T16:21:31Z, ActivityTaskScheduled
12, 2021-02-28T16:21:31Z, ActivityTaskStarted
13, 2021-02-28T16:21:31Z, ActivityTaskCompleted
14, 2021-02-28T16:21:31Z, WorkflowTaskScheduled
15, 2021-02-28T16:21:31Z, WorkflowTaskStarted
16, 2021-02-28T16:21:31Z, WorkflowTaskCompleted
17, 2021-02-28T16:21:31Z, WorkflowExecutionCompleted
Result:
Run Time: 1 seconds
Status: COMPLETED
Output: [Hello! Bob]
1.4. Invoking Workflows at Regular Intervals
Workflows can be triggered at regular intervals using cron workflows: you supply a schedule in the standard cron format (e.g. "*/5 * * * *" for every five minutes) and the server starts a new run of the workflow on that schedule.
1.4.1. Cron workflow example
We’ll reuse the helloworld example from earlier:
import asyncio
import logging
from datetime import timedelta
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Activities Interface
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=60))
async def compose_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
# Activities Implementation
class GreetingActivitiesImpl:
async def compose_greeting(self, greeting: str, name: str) -> str:
return greeting + " " + name
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)
async def get_greeting(self, name):
return await self.greeting_activities.compose_greeting("Hello!", name)
async def worker_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
print("Worker started")
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main())
loop.run_forever()
1.4.2. Starting the cron workflow from the command line
% tctl --ns default workflow start workflow --taskqueue HelloWorld --workflow_type 'GreetingWorkflow::get_greeting' --input '"Bob"' --execution_timeout '315360000' --cron "*/5 * * * *" --workflow_id "greeting-cron-workflow"
Started Workflow Id: greeting-cron-workflow, run Id: 4c5cc4e3-62aa-40f0-bf0a-be4fbc5e3711
1.4.3. Starting the cron workflow from Python
import asyncio
from datetime import timedelta
from temporal.api.enums.v1 import WorkflowIdReusePolicy
from temporal.workflow import workflow_method, WorkflowClient, WorkflowOptions
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
async def main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
workflow_options: WorkflowOptions = WorkflowOptions()
# by default workflow_options.workflow_execution_timeout is set to 0 so the cron workflow
# will run forever.
workflow_options.cron_schedule = "*/5 * * * *"
workflow_options.workflow_id = "greeting-cron-workflow"
workflow_options.workflow_id_reuse_policy = WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow,
workflow_options=workflow_options)
await WorkflowClient.start(greeting_workflow.get_greeting, "Bob")
print("Workflow started")
if __name__ == '__main__':
asyncio.run(main())
1.4.4. Terminating the cron workflow
% tctl workflow terminate --workflow_id "greeting-cron-workflow"
Terminate workflow succeeded.
2. Advanced Activities
2.1. Handling Errors
2.1.1. Detecting activity timeouts
import asyncio
import logging
from datetime import timedelta
from temporal.activity_method import activity_method, RetryParameters
from temporal.exceptions import ActivityFailureException, ActivityTaskTimeoutException
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "ActivityTimeoutTaskQueue"
NAMESPACE = "default"
class Activities:
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=10))
    async def sleep(self):
await asyncio.sleep(20)
class MainWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def main(self) -> str:
activities = Workflow.new_activity_stub(Activities)
try:
await activities.sleep()
except ActivityFailureException as ex:
cause = ex.get_cause()
if isinstance(cause, ActivityTaskTimeoutException):
return "timeout"
else:
raise ex
return "did not timeout"
async def worker_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(Activities())
worker.register_workflow_implementation_type(MainWorkflow)
factory.start()
print("Worker started")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(worker_main())
loop.run_forever()
% tctl workflow run --taskqueue "ActivityTimeoutTaskQueue" --workflow_type "MainWorkflow::main" --execution_timeout 600
Running execution:
Workflow Id : 7e26c623-40fa-413e-9cae-05ea285b1b07
Run Id : 4669cf23-0088-4687-b50e-d14b98276a77
Type : MainWorkflow::main
Namespace : default
Task Queue : ActivityTimeoutTaskQueue
Args : []
Progress:
1, 2021-03-16T19:02:11Z, WorkflowExecutionStarted
2, 2021-03-16T19:02:11Z, WorkflowTaskScheduled
3, 2021-03-16T19:02:11Z, WorkflowTaskStarted
4, 2021-03-16T19:02:11Z, WorkflowTaskCompleted
5, 2021-03-16T19:02:11Z, ActivityTaskScheduled
6, 2021-03-16T19:02:11Z, ActivityTaskStarted
7, 2021-03-16T19:02:21Z, ActivityTaskTimedOut
8, 2021-03-16T19:02:21Z, WorkflowTaskScheduled
9, 2021-03-16T19:02:21Z, WorkflowTaskStarted
10, 2021-03-16T19:02:21Z, WorkflowTaskCompleted
11, 2021-03-16T19:02:21Z, WorkflowExecutionCompleted
Result:
Run Time: 11 seconds
Status: COMPLETED
Output: ["timeout"]
2.2. File Processing Workflows
2.2.1. Worker
import asyncio
import logging
import os
import socket
from datetime import timedelta
from temporal.activity_method import activity_method, ActivityOptions
from temporal.exceptions import ActivityFailureException
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "FileProcessing"
HOST_TASK_QUEUE = TASK_QUEUE + "-%d@%s" % (os.getpid(), socket.gethostname())
RETRY_ATTEMPTS = 3
NAMESPACE = "default"
class StoreActivities:
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def download(self, url: str) -> dict:
raise NotImplementedError
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def process(self, input_file_name: str) -> str:
raise NotImplementedError
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def upload(self, local_file_name: str, url: str):
raise NotImplementedError
class StoreActivitiesImpl(StoreActivities):
def __init__(self, task_queue: str):
self.task_queue = task_queue
async def download(self, url: str) -> dict:
print(f"Downloading {url}... to downloaded-file.txt")
# ....... actual code for downloading url goes here
return {
"file_name": "downloaded-file.txt",
"host_task_queue": self.task_queue
}
async def process(self, input_file_name: str) -> str:
print(f"Processing {input_file_name}... and generating processed-file.txt")
        # ....... actual code for processing input_file_name goes here
return "processed-file.txt"
async def upload(self, local_file_name: str, url: str):
print(f"Uploading {local_file_name} to {url}...")
# ...... actual code for uploading local_file_name goes here
class FileProcessingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def process_file(self, source: str, destination: str) -> str:
raise NotImplementedError
class FileProcessWorkflowImpl(FileProcessingWorkflow):
def __init__(self):
self.default_task_queue_store: StoreActivities = Workflow.new_activity_stub(
StoreActivities)
self.logger = Workflow.get_logger("FileProcessWorkflowImpl")
async def process_file(self, source: str, destination: str) -> str:
exception = None
for _ in range(RETRY_ATTEMPTS):
try:
exception = None
await self.process_file_impl(source, destination)
break
except ActivityFailureException as ex:
self.logger.error("Error: %s", str(ex))
exception = ex
if exception:
raise exception
return "done"
async def process_file_impl(self, source: str, destination: str):
downloaded: dict = await self.default_task_queue_store.download(source)
host_specific_store: StoreActivities
activity_options = ActivityOptions(task_queue=downloaded["host_task_queue"])
host_specific_store = Workflow.new_activity_stub(StoreActivities,
activity_options=activity_options)
processed: str = await host_specific_store.process(downloaded["file_name"])
await host_specific_store.upload(processed, destination)
async def worker_main():
store_activities = StoreActivitiesImpl(task_queue=HOST_TASK_QUEUE)
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(store_activities, "StoreActivities")
worker.register_workflow_implementation_type(FileProcessWorkflowImpl)
worker = factory.new_worker(HOST_TASK_QUEUE)
worker.register_activities_implementation(store_activities, "StoreActivities")
factory.start()
print("Worker started")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(worker_main())
loop.run_forever()
2.2.2. Invoking the workflow
When you inspect the workflow history in Temporal Web, you should see that StoreActivities::download was invoked via the FileProcessing task queue, while the other two activities were invoked via the host-specific task queue (in my case FileProcessing-15201@onepointzero.local). That is the point of the pattern: download reports the host task queue it ran on, and the workflow routes process and upload there so they run on the host that holds the downloaded file.
% tctl workflow run --workflow_type "FileProcessingWorkflow::process_file" --taskqueue "FileProcessing" --execution_timeout 120 --input '"http://example.com/source.txt"' --input '"http://example.com/dest.txt"'
Running execution:
Workflow Id : 9ea8e69c-b064-45fe-9152-c211d1f7f9c4
Run Id : 2eb957aa-51b9-4b08-ad3c-b533a1dd7a55
Type : FileProcessingWorkflow::process_file
Namespace : default
Task Queue : FileProcessing
Args : [http://example.com/source.txt,
: http://example.com/dest.txt]
Progress:
1, 2021-03-11T16:22:21Z, WorkflowExecutionStarted
2, 2021-03-11T16:22:21Z, WorkflowTaskScheduled
3, 2021-03-11T16:22:21Z, WorkflowTaskStarted
4, 2021-03-11T16:22:21Z, WorkflowTaskCompleted
5, 2021-03-11T16:22:21Z, ActivityTaskScheduled
6, 2021-03-11T16:22:21Z, ActivityTaskStarted
7, 2021-03-11T16:22:21Z, ActivityTaskCompleted
8, 2021-03-11T16:22:21Z, WorkflowTaskScheduled
9, 2021-03-11T16:22:21Z, WorkflowTaskStarted
10, 2021-03-11T16:22:21Z, WorkflowTaskCompleted
11, 2021-03-11T16:22:21Z, ActivityTaskScheduled
12, 2021-03-11T16:22:21Z, ActivityTaskStarted
13, 2021-03-11T16:22:21Z, ActivityTaskCompleted
14, 2021-03-11T16:22:21Z, WorkflowTaskScheduled
15, 2021-03-11T16:22:21Z, WorkflowTaskStarted
16, 2021-03-11T16:22:21Z, WorkflowTaskCompleted
17, 2021-03-11T16:22:21Z, ActivityTaskScheduled
18, 2021-03-11T16:22:21Z, ActivityTaskStarted
19, 2021-03-11T16:22:21Z, ActivityTaskCompleted
20, 2021-03-11T16:22:21Z, WorkflowTaskScheduled
21, 2021-03-11T16:22:21Z, WorkflowTaskStarted
22, 2021-03-11T16:22:21Z, WorkflowTaskCompleted
23, 2021-03-11T16:22:21Z, WorkflowExecutionCompleted
Result:
Run Time: 1 seconds
Status: COMPLETED
Output: [done]
2.3. Untyped Activity Stubs
Untyped activity stubs allow activities to be invoked by name, without an activity interface definition. The example below uses one to invoke the Go ComposeGreeting activity that section 3.2 registers on the golang-activity-tq task queue.
import asyncio
import logging
from datetime import timedelta
from typing import List
from temporal.activity_method import ActivityOptions
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
NAMESPACE = "default"
TASK_QUEUE = "invoke-random-activity-tq"
class InvokeRandomActivityWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def invoke_activity(self, activity_name: str, args: List[object],
task_queue: str, timeout: int) -> object:
options = ActivityOptions(task_queue=task_queue,
start_to_close_timeout=(timedelta(seconds=timeout)))
stub = Workflow.new_untyped_activity_stub(activity_options=options)
return await stub.execute(activity_name, *args)
async def worker_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_workflow_implementation_type(InvokeRandomActivityWorkflow)
factory.start()
print("Worker started")
invoker: InvokeRandomActivityWorkflow = client.new_workflow_stub(InvokeRandomActivityWorkflow)
result = await invoker.invoke_activity("ComposeGreeting", ["Bob"], "golang-activity-tq", 60)
print(result)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main())
loop.run_forever()
3. Compatibility with the Golang and Java SDKs
3.1. Invoking Go Workflows
3.1.1. Golang Workflows
package main
import (
"fmt"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"log"
)
type Person struct {
Name string
Age int
}
func Greetings(ctx workflow.Context, value Person) error {
fmt.Println(value)
return nil
}
func main() {
serviceClient, err := client.NewClient(client.Options{HostPort: "localhost:7233"})
if err != nil {
log.Fatalf("Unable to create client. Error: %v", err)
}
w := worker.New(serviceClient, "golang-workflow-tq", worker.Options{})
w.RegisterWorkflow(Greetings)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalf("Unable to start worker. Error: %v", err)
}
}
3.1.2. Python Invoker
import asyncio
import dataclasses
from dataclasses import dataclass
from temporal.api.common.v1 import Payload
from temporal.converter import DefaultDataConverter
from temporal.workflow import workflow_method, WorkflowClient
TASK_QUEUE = "golang-workflow-tq"
NAMESPACE = "default"
@dataclass
class Person:
name: str
age: int
# Workflow Interface
class Greetings:
@workflow_method(task_queue=TASK_QUEUE, name="Greetings")
async def greetings(self, person: Person) -> str:
raise NotImplementedError
class CustomDataConverter(DefaultDataConverter):
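    # Go's encoding/json matches JSON keys to struct fields case-insensitively,
    # so serializing Person as a plain dict ({"name": ..., "age": ...}) binds
    # to the Go worker's Person{Name, Age}.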
def to_payload(self, arg: object) -> Payload:
if isinstance(arg, Person):
return super().to_payload(dataclasses.asdict(arg))
return super().to_payload(arg)
async def main():
client = WorkflowClient.new_client(namespace=NAMESPACE,
data_converter=CustomDataConverter())
greetings: Greetings = client.new_workflow_stub(Greetings)
person = Person(name="Bob", age=30)
result = await greetings.greetings(person)
print(result)
if __name__ == '__main__':
asyncio.run(main())
3.2. Invoking a Golang Activity from a Python Workflow
The main thing to note here is that the Golang and Python workers listen on different task queues.
3.2.1. Golang Activity Worker
package main
import (
"context"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"log"
)
func ComposeGreeting(ctx context.Context, name string) (string, error) {
return "Hello " + name, nil
}
func main() {
serviceClient, err := client.NewClient(client.Options{HostPort: "localhost:7233"})
if err != nil {
log.Fatalf("Unable to create client. Error: %v", err)
}
w := worker.New(serviceClient, "golang-activity-tq", worker.Options{})
w.RegisterActivity(ComposeGreeting)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalf("Unable to start worker. Error: %v", err)
}
}
3.2.2. Python Workflow that Invokes Golang Activity
import asyncio
import logging
from datetime import timedelta
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
NAMESPACE = "default"
TASK_QUEUE = "python-workflow-tq"
GOLANG_TASK_QUEUE = "golang-activity-tq"
class GolangActivity:
@activity_method(task_queue=GOLANG_TASK_QUEUE, name="ComposeGreeting",
schedule_to_close_timeout=timedelta(seconds=60))
async def compose_greeting(self, name: str) -> str:
raise NotImplementedError
class GreetingWorkflow:
def __init__(self):
self.activities: GolangActivity = Workflow.new_activity_stub(GolangActivity)
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name) -> str:
return await self.activities.compose_greeting(name)
async def worker_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_workflow_implementation_type(GreetingWorkflow)
factory.start()
print("Worker started")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main())
loop.run_forever()
3.2.3. Running the Workflow
% tctl workflow run --taskqueue python-workflow-tq --workflow_type GreetingWorkflow::get_greeting --execution_timeout 60 --input '"Bob"'
Running execution:
Workflow Id : 3598c2af-4337-40e1-ad5e-8ab58e782d66
Run Id : 94ab1892-3f8f-4878-b92e-8c90593f2c8d
Type : GreetingWorkflow::get_greeting
Namespace : default
Task Queue : python-workflow-tq
Args : ["Bob"]
Progress:
1, 2021-04-05T01:46:21Z, WorkflowExecutionStarted
2, 2021-04-05T01:46:21Z, WorkflowTaskScheduled
3, 2021-04-05T01:46:21Z, WorkflowTaskStarted
4, 2021-04-05T01:46:22Z, WorkflowTaskCompleted
5, 2021-04-05T01:46:22Z, ActivityTaskScheduled
6, 2021-04-05T01:46:22Z, ActivityTaskStarted
7, 2021-04-05T01:46:22Z, ActivityTaskCompleted
8, 2021-04-05T01:46:22Z, WorkflowTaskScheduled
9, 2021-04-05T01:46:22Z, WorkflowTaskStarted
10, 2021-04-05T01:46:22Z, WorkflowTaskCompleted
11, 2021-04-05T01:46:22Z, WorkflowExecutionCompleted
Result:
Run Time: 1 seconds
Status: COMPLETED
Output: ["Hello Bob"]
4. Migrating from Airflow
4.1. Example 1
In this section, we’ll be converting an Airflow data pipeline described in this article.
4.1.1. Setup
% pip install paramiko
% pip install asyncpg
version: "3.5" services: sftp: image: atmoz/sftp volumes: - ./upload/:/home/foo/ ports: - "127.0.0.1:2222:22" command: foo:pass:1001 db: image: postgres restart: always environment: POSTGRES_USER: admin POSTGRES_PASSWORD: admin123 POSTGRES_DB: error_monitoring_job ports: - "127.0.0.1:5432:5432" mailhog: image: mailhog/mailhog ports: - "127.0.0.1:1025:1025" - "127.0.0.1:8025:8025"
4.1.2. Converting the individual DAG tasks to activities
4.1.2.1. File Retrieval Code (SFTPOperator)
op = SFTPOperator(task_id=f"download_{file}",
ssh_conn_id="log_server",
local_filepath=f"{base_folder}/{file}",
remote_filepath=f"{remote_path}/{file}",
operation=SFTPOperation.GET,
create_intermediate_dirs=True,
dag=dag)
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def sftp_download(self, session: dict, remote_file: str):
transport, sftp = None, None
try:
transport = paramiko.Transport(("localhost", 2222))
transport.connect(None, "foo", "pass")
sftp = paramiko.SFTPClient.from_transport(transport)
local_file = os.path.join(session["working_directory"], remote_file)
sftp.get(remote_file, local_file)
finally:
if sftp:
sftp.close()
if transport:
transport.close()
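Note that paramiko is a blocking client, which goes against the advice in section 1.3. One way to keep the event loop free is to push the transfer onto a thread pool (a sketch; _blocking_download is a hypothetical helper holding the paramiko code above):
    async def sftp_download(self, session: dict, remote_file: str):
        # Run the synchronous paramiko transfer in a worker thread so the
        # asyncio event loop is not blocked for the duration of the download.
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(
            None, self._blocking_download, session["working_directory"], remote_file)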
4.1.2.2. grep_exception (BashOperator)
bash_command = """
grep -E 'Exception' --include=\\*.log -rnw '{{ params.base_folder }}' > {{ params.base_folder }}/errors.txt
ls -l {{ params.base_folder }}/errors.txt && cat {{ params.base_folder }}/errors.txt
"""
grep_exception = BashOperator(task_id="grep_exception",
bash_command=bash_command,
params={'base_folder': base_folder},
dag=dag)
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def grep_exception(self, session: dict):
bash_command = """
grep -E 'Exception' --include=\\*.log -rnw '{0}' > '{0}/errors.txt'
ls -l {0}/errors.txt && cat {0}/errors.txt
""".format(session["working_directory"])
process = await asyncio.create_subprocess_shell(bash_command)
await process.wait()
# TODO: error handling
4.1.2.3. create_table (PostgresOperator)
create_table = PostgresOperator(task_id='create_table',
sql='''DROP TABLE IF EXISTS {0};
CREATE TABLE {0} (
id SERIAL PRIMARY KEY,
filename VARCHAR (100) NOT NULL,
line integer NOT NULL,
date VARCHAR (15) NOT NULL,
time VARCHAR (15) NOT NULL,
session VARCHAR (50),
app VARCHAR (50),
module VARCHAR (100),
error VARCHAR(512)
);'''.format(table_name),
dag=dag)
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def create_table(self, session: dict):
sql = '''DROP TABLE IF EXISTS {0};
CREATE TABLE {0} (
id SERIAL PRIMARY KEY,
filename VARCHAR (100) NOT NULL,
line integer NOT NULL,
date VARCHAR (15) NOT NULL,
time VARCHAR (15) NOT NULL,
session VARCHAR (50),
app VARCHAR (50),
module VARCHAR (100),
error VARCHAR(512)
);'''.format(self.get_table_name())
async with db_pool.acquire() as conn:
await conn.execute(sql)
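(db_pool is a module-level asyncpg connection pool; in the complete source below it is created in worker_main() with asyncpg.create_pool().)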
4.1.2.4. parse_log (PythonOperator)
def save_to_database(tablename, records):
....
def parse_log(logString):
....
def parse_log_file(filepath, tablename):
....
parse_log = PythonOperator(task_id='parse_log',
python_callable=parse_log_file,
op_kwargs={'filepath': f'{base_folder}/errors.txt',
'tablename': f'{table_name}'},
dag=dag)
@staticmethod
def parse_log(log_string):
r = r".+\/(?P<file>.+):(?P<line>\d+):\[\[\]\] (?P<date>.+)/(?P<time>\d{2}:\d{2}:\d{2},\d{3}) ERROR ?(?:SessionId : )?(?P<session>.+)? \[(?P<app>\w+)\] .+ (?:Error \:|service Exception) (?P<module>(?=[\w\.-]+ : )[\w\.-]+)?(?: \: )?(?P<errMsg>.+)"
group = re.match(r, log_string)
return group.groups()
@staticmethod
def get_table_name():
date_tag = date.today().strftime('%Y%m%d')
table_name = f'error_logs_{date_tag}'
return table_name
@classmethod
async def save_to_database(cls, records):
if not records:
print("Empty record!")
return
table_name = cls.get_table_name()
sql = """INSERT INTO {} (filename, line, date, time, session, app, module, error)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)""".format(table_name)
records = [list(r) for r in records]
for r in records:
r[1] = int(r[1])
async with db_pool.acquire() as conn:
await conn.executemany(sql, records)
print(f" -> {len(records)} records are saved to table: {table_name}.")
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def parse_log_file(self, session: dict):
filepath = os.path.join(session["working_directory"], "errors.txt")
print(f'Opening {filepath} ...')
with open(filepath) as fp:
records = []
for line in fp:
records.append(self.parse_log(line))
await self.save_to_database(records)
4.1.2.5. gen_reports (PythonOperator)
def gen_error_reports(statfile, logfile, tablename, **kwargs):
....
gen_reports = PythonOperator(task_id='gen_reports',
python_callable=gen_error_reports,
op_kwargs={'statfile': f'{base_folder}/error_stats.csv',
'logfile': f'{base_folder}/error_logs.csv',
'tablename': f'{table_name}'},
provide_context=True,
dag=dag)
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def gen_reports(self, session: dict):
table_name = self.get_table_name()
log_file = os.path.join(session['working_directory'], "error_logs.csv")
stat_file = os.path.join(session['working_directory'], "error_stats.csv")
async with db_pool.acquire() as connection:
await connection.copy_from_query(f"SELECT * from {table_name}", format="csv",
header=True, output=log_file)
sql = f"SELECT error, count(*) as occurrence FROM {table_name} group by error ORDER BY occurrence DESC"
await connection.copy_from_query(sql, format="csv", header=True, output=stat_file)
top_result = await connection.fetchrow(sql)
return dict(top_result) if top_result else None
4.1.2.6. send_email (PythonOperator)
send_email = EmailOperator(task_id='send_email',
to='tony.xu@airflow.com',
subject='Daily report of error log generated',
html_content=""" <h1>Here is the daily report of error log for {{ ds }}</h1> """,
files=[f'{base_folder}/error_stats.csv', f'{base_folder}/error_logs.csv'],
dag=dag)
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=2))
async def send_email(self, session: dict):
with smtplib.SMTP(host="localhost", port=1025) as s:
msg = MIMEMultipart()
msg['From'] = "bob@example.com"
msg['To'] = "alice@example.com"
msg['Date'] = formatdate(localtime=True)
today = date.today().isoformat()
msg['Subject'] = "Errors for " + today
msg.attach(MIMEText(f"<h1>Here is the daily report of error log for { today }</h1>", "html"))
working_directory = session['working_directory']
for f in ["error_logs.csv", "error_stats.csv"]:
with open(os.path.join(working_directory, f), "rb") as fp:
part = MIMEApplication(fp.read(),Name=basename(f))
part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
msg.attach(part)
s.sendmail("bob@example.com", "alice@example.com", msg.as_string())
4.1.3. Converting the DAG to a workflow method
In the workflow method below, the DAG's >> dependency chain becomes plain sequential awaits, the parallel download tasks become futures gathered with Async.all_of, and the BranchPythonOperator that gates send_email becomes an ordinary if statement.
check_threshold = BranchPythonOperator(task_id='check_threshold', python_callable=check_error_threshold, provide_context=True, dag=dag)
dummy_op = DummyOperator(task_id='dummy_op', dag=dag)
dl_tasks >> grep_exception >> create_table >> parse_log >> gen_reports >> check_threshold >> [send_email, dummy_op]
@workflow_method(task_queue=TASK_QUEUE)
    async def monitor(self, error_threshold=3):
self.error_threshold = error_threshold
for _ in range(RETRY_ATTEMPTS):
try:
exception = None
session = await self.activities.new_session()
await self.with_session(session)
break
except ActivityFailureException as ex:
# In the case of a timeout ex.get_cause() will be a
# ActivityTaskTimeoutException
self.logger.error("Error: %s", str(ex))
exception = ex
if exception:
raise exception
async def with_session(self, session):
self.activities: MonitorErrorsActivities = \
Workflow.new_activity_stub(MonitorErrorsActivities)
activity_options = ActivityOptions(task_queue=session["host_task_queue"])
retry_parameters = RetryParameters(maximum_attempts=1)
session_activities = Workflow.new_activity_stub(MonitorErrorsActivities,
retry_parameters=retry_parameters,
activity_options=activity_options)
futures = [
Async.function(session_activities.sftp_download, session, "a.log"),
Async.function(session_activities.sftp_download, session, "b.log"),
Async.function(session_activities.sftp_download, session, "c.log"),
Async.function(session_activities.sftp_download, session, "d.log"),
]
await Async.all_of(futures)
await session_activities.grep_exception(session)
await session_activities.create_table(session)
await session_activities.parse_log_file(session)
top_result = await session_activities.gen_reports(session)
if top_result and top_result["occurrence"] >= self.error_threshold:
await session_activities.send_email(session)
4.1.4. Complete Source Code
import asyncio
import logging
import os
import re
import socket
import tempfile
from datetime import timedelta, date
import smtplib
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from os.path import basename
import asyncpg
import paramiko
from temporal.activity_method import activity_method, ActivityOptions, RetryParameters
from temporal.async_activity import Async
from temporal.exceptions import ActivityFailureException
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, WorkflowClient, Workflow
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "MonitorErrorsTaskQueue"
HOST_TASK_QUEUE = TASK_QUEUE + "-%d@%s" % (os.getpid(), socket.gethostname())
NAMESPACE = "default"
DSN = 'postgres://admin:admin123@127.0.0.1/error_monitoring_job'
RETRY_ATTEMPTS = 3
db_pool = None
class MonitorErrorsActivities:
def __init__(self, task_queue):
self.task_queue = task_queue
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def new_session(self) -> dict:
working_directory = tempfile.mkdtemp()
return {
"working_directory": working_directory,
"host_task_queue": HOST_TASK_QUEUE
}
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def sftp_download(self, session: dict, remote_file: str):
transport, sftp = None, None
try:
transport = paramiko.Transport(("localhost", 2222))
transport.connect(None, "foo", "pass")
sftp = paramiko.SFTPClient.from_transport(transport)
local_file = os.path.join(session["working_directory"], remote_file)
sftp.get(remote_file, local_file)
finally:
if sftp:
sftp.close()
if transport:
transport.close()
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def grep_exception(self, session: dict):
bash_command = """
grep -E 'Exception' --include=\\*.log -rnw '{0}' > '{0}/errors.txt'
ls -l {0}/errors.txt && cat {0}/errors.txt
""".format(session["working_directory"])
process = await asyncio.create_subprocess_shell(bash_command)
await process.wait()
# TODO: error handling
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def create_table(self, session: dict):
sql = '''DROP TABLE IF EXISTS {0};
CREATE TABLE {0} (
id SERIAL PRIMARY KEY,
filename VARCHAR (100) NOT NULL,
line integer NOT NULL,
date VARCHAR (15) NOT NULL,
time VARCHAR (15) NOT NULL,
session VARCHAR (50),
app VARCHAR (50),
module VARCHAR (100),
error VARCHAR(512)
);'''.format(self.get_table_name())
async with db_pool.acquire() as conn:
await conn.execute(sql)
@staticmethod
def parse_log(log_string):
r = r".+\/(?P<file>.+):(?P<line>\d+):\[\[\]\] (?P<date>.+)/(?P<time>\d{2}:\d{2}:\d{2},\d{3}) ERROR ?(?:SessionId : )?(?P<session>.+)? \[(?P<app>\w+)\] .+ (?:Error \:|service Exception) (?P<module>(?=[\w\.-]+ : )[\w\.-]+)?(?: \: )?(?P<errMsg>.+)"
group = re.match(r, log_string)
return group.groups()
@staticmethod
def get_table_name():
date_tag = date.today().strftime('%Y%m%d')
table_name = f'error_logs_{date_tag}'
return table_name
@classmethod
async def save_to_database(cls, records):
if not records:
print("Empty record!")
return
table_name = cls.get_table_name()
sql = """INSERT INTO {} (filename, line, date, time, session, app, module, error)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)""".format(table_name)
records = [list(r) for r in records]
for r in records:
r[1] = int(r[1])
async with db_pool.acquire() as conn:
await conn.executemany(sql, records)
print(f" -> {len(records)} records are saved to table: {table_name}.")
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def parse_log_file(self, session: dict):
filepath = os.path.join(session["working_directory"], "errors.txt")
print(f'Opening {filepath} ...')
with open(filepath) as fp:
records = []
for line in fp:
records.append(self.parse_log(line))
await self.save_to_database(records)
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=60))
async def gen_reports(self, session: dict):
table_name = self.get_table_name()
log_file = os.path.join(session['working_directory'], "error_logs.csv")
stat_file = os.path.join(session['working_directory'], "error_stats.csv")
async with db_pool.acquire() as connection:
await connection.copy_from_query(f"SELECT * from {table_name}", format="csv",
header=True, output=log_file)
sql = f"SELECT error, count(*) as occurrence FROM {table_name} group by error ORDER BY occurrence DESC"
await connection.copy_from_query(sql, format="csv", header=True, output=stat_file)
top_result = await connection.fetchrow(sql)
return dict(top_result) if top_result else None
@activity_method(task_queue=TASK_QUEUE,
schedule_to_close_timeout=timedelta(seconds=2))
async def send_email(self, session: dict):
with smtplib.SMTP(host="localhost", port=1025) as s:
msg = MIMEMultipart()
msg['From'] = "bob@example.com"
msg['To'] = "alice@example.com"
msg['Date'] = formatdate(localtime=True)
today = date.today().isoformat()
msg['Subject'] = "Errors for " + today
msg.attach(MIMEText(f"<h1>Here is the daily report of error log for { today }</h1>", "html"))
working_directory = session['working_directory']
for f in ["error_logs.csv", "error_stats.csv"]:
with open(os.path.join(working_directory, f), "rb") as fp:
part = MIMEApplication(fp.read(),Name=basename(f))
part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
msg.attach(part)
s.sendmail("bob@example.com", "alice@example.com", msg.as_string())
class MonitorErrorsWorkflow:
def __init__(self):
self.activities: MonitorErrorsActivities = \
Workflow.new_activity_stub(MonitorErrorsActivities)
self.error_threshold = 0
self.logger = Workflow.get_logger("MonitorErrorsWorkflow")
@workflow_method(task_queue=TASK_QUEUE)
    async def monitor(self, error_threshold=3):
self.error_threshold = error_threshold
for _ in range(RETRY_ATTEMPTS):
try:
exception = None
session = await self.activities.new_session()
await self.with_session(session)
break
except ActivityFailureException as ex:
# In the case of a timeout ex.get_cause() will be a
# ActivityTaskTimeoutException
self.logger.error("Error: %s", str(ex))
exception = ex
if exception:
raise exception
async def with_session(self, session):
self.activities: MonitorErrorsActivities = \
Workflow.new_activity_stub(MonitorErrorsActivities)
activity_options = ActivityOptions(task_queue=session["host_task_queue"])
retry_parameters = RetryParameters(maximum_attempts=1)
session_activities = Workflow.new_activity_stub(MonitorErrorsActivities,
retry_parameters=retry_parameters,
activity_options=activity_options)
futures = [
Async.function(session_activities.sftp_download, session, "a.log"),
Async.function(session_activities.sftp_download, session, "b.log"),
Async.function(session_activities.sftp_download, session, "c.log"),
Async.function(session_activities.sftp_download, session, "d.log"),
]
await Async.all_of(futures)
await session_activities.grep_exception(session)
await session_activities.create_table(session)
await session_activities.parse_log_file(session)
top_result = await session_activities.gen_reports(session)
if top_result and top_result["occurrence"] >= self.error_threshold:
await session_activities.send_email(session)
async def worker_main():
global db_pool
db_pool = await asyncpg.create_pool(DSN)
monitor_error_activities = MonitorErrorsActivities(task_queue=HOST_TASK_QUEUE)
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(monitor_error_activities)
worker.register_workflow_implementation_type(MonitorErrorsWorkflow)
worker = factory.new_worker(HOST_TASK_QUEUE)
worker.num_activity_tasks = 1
worker.register_activities_implementation(monitor_error_activities)
factory.start()
print("Worker started")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(worker_main())
loop.run_forever()
4.1.5. Invoking the workflow
% tctl workflow run --workflow_type "MonitorErrorsWorkflow::monitor" --taskqueue "MonitorErrorsTaskQueue" --execution_timeout 600
Running execution:
Workflow Id : eaf2c1ca-39cb-43e6-b491-5aa372873a2d
Run Id : a527fed0-dbeb-4ec9-91f1-1bec91a3f73b
Type : MonitorErrorsWorkflow::monitor
Namespace : default
Task Queue : MonitorErrorsTaskQueue
Args : []
Progress:
1, 2021-03-14T19:10:48Z, WorkflowExecutionStarted
2, 2021-03-14T19:10:48Z, WorkflowTaskScheduled
3, 2021-03-14T19:10:48Z, WorkflowTaskStarted
4, 2021-03-14T19:10:48Z, WorkflowTaskCompleted
5, 2021-03-14T19:10:48Z, ActivityTaskScheduled
6, 2021-03-14T19:10:48Z, ActivityTaskStarted
7, 2021-03-14T19:10:48Z, ActivityTaskCompleted
8, 2021-03-14T19:10:48Z, WorkflowTaskScheduled
9, 2021-03-14T19:10:48Z, WorkflowTaskStarted
10, 2021-03-14T19:10:48Z, WorkflowTaskCompleted
11, 2021-03-14T19:10:48Z, ActivityTaskScheduled
12, 2021-03-14T19:10:48Z, ActivityTaskScheduled
13, 2021-03-14T19:10:48Z, ActivityTaskScheduled
14, 2021-03-14T19:10:48Z, ActivityTaskScheduled
15, 2021-03-14T19:10:49Z, ActivityTaskStarted
16, 2021-03-14T19:10:49Z, ActivityTaskCompleted
17, 2021-03-14T19:10:49Z, WorkflowTaskScheduled
18, 2021-03-14T19:10:49Z, WorkflowTaskStarted
19, 2021-03-14T19:10:49Z, WorkflowTaskCompleted
20, 2021-03-14T19:10:49Z, ActivityTaskStarted
21, 2021-03-14T19:10:49Z, ActivityTaskCompleted
22, 2021-03-14T19:10:49Z, WorkflowTaskScheduled
23, 2021-03-14T19:10:49Z, WorkflowTaskStarted
24, 2021-03-14T19:10:49Z, WorkflowTaskCompleted
25, 2021-03-14T19:10:50Z, ActivityTaskStarted
26, 2021-03-14T19:10:51Z, ActivityTaskCompleted
27, 2021-03-14T19:10:51Z, WorkflowTaskScheduled
28, 2021-03-14T19:10:51Z, WorkflowTaskStarted
29, 2021-03-14T19:10:51Z, WorkflowTaskCompleted
30, 2021-03-14T19:10:51Z, ActivityTaskStarted
31, 2021-03-14T19:10:51Z, ActivityTaskCompleted
32, 2021-03-14T19:10:51Z, WorkflowTaskScheduled
33, 2021-03-14T19:10:51Z, WorkflowTaskStarted
34, 2021-03-14T19:10:51Z, WorkflowTaskCompleted
35, 2021-03-14T19:10:51Z, ActivityTaskScheduled
36, 2021-03-14T19:10:51Z, ActivityTaskStarted
37, 2021-03-14T19:10:51Z, ActivityTaskCompleted
38, 2021-03-14T19:10:51Z, WorkflowTaskScheduled
39, 2021-03-14T19:10:52Z, WorkflowTaskStarted
40, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
41, 2021-03-14T19:10:52Z, ActivityTaskScheduled
42, 2021-03-14T19:10:52Z, ActivityTaskStarted
43, 2021-03-14T19:10:52Z, ActivityTaskCompleted
44, 2021-03-14T19:10:52Z, WorkflowTaskScheduled
45, 2021-03-14T19:10:52Z, WorkflowTaskStarted
46, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
47, 2021-03-14T19:10:52Z, ActivityTaskScheduled
48, 2021-03-14T19:10:52Z, ActivityTaskStarted
49, 2021-03-14T19:10:52Z, ActivityTaskCompleted
50, 2021-03-14T19:10:52Z, WorkflowTaskScheduled
51, 2021-03-14T19:10:52Z, WorkflowTaskStarted
52, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
53, 2021-03-14T19:10:52Z, ActivityTaskScheduled
54, 2021-03-14T19:10:52Z, ActivityTaskStarted
55, 2021-03-14T19:10:52Z, ActivityTaskCompleted
56, 2021-03-14T19:10:52Z, WorkflowTaskScheduled
57, 2021-03-14T19:10:52Z, WorkflowTaskStarted
58, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
59, 2021-03-14T19:10:52Z, ActivityTaskScheduled
60, 2021-03-14T19:10:52Z, ActivityTaskStarted
61, 2021-03-14T19:10:52Z, ActivityTaskCompleted
62, 2021-03-14T19:10:52Z, WorkflowTaskScheduled
63, 2021-03-14T19:10:52Z, WorkflowTaskStarted
64, 2021-03-14T19:10:52Z, WorkflowTaskCompleted
65, 2021-03-14T19:10:52Z, WorkflowExecutionCompleted
Result:
Run Time: 5 seconds
Status: COMPLETED
Output: [nil]
5. Others
5.1. Create a namespace
import asyncio
from datetime import timedelta
from temporal.api.workflowservice.v1 import RegisterNamespaceRequest
from temporal.workflow import WorkflowClient
async def main():
client = WorkflowClient.new_client()
request = RegisterNamespaceRequest()
request.name = "the-new-namespace"
# MaxWorkflowRetentionPeriod is 30 days
request.workflow_execution_retention_period = timedelta(days=25)
await client.service.register_namespace(request=request)
client.close()
if __name__ == '__main__':
asyncio.run(main())
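The registration can then be verified from the command line:
% tctl --ns the-new-namespace namespace describe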
5.2. Querying Workflow History
import asyncio
import itertools
from typing import List
from temporal.api.common.v1 import WorkflowExecution
from temporal.api.enums.v1 import EventType
from temporal.api.history.v1 import HistoryEvent
from temporal.api.workflowservice.v1 import GetWorkflowExecutionHistoryRequest
from temporal.workflow import WorkflowClient
NAMESPACE = "default"
WORKFLOW_ID = "querying-workflow-history"
async def main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
request = GetWorkflowExecutionHistoryRequest()
request.execution = WorkflowExecution()
request.namespace = NAMESPACE
request.execution.workflow_id = WORKFLOW_ID
responses = []
while True:
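        # Histories may span multiple pages; keep fetching until the server
        # returns an empty next_page_token.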
r = await client.service.get_workflow_execution_history(request=request)
responses.append(r)
if not r.next_page_token:
break
request.next_page_token = r.next_page_token
events: List[HistoryEvent] = list(itertools.chain(*[e.history.events for e in responses]))
print(f"Number of responses: {len(responses)}")
print(f"Number of events: {len(events)}")
for i, e in enumerate(events):
print(i+1, EventType(e.event_type))
client.close()
if __name__ == '__main__':
asyncio.run(main())