Long Running Business Logic in Plain Old Code Part 1
Usually, when programmers are tasked with programming a long running piece of business logic (e.g. subscriptions, gamification, marketing campaigns, any customer journey) they will reach for familiar tools such as cron, message queues and manual state management with their db of choice.
In this series of posts I want to show an alternative approach - implementing long running business logic as a single long running function. Traditionally, we cannot implement anything long running as a single function because processes and machines fail and the function’s state is in volatile memory. However, what if there was a tool that allowed us to code in such a way that we could treat memory as persistent and reliable? (Smalltalk programmers might be familiar with this concept.)
The long running business logic that we will implement in this blog series is a loyalty system. The tool that we will be covering for implementing these long running functions [1] is Temporal.
A Loyalty System
I chose to create a loyalty system to showcase Temporal in action. The business rules of our loyalty system are (no prizes for guessing which company I ripped this off from):
Membership Tiers
Customers enrolled into the loyalty system are called members. An email should be sent when a customer is first enrolled in this system. An email should also be sent when they are for whatever reason disenrolled from the loyalty system.
There are 4 membership tiers - Member, Silver, Gold and Platinum.
An email should be sent whenever the customer is downgraded or upgraded.
Loyalty Points
Points are earned by performing transactions in the system. Each 1 dollar spent earns the member 1 point. A multiplier is also applied based on the membership tier - Member (1x), Silver (1.5x), Gold (2.5x) and Platinum (3x).
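The earning rule above can be sketched in a few lines of Kotlin. This is only an illustration: the names Tier and pointsFor are made up for this example, amounts are assumed to be whole dollars, and fractional points are assumed to be rounded down (the rules above do not say how fractions are handled).

```kotlin
// Multipliers are stored in tenths (1.5x -> 15) so the arithmetic stays in integers.
enum class Tier(val multiplierTenths: Int) {
    MEMBER(10), SILVER(15), GOLD(25), PLATINUM(30)
}

// 1 point per dollar, scaled by the tier multiplier.
// Assumption: fractional points are dropped (integer division rounds down here).
fun pointsFor(wholeDollars: Long, tier: Tier): Long {
    require(wholeDollars >= 0) { "amount must not be negative" }
    return wholeDollars * tier.multiplierTenths / 10
}
```

Under these assumptions, a Gold member spending 100 dollars earns pointsFor(100, Tier.GOLD) = 250 points.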
All points have the same expiry period, i.e. extending the expiry period extends it for all points. Upon making a transaction, the expiry period of all points is extended by 3 full calendar months, e.g. earning any points in January extends the points expiry period to the end of April (3 complete months). Conducting a transaction in March will then extend it until the end of June.
Therefore, as long as the customer makes a transaction every 3 months the points will not expire.
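The "3 full calendar months" rule is easy to get wrong by a month, so here is a small sketch of one way to compute it with java.time. The helper name expiryAfter is hypothetical and not part of the loyalty system code in this series.

```kotlin
import java.time.LocalDate
import java.time.YearMonth

// Points expire on the last day of the third full calendar month
// after the month in which the latest transaction took place.
fun expiryAfter(transactionDate: LocalDate): LocalDate =
    YearMonth.from(transactionDate).plusMonths(3).atEndOfMonth()
```

A transaction on any day in January 2020 gives an expiry date of 30 April 2020, and a later transaction in March 2020 moves it to 30 June 2020, matching the examples above.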
Tier Upgrades
Accumulating enough new points within a 6 month period entitles members to upgrade to the next membership tier. The 6 month periods are January 1 to June 30 and July 1 to December 31.
Multiple upgrades can occur during the same 6-month period. Earning a membership tier during one 6-month period earns them the benefits of that tier during the current 6-month period and the next 6-month period.
At the end of the 6-month period, members are either upgraded or downgraded based on the points accumulated during that period.
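As a rough sketch of the period rules, the snippet below works out which 6-month period a date falls into and until when a tier earned on that date keeps its benefits. HalfYear, periodOf and benefitsUntil are names invented for this illustration. The point thresholds for each tier are not stated in the rules above, so they are deliberately left out.

```kotlin
import java.time.LocalDate

// A 6-month membership period: January 1 to June 30, or July 1 to December 31.
data class HalfYear(val start: LocalDate, val end: LocalDate)

// Returns the 6-month period that contains the given date.
fun periodOf(date: LocalDate): HalfYear =
    if (date.monthValue <= 6) {
        HalfYear(LocalDate.of(date.year, 1, 1), LocalDate.of(date.year, 6, 30))
    } else {
        HalfYear(LocalDate.of(date.year, 7, 1), LocalDate.of(date.year, 12, 31))
    }

// A tier earned during one period is kept for that period and the next one,
// so the benefits last until the final day of the following period.
fun benefitsUntil(earnedOn: LocalDate): LocalDate =
    periodOf(periodOf(earnedOn).end.plusDays(1)).end
```

For example, a tier earned on 10 March 2020 would apply until 31 December 2020, and one earned on 1 August 2020 until 30 June 2021.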
Prerequisites
You need the following installed on your computer before continuing:
- docker
- docker-compose
- IntelliJ
- gradle
- access to some type of SMTP server. I’m using Mailtrap
Uber Cadence and Temporal
Uber Cadence is a workflow engine developed at Uber to help their engineers build long running business logic. Temporal is a fork of Cadence that is being actively developed by Temporal Technologies Inc. In this blog post, we’ll be covering Temporal but the equivalent code for Cadence will be very similar.
Temporal consists of a server, client SDKs, a command line tool and a web UI for viewing workflow state.
Temporal has two official SDKs - a Golang and a Java SDK. I’ll be implementing the loyalty system using the Java SDK and using the Kotlin programming language.
Behind the scenes, Temporal requires a database. The most battle tested option is Cassandra. MySQL is supported as well.
Temporal Concepts
There are a handful of concepts that we will need to familiarize ourselves with:
Workflow Methods - Workflow methods contain the business logic to be implemented. They should contain logic for coordination/orchestration but should not “affect” the outside world directly. Generally speaking, the workflow ends when the workflow method “returns”.
Activities and Activity methods - Activity methods are how the workflow methods can change the outside world. Any type of logic that could possibly have a side effect should be done in an activity method e.g. API calls to other systems, database queries etc.
Signal Methods - While the workflow is running it can be triggered by external code when certain events occur using signal methods.
Query Methods - Query methods allow external code to pull information about the current state of the workflow.
Tasklist - Tasklist is the term for the queueing abstraction within Temporal. Workflows and activities are scheduled for execution by workers by placing tasks on a tasklist.
Workers - The Temporal server does not run any application code directly. That is the job of workers. Workers will listen on a task list for workflows and activities to execute.
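To make the relationship between task lists and workers a bit more concrete, here is a toy, in-memory model written in plain Kotlin. It is not how Temporal is implemented and uses none of its APIs. ToyTaskList and ToyWorker are invented names, and the only point is to show that the queue holds the names of tasks while the worker holds the application code.

```kotlin
import java.util.concurrent.LinkedBlockingQueue

// Toy stand-in for a task list: a named queue of tasks waiting for a worker.
class ToyTaskList(val name: String) {
    private val queue = LinkedBlockingQueue<String>()

    fun schedule(taskType: String) {
        queue.put(taskType)
    }

    // Returns the next waiting task type, or null when nothing is waiting.
    fun poll(): String? = queue.poll()
}

// Toy worker: owns the application code and runs whatever it finds on its task list.
class ToyWorker(private val taskList: ToyTaskList) {
    private val handlers = mutableMapOf<String, () -> String>()

    fun register(taskType: String, handler: () -> String) {
        handlers[taskType] = handler
    }

    // Drains the task list once and returns the results of the tasks it could run.
    fun runPending(): List<String> {
        val results = mutableListOf<String>()
        while (true) {
            val taskType = taskList.poll() ?: break
            // Task types without a registered handler are skipped in this toy model.
            val handler = handlers[taskType] ?: continue
            results += handler()
        }
        return results
    }
}
```

In the real system the server side only places tasks on the task list, and a separately running worker process polls for them and executes the matching workflow or activity code.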
Project Setup
Installing Temporal
The easiest way to get up and running is with docker-compose:
$ curl -O https://raw.githubusercontent.com/temporalio/temporal/master/docker/docker-compose.yml
$ docker-compose up
The docker compose file takes care of starting the Temporal service, Cassandra and the Temporal web UI.
At this point, your machine will have:
- Temporal listening on port 7233
- Cassandra running on port 9042
- Temporal web listening on port 8088
Gradle Project Setup
Create your Kotlin project using Gradle and make the following changes to your build.gradle:
plugins {
    ....
    id 'application'
}

repositories {
    ....
    maven {
        url "https://oss.sonatype.org/content/repositories/snapshots/"
    }
}

dependencies {
    ....
    compile group: 'io.temporal', name: 'temporal-sdk', version: '0.21.1'
    compile group: 'commons-configuration', name: 'commons-configuration', version: '1.9'
    compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
}

application {
    mainClassName = 'WorkerKt'
}
logback.xml
Create a file at src/main/resources/logback.xml:
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
Hello Loyalty
The Workflow Interface
// loyalty-system.kt
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod

@WorkflowInterface
interface LoyaltyWorkflow {
    @WorkflowMethod
    fun enroll()
}
The WorkflowInterface is the contract between the workflow logic and the outside world. WorkflowMethods are invoked to start workflow instances.
The enroll method will be invoked by external code whenever a customer should be enrolled in the loyalty programme.
Implementing the Workflow Interface
This is where Temporal is a little magical. The bulk of the business logic will be implemented in the enroll() method.
The customer will be enrolled into the loyalty programme for many years. Traditionally, we would need to store the state of the customer, with regards to this process, in the database and read and update it each time an event happens. This crutch is necessary because there is no abstraction for the programmer to leverage that allows them to treat the computer’s memory as persistent.
However, with Temporal, we can implement this logic as just a plain method. As long as the method does not return, the customer is still enrolled in the loyalty programme.
In later parts of this blog series we will implement the actual business logic of the loyalty programme in this method and we will see how Temporal allows us to keep the method running for as long as we need.
// loyalty-system.kt
class LoyaltyWorkflowImpl : LoyaltyWorkflow {
    override fun enroll() {
        println("Customer enrolled")
        // Business logic for members goes here
        println("Customer disenrolled")
    }
}
Creating a worker
Workflows require a worker process to host them. The worker’s job is to connect to the Temporal service and then to listen on tasklists for workflows or activities to execute.
// worker.kt
import io.temporal.client.WorkflowClient
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.worker.WorkerFactory

fun main() {
    // Connect to the Temporal service
    val service = WorkflowServiceStubs.newInstance()
    val client = WorkflowClient.newInstance(service)
    val factory = WorkerFactory.newInstance(client)
    // Listen on the LoyaltyTaskList task list and host our workflow implementation
    val worker = factory.newWorker("LoyaltyTaskList")
    worker.registerWorkflowImplementationTypes(LoyaltyWorkflowImpl::class.java)
    factory.start()
}
Start the worker in IntelliJ or in Gradle using:
$ ./gradlew run
Executing the Workflow from the Command Line
Temporal includes a command line tool that can be used for (among other things) starting workflows. To start a workflow, we need to specify:
- the workflow_type - which by default follows the format WorkflowInterfaceName_workflowMethodName. In our case it would be LoyaltyWorkflow_enroll.
- the tasklist - LoyaltyTaskList
- the execution_timeout - how long the workflow method should be allowed to run before it is considered timed out. We will be using the value 400000000, which is a little over 12.5 years expressed in seconds.
$ docker run --network=host --rm temporalio/tctl:0.21.1 workflow run --tasklist LoyaltyTaskList --workflow_type LoyaltyWorkflow_enroll --execution_timeout 400000000
This should yield the output:
Running execution:
Workflow Id : 2735037b-6f92-4947-ae0b-b5d994fbd47f
Run Id : 9b35e43e-366c-4f8e-a848-a6baa4ea81eb
Type : LoyaltyWorkflow_enroll
Namespace : default
Task List : LoyaltyTaskList
Args :
Progress:
1, 2020-04-19T06:08:12Z, WorkflowExecutionStarted
2, 2020-04-19T06:08:12Z, DecisionTaskScheduled
3, 2020-04-19T06:08:12Z, DecisionTaskStarted
4, 2020-04-19T06:08:12Z, DecisionTaskCompleted
5, 2020-04-19T06:08:12Z, WorkflowExecutionCompleted
Result:
Run Time: 1 seconds
Status: COMPLETED
Output:
The workflow run command starts the workflow and waits for it to finish. workflow start will start the workflow and exit immediately.
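The two values passed to the command above can be sanity-checked with a few lines of plain Kotlin. The helper defaultWorkflowType is a hypothetical function that only mirrors the naming convention described earlier, it is not something the SDK exposes. approxYears is likewise made up for this sketch and assumes an average year of 365.25 days.

```kotlin
import java.time.Duration

// Mirrors the default naming convention: WorkflowInterfaceName_workflowMethodName.
fun defaultWorkflowType(interfaceName: String, methodName: String): String =
    "${interfaceName}_${methodName}"

// Converts a timeout given in seconds into an approximate number of years.
// Assumption: one year is treated as 365.25 days.
fun approxYears(seconds: Long): Double =
    Duration.ofSeconds(seconds).toDays() / 365.25
```

With these helpers, defaultWorkflowType("LoyaltyWorkflow", "enroll") gives LoyaltyWorkflow_enroll, and approxYears(400_000_000) comes out at a little over 12.5 years.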
Workflows with arguments
Right now we can’t do much because we don’t have much information about the customer that just enrolled. To get that information we need to modify our workflow to accept arguments.
Let’s create a data class for the arguments to the workflow:
data class CustomerInfo(
    var id: String,
    var email: String
)
Then update the signature of the @WorkflowMethod:
@WorkflowInterface
interface LoyaltyWorkflow {
    @WorkflowMethod
    fun enroll(customer: CustomerInfo)
}
Update the implementation of the workflow method as well.
class LoyaltyWorkflowImpl : LoyaltyWorkflow {
    override fun enroll(customer: CustomerInfo) {
        println("Customer enrolled: id=${customer.id} email=${customer.email}")
        // Business logic for members goes here
        println("Customer disenrolled: id=${customer.id} email=${customer.email}")
    }
}
Note: It’s actually not a good idea to use println() here but we’ll get to that in a future post.
Running the Workflow with Arguments
$ docker run --network=host --rm temporalio/tctl:0.21.1 workflow run --tasklist LoyaltyTaskList --workflow_type LoyaltyWorkflow_enroll --execution_timeout 400000000 --input '{"id": "71659d9e-51c0-4f59-9b4c-74436402ed14 ", "email": "bob@example.com"}'
In the console of the worker you should see:
Customer enrolled: id=71659d9e-51c0-4f59-9b4c-74436402ed14 email=bob@example.com
Customer disenrolled: id=71659d9e-51c0-4f59-9b4c-74436402ed14 email=bob@example.com
Affecting the Outside World: Sending Emails
The workflow exists but it doesn’t change the outside world in any way. For that, we need activities. Let’s start by sending an email to the customer upon their enrollment in the loyalty programme as well as sending an email when they get disenrolled from the programme (i.e. when the workflow method is about to exit).
Activity Interface
Activity methods are encapsulated inside an ActivityInterface.
// activities.kt
import io.temporal.activity.ActivityInterface

@ActivityInterface
interface NotificationActivities {
    fun sendEmail(from: String, to: String, subject: String, body: String)
}
SMTP Server Configuration
Fill in the values below based on the configuration of your SMTP server. If you need a mock mail server for testing, try Mailtrap.
// activities.kt
val MAIL_SMTP_HOST = "<<FILLME>>"
val MAIL_SMTP_PORT = "<<FILLME>>"
val MAIL_SMTP_USER_NAME = "<<FILLME>>"
val MAIL_SMTP_PASSWORD = "<<FILLME>>"
val MAIL_SMTP_AUTH = "<<FILLME>>"
val MAIL_SMTP_STARTTLS_ENABLE = "<<FILLME>>"
Activity Implementation
The implementation isn’t that important here. The important thing to keep in mind is that the workflow should only “affect” the outside world (in this case sending email) from activity methods like the one shown below:
// activities.kt
// Note: these imports need the JavaMail (javax.mail) library on the classpath.
import java.util.Properties
import javax.mail.Authenticator
import javax.mail.Message
import javax.mail.PasswordAuthentication
import javax.mail.Session
import javax.mail.Transport
import javax.mail.internet.InternetAddress
import javax.mail.internet.MimeMessage

class NotificationActivitiesImpl : NotificationActivities {
    private val prop = Properties()

    init {
        prop["mail.smtp.host"] = MAIL_SMTP_HOST
        prop["mail.smtp.port"] = MAIL_SMTP_PORT
        prop["mail.smtp.auth"] = MAIL_SMTP_AUTH
        prop["mail.smtp.starttls.enable"] = MAIL_SMTP_STARTTLS_ENABLE
    }

    override fun sendEmail(from: String, to: String, subject: String, body: String) {
        // Supplies the SMTP credentials when the server asks for them
        val authenticator = object : Authenticator() {
            override fun getPasswordAuthentication(): PasswordAuthentication {
                return PasswordAuthentication(MAIL_SMTP_USER_NAME, MAIL_SMTP_PASSWORD)
            }
        }
        val session: Session = Session.getInstance(prop, authenticator)
        val message: Message = MimeMessage(session)
        message.setFrom(InternetAddress(from))
        message.setRecipients(
            Message.RecipientType.TO,
            InternetAddress.parse(to)
        )
        message.setSubject(subject)
        message.setText(body)
        Transport.send(message)
    }
}
Register Activity Implementation with Worker
Just like with workflows, the activities must be hosted in a worker and attached to a task list. We do this by updating the worker’s main method with the following line:
// worker.kt
worker.registerActivitiesImplementations(NotificationActivitiesImpl())
...
factory.start()
Invoking Activities From the Workflow
Now, we’re able to invoke the activities from the workflow. Activity methods should never be invoked directly but instead through an Activity stub created using Workflow.newActivityStub.
import io.temporal.activity.ActivityOptions
import io.temporal.workflow.Workflow
import java.time.Duration

class LoyaltyWorkflowImpl : LoyaltyWorkflow {
    private val notification: NotificationActivities

    init {
        // Activities are always called through a stub, never directly
        notification = Workflow.newActivityStub(
            NotificationActivities::class.java,
            ActivityOptions.newBuilder()
                .setScheduleToStartTimeout(Duration.ofMinutes(5))
                .setStartToCloseTimeout(Duration.ofMinutes(5)).build()
        )
    }

    override fun enroll(customer: CustomerInfo) {
        println("Customer enrolled: id=${customer.id} email=${customer.email}")
        notification.sendEmail(
            "loyaltyservice@example.com",
            customer.email,
            "Welcome to the loyalty programme!",
            "You can now start earning loyalty points"
        )
        // Business logic for members goes here
        println("Customer disenrolled: id=${customer.id} email=${customer.email}")
        notification.sendEmail(
            "loyaltyservice@example.com",
            customer.email,
            "Sorry to see you go",
            "We are purging your loyalty points"
        )
    }
}
Run the Workflow Again
$ docker run --network=host --rm temporalio/tctl:0.21.1 workflow run --tasklist LoyaltyTaskList --workflow_type LoyaltyWorkflow_enroll --execution_timeout 400000000 --input '{"id": "71659d9e-51c0-4f59-9b4c-74436402ed14 ", "email": "bob@example.com"}'
You should see something like this in the terminal:
Running execution:
Workflow Id : e71500c0-35bb-4202-a259-219cb1fc2806
Run Id : eb0a9df1-e61d-41b8-bd97-7d0432a52b27
Type : LoyaltyWorkflow_enroll
Namespace : default
Task List : LoyaltyTaskList
Args : {"id":
: "71659d9e-51c0-4f59-9b4c-74436402ed14
: ", "email": "bob@example.com"}
Progress:
1, 2020-04-19T09:28:56Z, WorkflowExecutionStarted
2, 2020-04-19T09:28:56Z, DecisionTaskScheduled
3, 2020-04-19T09:28:56Z, DecisionTaskStarted
4, 2020-04-19T09:28:57Z, DecisionTaskCompleted
5, 2020-04-19T09:28:57Z, ActivityTaskScheduled
6, 2020-04-19T09:28:57Z, ActivityTaskStarted
7, 2020-04-19T09:29:12Z, ActivityTaskCompleted
8, 2020-04-19T09:29:12Z, DecisionTaskScheduled
9, 2020-04-19T09:29:12Z, DecisionTaskStarted
10, 2020-04-19T09:29:12Z, DecisionTaskCompleted
11, 2020-04-19T09:29:12Z, ActivityTaskScheduled
12, 2020-04-19T09:29:12Z, ActivityTaskStarted
13, 2020-04-19T09:29:27Z, ActivityTaskCompleted
14, 2020-04-19T09:29:27Z, DecisionTaskScheduled
15, 2020-04-19T09:29:27Z, DecisionTaskStarted
16, 2020-04-19T09:29:27Z, DecisionTaskCompleted
17, 2020-04-19T09:29:27Z, WorkflowExecutionCompleted
Result:
Run Time: 31 seconds
Status: COMPLETED
Output:
There are two sets of ActivityTaskScheduled, ActivityTaskStarted and ActivityTaskCompleted events because we sent two emails.
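If you want to check this against the history rather than by eye, the Progress lines can be parsed with a few lines of plain Kotlin. This is just a sketch: HistoryEvent and parseEvent are names invented here, and the input is the Progress output shown above pasted in as a string.

```kotlin
import java.time.Duration
import java.time.Instant

data class HistoryEvent(val id: Int, val time: Instant, val type: String)

// Parses one Progress line of the form "<id>, <timestamp>, <event type>".
fun parseEvent(line: String): HistoryEvent {
    val (id, time, type) = line.split(",").map { it.trim() }
    return HistoryEvent(id.toInt(), Instant.parse(time), type)
}

// The Progress section from the run above, copied verbatim.
val progress: List<HistoryEvent> = """
    1, 2020-04-19T09:28:56Z, WorkflowExecutionStarted
    2, 2020-04-19T09:28:56Z, DecisionTaskScheduled
    3, 2020-04-19T09:28:56Z, DecisionTaskStarted
    4, 2020-04-19T09:28:57Z, DecisionTaskCompleted
    5, 2020-04-19T09:28:57Z, ActivityTaskScheduled
    6, 2020-04-19T09:28:57Z, ActivityTaskStarted
    7, 2020-04-19T09:29:12Z, ActivityTaskCompleted
    8, 2020-04-19T09:29:12Z, DecisionTaskScheduled
    9, 2020-04-19T09:29:12Z, DecisionTaskStarted
    10, 2020-04-19T09:29:12Z, DecisionTaskCompleted
    11, 2020-04-19T09:29:12Z, ActivityTaskScheduled
    12, 2020-04-19T09:29:12Z, ActivityTaskStarted
    13, 2020-04-19T09:29:27Z, ActivityTaskCompleted
    14, 2020-04-19T09:29:27Z, DecisionTaskScheduled
    15, 2020-04-19T09:29:27Z, DecisionTaskStarted
    16, 2020-04-19T09:29:27Z, DecisionTaskCompleted
    17, 2020-04-19T09:29:27Z, WorkflowExecutionCompleted
""".trimIndent().lines().map(::parseEvent)

// Number of activity executions that finished.
val completedActivities = progress.count { it.type == "ActivityTaskCompleted" }

// Time between the first and the last event in the history.
val elapsed: Duration = Duration.between(progress.first().time, progress.last().time)
```

For this history, completedActivities is 2 and elapsed is 31 seconds, which lines up with the reported run time.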
Bob should have received two emails.
Next time: Part 2 and Part 3
In Part 2, we will implement the logic for earning points and learn about signal and query methods. Then in Part 3 we will implement the tier upgrade/downgrade logic and look into using await - the feature of Temporal that allows us to “pause” a function until some condition is met.
[1] Temporal uses the term workflows to describe this type of business logic. However, I decided to steer clear of that term whenever possible because it means different things to different people and brings up comparisons that are irrelevant and frankly a waste of time.