Long Running Business Logic in Plain Old Code Part 1

Usually, when programmers are tasked with programming a long running piece of business logic (e.g. subscriptions, gamification, marketing campaigns, any customer journey) they will reach for familiar tools such as cron, message queues and manual state management with their db of choice.

In this series of posts I want to show an alternative approach - implementing long running business logic as a single long running function. Traditionally, we cannot implement anything long running as a single function because processes and machines fail and the function’s state is in volatile memory. However, what if there were a tool that allowed us to code in such a way that we could treat memory as persistent and reliable? (Smalltalk programmers might be familiar with this concept.)

The long running business logic that we will implement in this blog series is a loyalty system. The tool that we will be covering for implementing these long running functions[1] is Temporal.

A Loyalty System

I chose to create a loyalty system to showcase Temporal in action. The business rules of our loyalty system are (no prizes for guessing which company I ripped this off from):

Membership Tiers

Customers enrolled into the loyalty system are called members. An email should be sent when a customer is first enrolled in this system. An email should also be sent when they are for whatever reason disenrolled from the loyalty system.

There are 4 membership tiers - Member, Silver, Gold and Platinum.

An email should be sent whenever the customer is downgraded or upgraded.

Loyalty Points

Points are earned by performing transactions in the system. Each 1 dollar spent earns the member 1 point. A multiplier is also applied based on the membership tier - Member (1x), Silver (1.5x), Gold (2.5x) and Platinum (3x).
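The earning rule above can be sketched in a few lines of Kotlin. This is only an illustration: the names `Tier` and `pointsEarned` are made up for this sketch, and it assumes whole-dollar amounts with fractional points rounded down, neither of which the rules specify.

```kotlin
// Tier multipliers as listed in the rules above.
enum class Tier(val multiplier: Double) {
    MEMBER(1.0),
    SILVER(1.5),
    GOLD(2.5),
    PLATINUM(3.0)
}

// Assumption: amounts are whole dollars and fractional points are rounded down.
// The business rules do not say how to round, so treat this as a placeholder.
fun pointsEarned(dollarsSpent: Long, tier: Tier): Long =
    Math.floor(dollarsSpent * tier.multiplier).toLong()

fun main() {
    println(pointsEarned(10, Tier.MEMBER))   // prints 10
    println(pointsEarned(3, Tier.SILVER))    // prints 4 (4.5 rounded down)
    println(pointsEarned(10, Tier.GOLD))     // prints 25
    println(pointsEarned(7, Tier.PLATINUM))  // prints 21
}
```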

All points have the same expiry period, i.e. extending the expiry period extends it for all points. Upon making a transaction, the expiry period of all points is extended by 3 full calendar months, e.g. earning any points in January extends the points expiry period to the end of April (3 complete months). Conducting a transaction in March will then extend it until the end of June.

Therefore, as long as the customer makes a transaction every 3 months the points will not expire.
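The expiry rule maps neatly onto `java.time.YearMonth`. A minimal sketch, assuming the expiry is the last day of the third full month after the transaction month (the function name `pointsExpiryAfter` is hypothetical):

```kotlin
import java.time.LocalDate
import java.time.YearMonth

// Expiry date of all points after a transaction on the given date:
// the last day of the month three months after the transaction month.
fun pointsExpiryAfter(transactionDate: LocalDate): LocalDate =
    YearMonth.from(transactionDate).plusMonths(3).atEndOfMonth()

fun main() {
    println(pointsExpiryAfter(LocalDate.of(2020, 1, 15)))  // prints 2020-04-30
    println(pointsExpiryAfter(LocalDate.of(2020, 3, 3)))   // prints 2020-06-30
    println(pointsExpiryAfter(LocalDate.of(2020, 11, 20))) // prints 2021-02-28
}
```

Working with `YearMonth` rather than adding days sidesteps months of different lengths and year boundaries.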

Tier Upgrades

Accumulating enough new points within a 6 month period entitles members to upgrade to the next membership tier. The 6 month periods are January 1 to June 30 and July 1 to December 31.

Multiple upgrades can occur during the same 6-month period. Earning a membership tier during one 6-month period earns them the benefits of that tier during the current 6-month period and the next 6-month period.

At the end of the 6-month period, members are either upgraded or downgraded based on the points accumulated during that period.
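To make the period boundaries concrete, here is a rough sketch of how the 6-month periods could be computed. `Period`, `periodOf`, `nextPeriod` and `tierBenefitsEnd` are names invented for this illustration. The points thresholds for each tier are not given in the rules, so they are left out.

```kotlin
import java.time.LocalDate

// A 6-month evaluation period: January 1 to June 30, or July 1 to December 31.
data class Period(val start: LocalDate, val end: LocalDate)

// The period that contains the given date.
fun periodOf(date: LocalDate): Period =
    if (date.monthValue <= 6)
        Period(LocalDate.of(date.year, 1, 1), LocalDate.of(date.year, 6, 30))
    else
        Period(LocalDate.of(date.year, 7, 1), LocalDate.of(date.year, 12, 31))

// The period immediately following the given one.
fun nextPeriod(period: Period): Period = periodOf(period.end.plusDays(1))

// A tier earned on a given date is kept for the rest of the current period
// and for the whole of the next period.
fun tierBenefitsEnd(earnedOn: LocalDate): LocalDate =
    nextPeriod(periodOf(earnedOn)).end

fun main() {
    println(periodOf(LocalDate.of(2020, 3, 10)))        // Period(start=2020-01-01, end=2020-06-30)
    println(tierBenefitsEnd(LocalDate.of(2020, 3, 10))) // prints 2020-12-31
    println(tierBenefitsEnd(LocalDate.of(2020, 8, 1)))  // prints 2021-06-30
}
```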

Prerequisites

You need the following installed on your computer before continuing:

Uber Cadence and Temporal

Uber Cadence is a workflow engine developed at Uber to help their engineers build long running business logic. Temporal is a fork of Cadence that is being actively developed by Temporal Technologies Inc. In this blog post, we’ll be covering Temporal but the equivalent code for Cadence will be very similar.

Temporal consists of a server, client SDKs, a command line tool and a web UI for viewing workflow state.

Temporal has two official SDKs - a Golang and a Java SDK. I’ll be implementing the loyalty system using the Java SDK and using the Kotlin programming language.

Behind the scenes, Temporal requires a database. The most battle tested option is Cassandra. MySQL is supported as well.

Temporal Concepts

There are a handful of concepts that we will need to familiarize ourselves with:

Workflow Methods - Workflow methods contain the business logic to be implemented. They should contain logic for coordination/orchestration but should not “affect” the outside world directly. Generally speaking, the workflow ends when the workflow method “returns”.

Activities and Activity methods - Activity methods are how the workflow methods can change the outside world. Any type of logic that could possibly have a side effect should be done in an activity method e.g. API calls to other systems, database queries etc.

Signal Methods - While the workflow is running, it can be triggered by external code when certain events occur using signal methods.

Query Methods - Query methods allow external code to pull information about the current state of the workflow.

Tasklist - Tasklist is the term for the queueing abstraction within Temporal. Workflows and activities are scheduled for execution by workers by placing tasks on a tasklist.

Workers - The Temporal server does not run any application code directly. That is the job of workers. Workers will listen on a task list for workflows and activities to execute.
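If the relationship between task lists and workers feels abstract, the toy model below may help. It is plain Kotlin and has nothing to do with how Temporal is actually implemented. `ToyTaskList`, `ToyWorker` and `Task` are invented for this sketch. It only shows the shape of the idea: something places tasks on a queue, and a worker that has registered handlers picks them up and runs the application code.

```kotlin
import java.util.ArrayDeque

// A unit of work waiting to be picked up: a type name plus some input.
data class Task(val type: String, val input: String)

// Toy stand-in for a task list: a simple in-memory FIFO queue.
class ToyTaskList {
    private val queue = ArrayDeque<Task>()
    fun schedule(task: Task) { queue.addLast(task) }
    fun poll(): Task? = queue.pollFirst()
}

// Toy stand-in for a worker: it owns the application code (the handlers)
// and executes whatever tasks it finds on the task list.
class ToyWorker(private val taskList: ToyTaskList) {
    private val handlers = mutableMapOf<String, (String) -> String>()

    fun register(type: String, handler: (String) -> String) {
        handlers[type] = handler
    }

    // Runs every task currently on the task list and returns the results in order.
    fun drain(): List<String> {
        val results = mutableListOf<String>()
        while (true) {
            val task = taskList.poll() ?: break
            val handler = handlers[task.type] ?: error("No handler registered for ${task.type}")
            results += handler(task.input)
        }
        return results
    }
}

fun runToyExample(): List<String> {
    val taskList = ToyTaskList()
    val worker = ToyWorker(taskList)
    worker.register("enroll") { customer -> "enrolled $customer" }
    taskList.schedule(Task("enroll", "bob"))
    taskList.schedule(Task("enroll", "alice"))
    return worker.drain()
}

fun main() {
    println(runToyExample()) // prints [enrolled bob, enrolled alice]
}
```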

Project Setup

Installing Temporal

The easiest way to get up and running is with docker-compose:

$ curl -O https://raw.githubusercontent.com/temporalio/temporal/master/docker/docker-compose.yml
$ docker-compose up

The docker compose file takes care of starting the Temporal service, Cassandra and the Temporal web UI.

At this point, your machine will have:

Gradle Project Setup

Create your Kotlin project using Gradle and make the following changes to your build.gradle:

plugins {
    ....
    id 'application'
}
repositories {
    ....
    maven {
        url "https://oss.sonatype.org/content/repositories/snapshots/"
    }
}

dependencies {
    ....
    compile group: 'io.temporal', name: 'temporal-sdk', version: '0.21.1'
    compile group: 'commons-configuration', name: 'commons-configuration', version: '1.9'
    compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
}

application {
    mainClassName = 'WorkerKt'
}

logback.xml

Create a file in src/main/resources/logback.xml:

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

Hello Loyalty

The Workflow Interface

// loyalty-system.kt
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod

@WorkflowInterface
interface LoyaltyWorkflow {
    @WorkflowMethod
    fun enroll()
}

The WorkflowInterface is the contract between the workflow logic and the outside world. WorkflowMethods are invoked to start workflow instances.

The enroll method will be invoked by external code whenever a customer should be enrolled in the loyalty programme.

Implementing the Workflow Interface

This is where Temporal is a little magical. The bulk of the business logic will be implemented in the enroll() method.

The customer will be enrolled into the loyalty programme for many years. Traditionally, we would need to store the state of the customer, with regards to this process, in the database and read and update it each time an event happens. This crutch is necessary because there is no abstraction for the programmer to leverage that allows them to treat the computer’s memory as persistent.

However, with Temporal, we can implement this logic as just a plain method. As long as the method does not return, the customer is still enrolled in the loyalty programme.

In later parts of this blog series we will implement the actual business logic of the loyalty programme in this method and we will see how Temporal allows us to keep the method running for as long as we need.

// loyalty-system.kt
class LoyaltyWorkflowImpl : LoyaltyWorkflow {
    override fun enroll() {
        println("Customer enrolled")

        // Business logic for members goes here

        println("Customer disenrolled")
    }
}

Creating a worker

Workflows require a worker process to host them. The worker’s job is to connect to the Temporal service and then to listen on tasklists for workflows or activities to execute.

// worker.kt 
import io.temporal.client.WorkflowClient
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.worker.WorkerFactory

fun main() {
    val service = WorkflowServiceStubs.newInstance()
    val client = WorkflowClient.newInstance(service)
    val factory = WorkerFactory.newInstance(client)
    val worker = factory.newWorker("LoyaltyTaskList")
    worker.registerWorkflowImplementationTypes(LoyaltyWorkflowImpl::class.java)
    factory.start()
}

Start the worker in IntelliJ or in Gradle using:

$ ./gradlew run

Executing the Workflow from the Command Line

Temporal includes a command line tool that can be used for (among other things) starting workflows. To start a workflow, we need to specify the task list, the workflow type and an execution timeout:

$ docker run --network=host --rm temporalio/tctl:0.21.1 workflow run --tasklist LoyaltyTaskList --workflow_type LoyaltyWorkflow_enroll --execution_timeout 400000000

This should yield the output:

Running execution:
  Workflow Id : 2735037b-6f92-4947-ae0b-b5d994fbd47f
  Run Id      : 9b35e43e-366c-4f8e-a848-a6baa4ea81eb
  Type        : LoyaltyWorkflow_enroll
  Namespace   : default
  Task List   : LoyaltyTaskList
  Args        :
Progress:
  1, 2020-04-19T06:08:12Z, WorkflowExecutionStarted
  2, 2020-04-19T06:08:12Z, DecisionTaskScheduled
  3, 2020-04-19T06:08:12Z, DecisionTaskStarted
  4, 2020-04-19T06:08:12Z, DecisionTaskCompleted
  5, 2020-04-19T06:08:12Z, WorkflowExecutionCompleted

Result:
  Run Time: 1 seconds
  Status: COMPLETED
  Output:

The workflow run command starts the workflow and waits for it to finish. The workflow start command will start the workflow and exit immediately.

Workflows with arguments

Right now we can’t do much because we don’t have much information about the customer that just enrolled. To get that information we need to modify our workflow to accept arguments.

Let’s create a data class for the arguments to the workflow:

data class CustomerInfo(var id: String,
                        var email: String)

Then update the signature of the @WorkflowMethod:

@WorkflowInterface
interface LoyaltyWorkflow {
    @WorkflowMethod
    fun enroll(customer: CustomerInfo)
}

Update the implementation of the workflow method as well.

class LoyaltyWorkflowImpl : LoyaltyWorkflow {
    override fun enroll(customer: CustomerInfo) {
        println("Customer enrolled: id=${customer.id} email=${customer.email}")

        // Business logic for members goes here

        println("Customer disenrolled: id=${customer.id} email=${customer.email}")
    }
}

Note: It’s actually not a good idea to use println() here but we’ll get to that in a future post.

Running the Workflow with Arguments

$ docker run --network=host --rm temporalio/tctl:0.21.1 workflow run --tasklist LoyaltyTaskList --workflow_type LoyaltyWorkflow_enroll --execution_timeout 400000000 --input '{"id": "71659d9e-51c0-4f59-9b4c-74436402ed14 ", "email": "bob@example.com"}'

In the console of the worker you should see:

Customer enrolled: id=71659d9e-51c0-4f59-9b4c-74436402ed14  email=bob@example.com
Customer disenrolled: id=71659d9e-51c0-4f59-9b4c-74436402ed14  email=bob@example.com

Affecting the Outside World: Sending Emails

The workflow exists but it doesn’t change the outside world in any way. For that, we need activities. Let’s start by sending an email to the customer upon their enrollment in the loyalty program as well as sending an email when they get disenrolled from the program (i.e. when the workflow method is about to exit).

Activity Interface

Activity methods are encapsulated inside an ActivityInterface.

// activities.kt

import io.temporal.activity.ActivityInterface

@ActivityInterface
interface NotificationActivities {
    fun sendEmail(from: String, to: String, subject: String, body: String)
}
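One side benefit of hiding side effects behind an interface like this is that the interface can be swapped for a fake when you want to exercise logic without a mail server. The sketch below is plain Kotlin with no Temporal dependency: it redeclares the interface without the @ActivityInterface annotation so that it compiles on its own, and `RecordingNotificationActivities` and `SentEmail` are names invented for the example.

```kotlin
// Redeclared here without @ActivityInterface so the sketch is self-contained.
interface NotificationActivities {
    fun sendEmail(from: String, to: String, subject: String, body: String)
}

// Simple record of one email that would have been sent.
data class SentEmail(val from: String, val to: String, val subject: String, val body: String)

// Hypothetical fake: records emails in memory instead of talking to an SMTP server.
class RecordingNotificationActivities : NotificationActivities {
    val sent = mutableListOf<SentEmail>()

    override fun sendEmail(from: String, to: String, subject: String, body: String) {
        sent += SentEmail(from, to, subject, body)
    }
}

fun main() {
    val fake = RecordingNotificationActivities()
    fake.sendEmail("loyaltyservice@example.com", "bob@example.com", "Welcome", "Hello Bob")
    println(fake.sent.size)       // prints 1
    println(fake.sent[0].subject) // prints Welcome
}
```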

SMTP Server Configuration

Fill in the values below based on the configuration of your SMTP server. If you need a mock mail server for testing, try Mailtrap.

// activities.kt

val MAIL_SMTP_HOST = "<<FILLME>>"
val MAIL_SMTP_PORT = "<<FILLME>>"
val MAIL_SMTP_USER_NAME = "<<FILLME>>"
val MAIL_SMTP_PASSWORD = "<<FILLME>>"
val MAIL_SMTP_AUTH = "<<FILLME>>"
val MAIL_SMTP_STARTTLS_ENABLE = "<<FILLME>>"

Activity Implementation

The implementation isn’t that important here. The important thing to keep in mind is that the workflow should only “affect” the outside world (in this case sending email) from activity methods like the one shown below:

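// activities.kt
// Assumption: the JavaMail API (javax.mail) is on the classpath.
// It is not among the dependencies listed in build.gradle earlier.

import java.util.Properties
import javax.mail.Authenticator
import javax.mail.Message
import javax.mail.PasswordAuthentication
import javax.mail.Session
import javax.mail.Transport
import javax.mail.internet.InternetAddress
import javax.mail.internet.MimeMessage

class NotificationActivitiesImpl : NotificationActivities {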
    private val prop = Properties()

    init {
        prop["mail.smtp.host"] = MAIL_SMTP_HOST
        prop["mail.smtp.port"] = MAIL_SMTP_PORT
        prop["mail.smtp.auth"] = MAIL_SMTP_AUTH
        prop["mail.smtp.starttls.enable"] = MAIL_SMTP_STARTTLS_ENABLE
    }

    override fun sendEmail(from: String, to: String, subject: String, body: String) {
        val authenticator = object : Authenticator() {
            override fun getPasswordAuthentication(): PasswordAuthentication {
                return PasswordAuthentication(MAIL_SMTP_USER_NAME, MAIL_SMTP_PASSWORD)
            }
        }
        val session: Session = Session.getInstance(prop, authenticator)

        val message: Message = MimeMessage(session)
        message.setFrom(InternetAddress(from))
        message.setRecipients(
            Message.RecipientType.TO,
            InternetAddress.parse(to)
        )
        message.setSubject(subject)
        message.setText(body)

        Transport.send(message)
    }
}

Register Activity Implementation with Worker

Just like with workflows, the activities must be hosted in a worker and attached to a task list. We do this by updating the worker’s main method with the following line:

// worker.kt
worker.registerActivitiesImplementations(NotificationActivitiesImpl())
...
factory.start()

Invoking Activities From the Workflow

Now, we’re able to invoke the activities from the workflow. Activity methods should never be invoked directly but instead through an Activity stub created using Workflow.newActivityStub.

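// loyalty-system.kt
import io.temporal.activity.ActivityOptions
import io.temporal.workflow.Workflow
import java.time.Duration

class LoyaltyWorkflowImpl : LoyaltyWorkflow {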

    private val notification: NotificationActivities

    init {
        notification = Workflow.newActivityStub(
            NotificationActivities::class.java,
            ActivityOptions.newBuilder()
                .setScheduleToStartTimeout(Duration.ofMinutes(5))
                .setStartToCloseTimeout(Duration.ofMinutes(5)).build()
        )
    }

    override fun enroll(customer: CustomerInfo) {
        println("Customer enrolled: id=${customer.id} email=${customer.email}")
        notification.sendEmail(
            "loyaltyservice@example.com",
            customer.email,
            "Welcome to the loyalty programme!",
            "You can now start earning loyalty points"
        )

        // Business logic for members goes here

        println("Customer disenrolled: id=${customer.id} email=${customer.email}")
        notification.sendEmail(
            "loyaltyservice@example.com",
            customer.email,
            "Sorry to see you go",
            "We are purging your loyalty points"
        )
    }
}
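Why go through a stub instead of calling the activity implementation directly? The stub gives the runtime a chance to intercept the call. The toy below illustrates just that interception idea using a standard `java.lang.reflect.Proxy`. It is not Temporal’s stub, which does far more (scheduling an activity task, waiting for a worker to run it, recording the result). The helper `recordingStub` is hypothetical, and the interface is redeclared without annotations so the sketch stands alone.

```kotlin
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Proxy

// Redeclared here without @ActivityInterface so the sketch is self-contained.
interface NotificationActivities {
    fun sendEmail(from: String, to: String, subject: String, body: String)
}

// Builds a stub that never sends anything: each call is intercepted and
// written to the given list as "methodName(arg1, arg2, ...)".
fun recordingStub(scheduled: MutableList<String>): NotificationActivities {
    val handler = InvocationHandler { _, method, args ->
        scheduled += "${method.name}(${args?.joinToString() ?: ""})"
        null // sendEmail returns nothing, so there is no result to hand back
    }
    return Proxy.newProxyInstance(
        NotificationActivities::class.java.classLoader,
        arrayOf<Class<*>>(NotificationActivities::class.java),
        handler
    ) as NotificationActivities
}

fun main() {
    val scheduled = mutableListOf<String>()
    val stub = recordingStub(scheduled)
    stub.sendEmail("loyaltyservice@example.com", "bob@example.com", "Welcome", "Hello")
    println(scheduled)
    // prints [sendEmail(loyaltyservice@example.com, bob@example.com, Welcome, Hello)]
}
```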

Run the Workflow Again

$ docker run --network=host --rm temporalio/tctl:0.21.1 workflow run --tasklist LoyaltyTaskList --workflow_type LoyaltyWorkflow_enroll --execution_timeout 400000000 --input '{"id": "71659d9e-51c0-4f59-9b4c-74436402ed14 ", "email": "bob@example.com"}'

You should see something like this in the terminal:

Running execution:
  Workflow Id : e71500c0-35bb-4202-a259-219cb1fc2806
  Run Id      : eb0a9df1-e61d-41b8-bd97-7d0432a52b27
  Type        : LoyaltyWorkflow_enroll
  Namespace   : default
  Task List   : LoyaltyTaskList
  Args        : {"id":
              : "71659d9e-51c0-4f59-9b4c-74436402ed14
              : ", "email": "bob@example.com"}
Progress:
  1, 2020-04-19T09:28:56Z, WorkflowExecutionStarted
  2, 2020-04-19T09:28:56Z, DecisionTaskScheduled
  3, 2020-04-19T09:28:56Z, DecisionTaskStarted
  4, 2020-04-19T09:28:57Z, DecisionTaskCompleted
  5, 2020-04-19T09:28:57Z, ActivityTaskScheduled
  6, 2020-04-19T09:28:57Z, ActivityTaskStarted
  7, 2020-04-19T09:29:12Z, ActivityTaskCompleted
  8, 2020-04-19T09:29:12Z, DecisionTaskScheduled
  9, 2020-04-19T09:29:12Z, DecisionTaskStarted
  10, 2020-04-19T09:29:12Z, DecisionTaskCompleted
  11, 2020-04-19T09:29:12Z, ActivityTaskScheduled
  12, 2020-04-19T09:29:12Z, ActivityTaskStarted
  13, 2020-04-19T09:29:27Z, ActivityTaskCompleted
  14, 2020-04-19T09:29:27Z, DecisionTaskScheduled
  15, 2020-04-19T09:29:27Z, DecisionTaskStarted
  16, 2020-04-19T09:29:27Z, DecisionTaskCompleted
  17, 2020-04-19T09:29:27Z, WorkflowExecutionCompleted

Result:
  Run Time: 31 seconds
  Status: COMPLETED
  Output:

There are two sets of ActivityTaskScheduled, ActivityTaskStarted and ActivityTaskCompleted because we sent emails twice.

Bob should have gotten two emails: one welcoming him to the loyalty programme and one telling him we are sorry to see him go.

Next time: Part 2 and Part 3

In part 2, we will implement the logic for earning points and learn about signal and query methods. Then in Part 3 we will implement the tier upgrade/downgrade logic and we’ll look into using await - the feature of Temporal that allows us to “pause” a function until some condition is met.


  1. Temporal uses the term workflows to describe this type of business logic. However, I decided to steer clear of that term whenever possible because that term means different things to different people and brings up comparisons that are irrelevant and frankly a waste of time.
