Lately I’ve been doing a lot of work managing batch-processing tasks. Broadly speaking, there are two types of trigger for such a task: time-based and logic-based. The former is easily handled by cron/scheduled jobs. The latter can be a bit tricky, mostly because it can involve dependencies on other tasks. In this post, I will talk about how I’ve been using the AWS Simple Workflow service (SWF) to take some of the headache out of orchestrating tasks.
What’s a Workflow?
So what do I mean by ‘workflow’ exactly? I define it as a sequence of activities for accomplishing a well-defined objective. Furthermore, it can involve ordering, dependency and logic.
If this seems a bit abstract, it may be helpful just to think of a workflow as something that can be described with a flow chart. Consider the following two task definitions:
- Task A: Generates a random letter and prints it into a local file /var/tmp/a.out. This task runs daily, and does not have any dependency on any other tasks/conditions. Let’s assume that this task is actually implemented by a Python script called a.py.
- Task B: Loads the letter that Task A wrote to /var/tmp/a.out, and returns a random fruit/vegetable name starting with that letter. This task needs to run after a successful completion of Task A. Let’s assume that this task is implemented by another Python script called b.py.
So, for example, if the output of Task A is the letter “a”, then Task B might output “apple” or “avocado”.
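The two scripts themselves are not shown in this post. As a rough sketch of what they might do, here are both tasks as functions; the word list and the function names are made up for illustration:

```python
import random
import string

# Hypothetical word list; the real b.py may use any source of names.
PRODUCE = {
    "a": ["apple", "avocado"],
    "b": ["banana", "broccoli"],
    "c": ["carrot", "cherry"],
}


def task_a(out_path):
    """Task A: pick a random lowercase letter and write it to out_path."""
    letter = random.choice(string.ascii_lowercase)
    with open(out_path, "w") as f:
        f.write(letter)
    return letter


def task_b(in_path):
    """Task B: read Task A's letter and return a matching name, or None."""
    with open(in_path) as f:
        letter = f.read().strip()
    candidates = PRODUCE.get(letter, [])
    return random.choice(candidates) if candidates else None
```

Note that Task B only makes sense once Task A has written its file, which is exactly the dependency the workflow has to enforce.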
Based on these requirements, we could draw the following flow chart:
- Rectangles represent activities
- Diamonds represent decisions
- Ovals represent process states
This example workflow is not only time-based, but also involves a bit of logic.
If you were to try to use just a cron job to orchestrate these tasks, you’d also have to introduce some coupling between the two tasks. Depending on the complexity of the original script files, you could:
- Merge the two scripts into one, kicking off one task after the other with a check on the completion and result, or
- Have a separate script to manage the ordering and dependency. This would minimise the changes to the original scripts.
The downside of both of these approaches is that if there are any future changes to the workflow logic, you will have to change the scripts correspondingly. Even worse, if the scripts belong to multiple projects, the coupling will make the source code more messy and complicated. Finally, you’ll also need to have something in place to monitor the progress of script execution, keep execution history for analysis, and send alerts when certain tasks fail, etc.
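For comparison, the “separate script” option could look something like the minimal wrapper below. It is only a sketch: the function name is hypothetical, and a real wrapper would also need the logging, history and alerting mentioned above.

```python
import subprocess


def run_in_order(commands):
    """Run each command in turn and stop at the first non-zero exit code.

    Returns the list of commands that completed successfully.
    """
    done = []
    for cmd in commands:
        result = subprocess.run(cmd)
        if result.returncode != 0:
            break  # a later task must not run if an earlier one failed
        done.append(cmd)
    return done
```

A cron entry could then call something like `run_in_order([["python", "a.py"], ["python", "b.py"]])`. The ordering logic now lives in this wrapper, so every change to the flow chart means editing and redeploying it.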
So as the true complexity of the problem dawns on us, we start to wonder: wouldn’t it be great if there was something that could manage all this without touching the original code? And of course, the answer is simple: AWS Simple Workflow. But HOW?
The AWS SWF service is a central management tool which provides task coordination and state tracking in the AWS Cloud. To work with SWF, you can either:
- Build an application from scratch on top of the SWF framework, or
- Introduce an agent which acts as an intermediary between your existing application and SWF. This agent will be responsible for managing execution of your application, but will otherwise be totally independent from it.
I have found the second approach to be more common in practice. This is because the applications I’ve been trying to retrofit into a workflow are usually already running in a production environment. Having to rebuild them from scratch on top of the SWF framework would be a nuisance. So for the remainder of this blog, we’ll implement an agent-based solution to the example workflow I introduced earlier.
Let’s head over to the AWS console and navigate to the Simple Workflow Service Dashboard. I’ve annotated the parts that are of interest:
The first thing we then do is create a Domain. This is for scoping the requirements of your workflow. Say that you need SWF to manage multiple processes with completely independent business logic. In that case, I highly recommend that you create a separate domain for each project, so that you can easily track workflow execution.
In the SWF dashboard, you can find the “Manage Domains” button on the upper-right corner. By clicking on it you will find the list of currently registered domains as well as deprecated ones if any. Initially, it will be empty:
But after clicking the “Register New” button, you’ll be taken to the ‘Register Domain’ page, where you can fill in the details of the new domain to be registered:
Next, you click on the “Register” button. When the new domain is successfully registered, you’ll see this message:
You then go back to the “Manage Domains” window, where your newly-registered domain will be listed:
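The same registration can also be scripted instead of clicked through. Here is a minimal sketch using the boto3 SWF client; the helper name, the description text and the 7-day retention period are assumptions, not values taken from the console screens:

```python
def register_domain(client, name, retention_days=7):
    """Register an SWF domain through a boto3 SWF client.

    retention_days is an assumed value; choose whatever history
    retention suits your project.
    """
    client.register_domain(
        name=name,
        description="Domain for the example workflow",
        # The API expects the retention period as a string, not an int.
        workflowExecutionRetentionPeriodInDays=str(retention_days),
    )
```

With real credentials this would be called as `register_domain(boto3.client('swf'), 'TEST_SWF')`.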
Next, we will need to register a Workflow Type. A workflow type effectively maps to what’s represented in our flow chart.
Returning to the SWF dashboard, we need to select the domain from the dropdown list that the workflow type will belong to.
Once the domain is selected, click on the link named “Register a New Workflow Type” listed under “Quick Links” in the dashboard. A new workflow type registration page will appear:
The task list in the registration form can be thought of as a queue containing all the pending tasks. There are two types of task. Decision Tasks decide what will be the next workflow activity. Activity Tasks perform the actual activities.
You can use different task lists for queuing decision tasks and activity tasks, especially when handling fairly complex flow logic. However, for a simple workflow it’s not usually necessary.
Once all the details are confirmed, click on the “Continue” button to enter the “Review” page:
Task Priorities are useful when you execute multiple workflow types within the same domain. Child Policies matter when an execution of your workflow type has child executions: they tell SWF what to do with the child executions when the parent execution is terminated. In this example, we can simply choose the default child policy, which means all the child executions will be terminated when the parent is terminated.
After reviewing the workflow type registration details on the “Review” page, click the “Register Workflow” button to register the new workflow type:
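For reference, a scripted equivalent of this form might look like the sketch below. It assumes a boto3 SWF client; the helper name and the timeout values are assumptions, since the values entered in the console form are not reproduced here:

```python
def register_workflow_type(client, domain, name, version, task_list):
    """Register a workflow type with a default task list and child policy."""
    client.register_workflow_type(
        domain=domain,
        name=name,
        version=version,
        defaultTaskList={"name": task_list},
        # Terminate child executions when the parent is terminated.
        defaultChildPolicy="TERMINATE",
        # Timeouts are strings of seconds; these values are assumptions.
        defaultTaskStartToCloseTimeout="300",
        defaultExecutionStartToCloseTimeout="3600",
    )
```

For this post's workflow the call would be along the lines of `register_workflow_type(client, 'TEST_SWF', 'TestWorkflow', '1.0', 'TestTasks')`.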
Once a SWF workflow type is registered, we can then register the Activity Types for the tasks defined in the flow chart. It’s important to understand that the activity in the SWF domain is not the actual task or command to kick off a program. Instead, it’s the name of a step in the SWF workflow execution process.
Click on the link “Register a New Activity Type” listed under “Quick Links” in the dashboard, and you will see the activity type registration page. We can then register the activity type for Task A:
We can repeat the same step for Test_Task_B using all the same details (except the Activity Type Name).
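If you prefer to script this step, both activity types can be registered in one loop. This is a sketch assuming a boto3 SWF client; the helper name and all timeout values are assumptions:

```python
def register_activity_types(client, domain, names, version, task_list):
    """Register one activity type per name, all sharing the same settings."""
    for name in names:
        client.register_activity_type(
            domain=domain,
            name=name,
            version=version,
            defaultTaskList={"name": task_list},
            # Timeouts are strings of seconds (or "NONE"); assumed values.
            defaultTaskStartToCloseTimeout="300",
            defaultTaskScheduleToStartTimeout="300",
            defaultTaskScheduleToCloseTimeout="600",
            defaultTaskHeartbeatTimeout="NONE",
        )
```

Here `names` would be `['Test_Task_A', 'Test_Task_B']`, with version `'1.0'` for both.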
So to summarise, here are the things we’ve set up so far:
- A SWF Domain: TEST_SWF
- A SWF Workflow Type: TestWorkflow 1.0
- 2 SWF Activity Types:
- Test_Task_A 1.0
- Test_Task_B 1.0
Let’s now look at how we link everything together.
Suppose that we need to conditionally execute a sequence of tasks. Furthermore, we’ll delegate someone to work on the list of tasks, and somebody to make the decision on which task to do. We’ll refer to these roles as ‘the worker’ and ‘the decider’ respectively.
Thinking in terms of what we might see on a flow chart, the decider focuses on diamond-shaped and oval-shaped tasks, analysing what the current state is and choosing what activity to perform next. For example, the decider might check if we’re at the start of an execution cycle, and if current conditions are OK for the next task to run.
The worker, on the other hand, actually performs the activities; for example, running task A by executing the appropriate Python script.
In order to run the correct task, the worker needs to ask the decider a question: “what’s the task?”. Once the decider has made its decision, the worker is notified. It executes the task, lets the decider know the result of the execution, and then asks the same question again: “what’s the task?”.
SWF is the mediator that helps the decider and worker communicate with each other. It does this by:
- Scheduling decision tasks for the decider during a workflow execution, notifying it of a point where a decision needs to be made to keep the execution going. This is very similar to the real-life situation where the worker asks the decider the question “what’s the task?”
- Collecting decisions made by the decider. Note that a decision is not simply a “Yes” or “No”. Instead, it’s an action, such as starting a certain activity task, or completing or terminating the current workflow execution.
- Scheduling activity tasks for the worker if the decider tells it to. You can think of this step as the SWF service passing the decision of “what task to run” from the decider to the worker.
- Collecting the activity task result from the worker, and asking the decider what the next step will be by scheduling a decision task. Again, the SWF service asks the decider the same question: “What’s next?”
Note that SWF service does not make any decisions or handle any logic of the workflow execution. Instead, what it does basically is separate the flow chart into two groups of tasks. This is very helpful when you’re implementing a very complex solution consisting of several tasks, and you don’t want the dependencies among the tasks to complicate the solution even more.
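To make the back-and-forth concrete, here is a toy, in-memory stand-in for the mediator. It is not how SWF is implemented and does not talk to AWS; all names are hypothetical and the task outputs are hard-coded so the run is deterministic:

```python
def run_workflow(decide, work):
    """Toy mediator: alternate between decision tasks and activity tasks.

    decide(history) returns the next activity name, or None to complete.
    work(activity, history) performs the activity and returns its result.
    """
    history = []  # (activity, result) pairs, oldest first
    while True:
        activity = decide(history)          # "what's next?"
        if activity is None:
            return history                  # decider completed the workflow
        result = work(activity, history)    # worker performs the activity
        history.append((activity, result))  # result is recorded for the decider


def decide(history):
    """Decider: run Task A, then Task B, then complete."""
    done = [activity for activity, _ in history]
    for activity in ("Test_Task_A", "Test_Task_B"):
        if activity not in done:
            return activity
    return None


def work(activity, history):
    """Worker: fixed outputs stand in for the real, random tasks."""
    if activity == "Test_Task_A":
        return "b"
    return "banana" if history[-1][1] == "b" else "unknown"
```

Calling `run_workflow(decide, work)` returns `[("Test_Task_A", "b"), ("Test_Task_B", "banana")]`. Notice that the loop itself contains no workflow logic: all ordering lives in `decide`, and all real work lives in `work`.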
So all we need to do is write programs for both the decider and worker.
The decider program needs to get decision tasks from SWF once they are scheduled, and then respond to SWF with the decision it has made based on an evaluation of the current state. A response can be scheduling a new activity task, terminating the current workflow execution on errors, or completing the execution if no more tasks are to be run.
The worker program needs to get activity tasks from the SWF service once they are scheduled, and then execute the actual task by kicking off the corresponding task script based on the activity type. In our case, we’ll make it so that Test_Task_A maps to the a.py script, and Test_Task_B maps to b.py.
Implementing Decider and Worker scripts
We’re going to implement our decider and worker in Python using the boto3 library.
First, let’s start with the decider program, which we’ll call ‘decider.py’. We begin by having it initialise the connection to the SWF service:
import boto3
client = boto3.client('swf')
Next, we use the ‘client’ object to poll decision tasks from the SWF service:
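The polling code is not reproduced here, so the following is a minimal sketch of what such a poll could look like with boto3's `poll_for_decision_task`. The helper name and the `identity` label are assumptions:

```python
def poll_decision_task(client, domain, task_list):
    """Long-poll SWF for one decision task; return None if none arrived."""
    task = client.poll_for_decision_task(
        domain=domain,
        taskList={"name": task_list},
        identity="decider-1",  # hypothetical label, recorded in the history
    )
    # When the long poll times out, the response has an empty task token.
    if not task.get("taskToken"):
        return None
    print("Received a Decision Task")
    return task
```

For a long history the `events` list in the response can be split across pages via `nextPageToken`; this sketch ignores that and only looks at the first page.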
Now, based on the initial conditions and the outcomes of previously-completed activity tasks, we make decisions and take action. This involves scheduling the next activity task, completing the workflow execution if there are no more activity tasks, or terminating the workflow execution due to failures/errors. Thus the ‘decision’ can be considered as the result of the decision task, and will be reported to the SWF service.
Each decision task scheduled by the SWF service has all the history events, including the previously handled decision tasks and executed activity tasks, so that the decider can easily get the context from the current decision task.
In the code snippet, based on the result from the “findNextActivity” method, the “decide” method will complete the workflow execution if the next activity is “complete” (meaning there are no more activities to run), or schedule the next activity task if the conditions allow, or fail the workflow execution if there are failures in the history.
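The logic just described could be reconstructed roughly as follows. This is a sketch under assumptions rather than the original code: the function names mirror the ones mentioned above, `task` is the response of a decision-task poll, `client` is a boto3 SWF client, and the `ORDER` list, the "fail" marker and the result/reason strings are made up for illustration.

```python
ORDER = ["Test_Task_A", "Test_Task_B"]  # activity order from the flow chart
FAILURES = {"ActivityTaskFailed", "ActivityTaskTimedOut"}


def findNextActivity(events):
    """Return the next activity name, 'complete', or 'fail'."""
    scheduled = {}   # eventId of an ActivityTaskScheduled event -> activity name
    completed = set()
    for event in events:
        kind = event["eventType"]
        if kind in FAILURES:
            return "fail"
        if kind == "ActivityTaskScheduled":
            attrs = event["activityTaskScheduledEventAttributes"]
            scheduled[event["eventId"]] = attrs["activityType"]["name"]
        elif kind == "ActivityTaskCompleted":
            attrs = event["activityTaskCompletedEventAttributes"]
            completed.add(scheduled[attrs["scheduledEventId"]])
    for name in ORDER:
        if name not in completed:
            return name
    return "complete"


def decide(client, task, task_list="TestTasks"):
    """Turn the history in a decision task into one decision and report it."""
    next_activity = findNextActivity(task["events"])
    if next_activity == "complete":
        decision = {
            "decisionType": "CompleteWorkflowExecution",
            "completeWorkflowExecutionDecisionAttributes": {"result": "done"},
        }
    elif next_activity == "fail":
        decision = {
            "decisionType": "FailWorkflowExecution",
            "failWorkflowExecutionDecisionAttributes": {"reason": "activity failed"},
        }
    else:
        decision = {
            "decisionType": "ScheduleActivityTask",
            "scheduleActivityTaskDecisionAttributes": {
                "activityType": {"name": next_activity, "version": "1.0"},
                "activityId": next_activity,
                "taskList": {"name": task_list},
            },
        }
    client.respond_decision_task_completed(
        taskToken=task["taskToken"], decisions=[decision]
    )
    return decision
```

This works for a strictly sequential workflow like ours, where a decision task only arrives after the previous activity has finished. A workflow with parallel activities would also need to track tasks that are scheduled but not yet completed.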
Now let’s take a look at the worker, which we’ll call ‘worker.py’. After initialising its connection to the SWF service, it polls for activity tasks from the service:
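As a sketch of that polling step (the original snippet is not reproduced here), boto3's `poll_for_activity_task` can be wrapped like this. The helper name and the `identity` label are assumptions:

```python
def poll_activity_task(client, domain, task_list):
    """Long-poll SWF for one activity task; return None if none arrived."""
    task = client.poll_for_activity_task(
        domain=domain,
        taskList={"name": task_list},
        identity="worker-1",  # hypothetical label, recorded in the history
    )
    # When the long poll times out, the response has an empty task token.
    if not task.get("taskToken"):
        return None
    print("Received a Activity Task")
    return task
```

The returned task carries the `activityType` (name and version) that the worker uses to pick a script, plus the `taskToken` it needs when reporting the result.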
Based on the type of the activity task, it then executes the corresponding application program. For example, if the type of the activity task received by the worker program is ‘Test_Task_A’, then the Python script ‘a.py’ will be executed:
Note that, via the “respondActivityTaskCompleted” and “respondActivityTaskFailed” functions, it also reports to the SWF service whether the execution succeeded or failed, along with the result if necessary.
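Putting the last two steps together, a worker's task handler might look like the sketch below. In boto3 the two reporting calls are spelled `respond_activity_task_completed` and `respond_activity_task_failed`. The helper name and the command mapping are assumptions, and so is the choice to report the script's standard output as the result; the original worker may obtain the output differently (for example by reading a.out).

```python
import subprocess

# Assumed mapping from activity type name to command line.
COMMANDS = {
    "Test_Task_A": ["python", "/var/tmp/TEST_SWF/a.py"],
    "Test_Task_B": ["python", "/var/tmp/TEST_SWF/b.py"],
}


def handle_activity_task(client, task, commands=COMMANDS):
    """Run the script mapped to the task's activity type and report back."""
    name = task["activityType"]["name"]
    proc = subprocess.run(commands[name], capture_output=True, text=True)
    if proc.returncode == 0:
        client.respond_activity_task_completed(
            taskToken=task["taskToken"],
            result=proc.stdout.strip(),
        )
        return True
    client.respond_activity_task_failed(
        taskToken=task["taskToken"],
        reason=f"{name} exited with code {proc.returncode}",
        details=proc.stderr[:1000],  # keep the failure details short
    )
    return False
```

Because the scripts are launched as separate processes, a.py and b.py need no knowledge of SWF at all, which is the point of the agent-based approach.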
So to summarise, we now have the following two scripts:
- decider.py, which:
- polls decision tasks from task list “TestTasks”
- reports its decision to the SWF service by scheduling an activity task, failing the workflow execution or completing the workflow execution
- worker.py, which:
- polls activity tasks from task list “TestTasks”
- kicks off the corresponding script based on the activity type
- reports the execution result of the current activity task to SWF service
Putting it all together
Now that we’ve got all of the pieces in place, let’s put it all together and take it for a spin.
First, we start a workflow execution by clicking “Start a new Workflow Execution” listed under ‘Quick Links’ on the SWF dashboard. This will pop up the following wizard dialog:
We fill in the details then click on “Continue”, which takes us forward to the next stage:
We don’t need to provide any additional options, so simply click on “Review” to review a summary of the execution:
Having checked the details, we click on “Start Execution”. The following message will be displayed:
Closing the current dialog, we go to the “My Workflow Executions” screen, where we can find the current running execution:
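The wizard is a front end for the StartWorkflowExecution API, so an execution can also be started from code. A minimal sketch, assuming a boto3 SWF client; the helper name is hypothetical:

```python
def start_execution(client, domain, workflow_id, wf_name, wf_version, task_list):
    """Start a workflow execution and return the run ID assigned by SWF."""
    response = client.start_workflow_execution(
        domain=domain,
        workflowId=workflow_id,
        workflowType={"name": wf_name, "version": wf_version},
        taskList={"name": task_list},
    )
    return response["runId"]
```

For this walkthrough the call would be along the lines of `start_execution(client, 'TEST_SWF', 'Demo', 'TestWorkflow', '1.0', 'TestTasks')`. A time-based trigger such as cron could make this call daily and leave the ordering of the tasks to the decider.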
Now let’s drill into the execution, and figure out how our decider and worker can interact with SWF to get the job done.
In the ‘Workflow Execution ID’ column, we click on ‘Demo’ to enter the workflow execution page, where we can find 3 tabs: ‘Summary’, ‘Events’ and ‘Activities’. The ‘Summary’ tab displays the configurations of the workflow execution:
However, it’s the second two tabs that are of most interest when walking through the workflow execution. First let’s look at what’s under the ‘Events’ tab after a workflow execution is kicked off:
I’ve annotated the important parts of this screen. In short, the events indicate that until a decision is made, there won’t be any more activity tasks scheduled or started. We can confirm this by clicking on the ‘Activities’ tab:
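The same event list can be fetched outside the console. The sketch below assumes a boto3 SWF client and a known run ID; the helper name is hypothetical. It follows `nextPageToken` so that longer histories are read in full:

```python
def list_event_types(client, domain, workflow_id, run_id):
    """Return the event types of one workflow execution, oldest first."""
    kwargs = {
        "domain": domain,
        "execution": {"workflowId": workflow_id, "runId": run_id},
    }
    types = []
    while True:
        page = client.get_workflow_execution_history(**kwargs)
        types.extend(event["eventType"] for event in page["events"])
        token = page.get("nextPageToken")
        if not token:
            return types
        kwargs["nextPageToken"] = token  # ask for the next page
```

Printing the returned list at each stage of the walkthrough gives a plain-text view of what the ‘Events’ tab shows.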
So basically what has happened here is that the SWF service is waiting for a decision from the decider program. This is where we can run decider.py to handle the first decision task and let it tell SWF service what to do next:
$ python decider.py
Received a Decision Task
Making decision on what is next
Decision Made: Schedule activity -- Test_Task_A
Now that a decision is made, the SWF service should have scheduled an activity task of type “Test_Task_A”. To check if this has happened, we can return to the ‘Events’ tab, where we’ll see a couple of new events:
In order to find out which activity task is scheduled, we need to check under the ‘Activities’ tab.
According to the flow chart, this is exactly what we expect: that Task A will be the first task of the workflow. So now we can run worker.py to handle the first activity task (‘A’):
$ python worker.py
Received a Activity Task
Execute Test_Task_A with command - python /var/tmp/TEST_SWF/a.py > /dev/null 2>&1
Test_Task_A succeeded with output: b
We see that the execution of Task A has randomly picked the letter “b” and printed it into output file a.out, which will be the reference for Task B. The result of the activity task execution will be reported to SWF service from the worker:
And, if we look at the status of the activity:
We see that it is consistent with what’s indicated in event 7.
Now we need to run decider.py again to handle the second decision task scheduled by SWF service:
$ python decider.py
Received a Decision Task
Making decision on what is next
Test_Task_A succeeded
Decision Made: Schedule activity -- Test_Task_B
The decider program goes through the event history and realises that Task A has succeeded. It then makes the decision that it’s OK to run Task B, and asks SWF to schedule an activity task of type Test_Task_B. Returning to the ‘Events’ log, we can see this taking place:
and a new activity of “Test_Task_B” scheduled under the ‘Activities’ Tab:
Next, we run worker.py again to handle the second activity task (‘B’):
$ python worker.py
Received a Activity Task
Execute Test_Task_B with command - python /var/tmp/TEST_SWF/b.py > /dev/null 2>&1
Test_Task_B succeeded with output: banana
This time, Task B picks up the output from Task A (the letter “b”) and randomly finds a fruit/vegetable name starting with “b”, in this case “banana”. Task B succeeds, which we can confirm on the ‘Events’ tab:
and we see on the ‘Activities’ tab that the status of both activities is now marked as ‘Completed’:
Finally, we run decider.py to handle the third decision task:
$ python decider.py
Received a Decision Task
Making decision on what is next
Decision Made: Complete the workflow execution
The decider walks through all the history events and sees that all the tasks involved in the workflow have been completed successfully. Consequently, it decides to mark the workflow execution as completed. We can verify this on the ‘Events’ tab:
If we return to the “My Workflow Executions” screen, we can also see that the workflow execution is now marked as “Completed”:
When you have a problem that requires coordinating multiple tasks that have dependencies, SWF can be an excellent option. It gives you a single point for registering tasks and storing the state of your workflow, which makes it easier to keep track of how things are progressing. It’s easy for task orchestration to become an ad-hoc mess of inter-dependent scripts. A cloud-based workflow engine like SWF will help you bring some order to the chaos.