Goal
Implement a struct that holds the workflow graph and yields the tasks that are next in line for execution.
Workflow Graph
When writing a workflow management engine, one has to decide on the data structure for the workflow. I decided to go for the most common one: a directed acyclic graph, DAG for short. I thought about allowing a more open structure, i.e. a graph that may contain cycles, but that has really weird implications for a retry system and for output handling. It would be a cool idea to explore, but it sounds like a pain to implement.
I have not yet written anything about OpenWorkflow, but I will be using its types here. Most of them should be self-explanatory, and I'm sure there will be posts about OpenWorkflow eventually; if you're interested, check out the protobuf definition in the linked repository.
Implementing a DAG
Luckily for me, there is a very extensive graph library in Rust: petgraph. So first off, I need to create the graph that holds my task instances.
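With petgraph, such a struct would most likely wrap a directed graph, for example `petgraph::graph::DiGraph<TaskInstance, ()>`. To keep the example dependency-free, here is a minimal stand-in built on plain vectors that mimics the three graph operations this post relies on: adding nodes, adding edges and asking for a node's parents. `TaskInstance` and its fields are hypothetical placeholders for the OpenWorkflow types.

```rust
/// Hypothetical stand-in for the OpenWorkflow task instance type.
#[derive(Debug, Clone)]
pub struct TaskInstance {
    pub task_id: String,
    pub downstream_tasks: Vec<String>,
}

/// Dependency-free stand-in for a petgraph `DiGraph<TaskInstance, ()>`:
/// nodes live in a Vec, and a node's index is its position in that Vec.
#[derive(Debug, Default)]
pub struct Dag {
    nodes: Vec<TaskInstance>,
    /// Directed edges stored as (parent, child) index pairs.
    edges: Vec<(usize, usize)>,
}

impl Dag {
    /// Adds a node and returns its index (like petgraph's `add_node`).
    pub fn add_node(&mut self, task: TaskInstance) -> usize {
        self.nodes.push(task);
        self.nodes.len() - 1
    }

    /// Adds a directed edge parent -> child (like petgraph's `add_edge`).
    pub fn add_edge(&mut self, parent: usize, child: usize) {
        self.edges.push((parent, child));
    }

    /// Indices of all nodes with an edge pointing at `node`, i.e. its parents
    /// (like `neighbors_directed(node, Direction::Incoming)`).
    pub fn parents(&self, node: usize) -> Vec<usize> {
        self.edges
            .iter()
            .filter(|&&(_, child)| child == node)
            .map(|&(parent, _)| parent)
            .collect()
    }

    /// Read access to a node's payload.
    pub fn node(&self, index: usize) -> &TaskInstance {
        &self.nodes[index]
    }
}
```

In petgraph the corresponding calls are `add_node`, `add_edge(a, b, ())` and `neighbors_directed(node, Direction::Incoming)`, which work with `NodeIndex` values rather than plain `usize`.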
Then I need a way to fill my Dag with a list of tasks parsed from a protobuf message.
For this, Dag implements the TryFrom trait.
FlowtyError uses SNAFU to create errors for Flowty.
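SNAFU's `#[derive(Snafu)]` generates the `Display` and `std::error::Error` implementations (plus context selectors) from attributes on an error enum. For readers who haven't used it, a hand-written, dependency-free equivalent of such an enum looks roughly like this. The variants are hypothetical, since the post does not list FlowtyError's actual variants.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error enum; the real FlowtyError variants are not shown in the post.
#[derive(Debug, PartialEq, Eq)]
pub enum FlowtyError {
    /// The task list describes a graph with at least one cycle.
    CyclicGraph,
    /// A task carries a duration that cannot be converted (e.g. a negative one).
    InvalidDuration { task_id: String },
}

// With SNAFU, these messages would come from #[snafu(display("..."))] attributes.
impl fmt::Display for FlowtyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FlowtyError::CyclicGraph => write!(f, "workflow graph contains a cycle"),
            FlowtyError::InvalidDuration { task_id } => {
                write!(f, "task {} has an invalid duration", task_id)
            }
        }
    }
}

// Debug + Display are all std::error::Error requires; SNAFU derives this as well.
impl Error for FlowtyError {}
```

A `TryFrom` implementation then names it as its associated type, `type Error = FlowtyError;`, so callers get a `Result<Dag, FlowtyError>` back from `Dag::try_from(tasks)`.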
Now that that's out of the way, let's populate the graph. First we simply create it and iterate over our list of tasks, adding a node for each one.
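A dependency-free sketch of this first pass: a hypothetical `TaskSpec` stands in for a task parsed from the protobuf message, and node indices are positions in a `Vec`, matching how petgraph's `Graph` hands out consecutive indices. The `task_id -> index` map is an addition of mine for illustration; it turns the later edge lookup into a hash lookup instead of a scan over all nodes.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one task parsed from the protobuf message.
#[derive(Debug, Clone)]
pub struct TaskSpec {
    pub task_id: String,
    pub downstream_tasks: Vec<String>,
}

/// First pass: one node per task, plus a lookup table from task_id to node index.
/// If two tasks share a task_id, the later one wins in the lookup table.
pub fn add_nodes(tasks: Vec<TaskSpec>) -> (Vec<TaskSpec>, HashMap<String, usize>) {
    let mut nodes = Vec::with_capacity(tasks.len());
    let mut index_of = HashMap::new();
    for task in tasks {
        // The index is the position in `nodes`, just as petgraph's Graph
        // hands out consecutive indices to newly added nodes.
        index_of.insert(task.task_id.clone(), nodes.len());
        nodes.push(task);
    }
    (nodes, index_of)
}
```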
Converting a prost_types::Duration to a chrono::Duration is a bit of a pain, since it has to go through std::time::Duration, but it is what it is.
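Both hops exist as library conversions: prost-types implements `TryFrom<prost_types::Duration>` for `std::time::Duration`, and `chrono::Duration::from_std` takes a `std::time::Duration` and returns a `Result` (failing for values outside chrono's range). The first hop is fallible because a protobuf duration can be negative while `std::time::Duration` cannot. The sketch below performs that first hop by hand on a hypothetical `ProtoDuration` with the same two fields as `prost_types::Duration` (`seconds: i64`, `nanos: i32`), so it runs without either crate.

```rust
use std::convert::TryFrom;
use std::time::Duration;

/// Hypothetical local mirror of prost_types::Duration: signed whole seconds
/// plus a signed nanosecond part.
#[derive(Debug, Clone, Copy)]
pub struct ProtoDuration {
    pub seconds: i64,
    pub nanos: i32,
}

/// First hop: protobuf duration -> std::time::Duration.
/// Returns None for negative values, which std::time::Duration cannot represent.
pub fn to_std_duration(d: ProtoDuration) -> Option<Duration> {
    let secs = u64::try_from(d.seconds).ok()?;
    let nanos = u32::try_from(d.nanos).ok()?;
    // Duration::new carries any nanos >= 1_000_000_000 over into the seconds.
    Some(Duration::new(secs, nanos))
}
```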
Now we have a graph full of nodes, but no edges.
To change that, we iterate through all node indices and check their downstream dependencies.
We add an edge from the current node to every node whose task_id matches one of its downstream tasks.
Any typos or other unknown names in that downstream list are, for now, simply ignored.
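A dependency-free sketch of that edge pass, assuming a hypothetical lookup map from task_id to node index and one list of downstream names per node: every downstream name that matches a known task becomes a `(parent, child)` edge, and unknown names are skipped. With petgraph, each pair would instead become a `graph.add_edge(parent, child, ())` call.

```rust
use std::collections::HashMap;

/// Builds (parent, child) edges from each node's downstream task names.
/// `downstream[i]` holds the downstream names of node i; names missing from
/// `index_of` (typos, unknown tasks) are silently ignored.
pub fn build_edges(
    downstream: &[Vec<String>],
    index_of: &HashMap<String, usize>,
) -> Vec<(usize, usize)> {
    let mut edges = Vec::new();
    for (parent, children) in downstream.iter().enumerate() {
        for child_id in children {
            if let Some(&child) = index_of.get(child_id) {
                edges.push((parent, child));
            }
        }
    }
    edges
}
```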
Now we have a directed graph, since that is petgraph's default.
Before returning, let's check that it is acyclic.
Nice! petgraph comes with this check built in. Now our Dag is complete.
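petgraph exposes this as `petgraph::algo::is_cyclic_directed(&graph)`, and `petgraph::algo::toposort` also returns an `Err(Cycle)` when the graph has a cycle. To illustrate what such a check does, here is a dependency-free sketch of Kahn's algorithm (not necessarily how petgraph implements it) over nodes `0..node_count` and `(parent, child)` edges: repeatedly take a node with no remaining incoming edges; if some nodes are never taken, the graph has a cycle.

```rust
use std::collections::VecDeque;

/// Kahn's algorithm: returns the nodes in topological order, or None if the
/// graph (nodes 0..node_count, directed (parent, child) edges) has a cycle.
pub fn toposort(node_count: usize, edges: &[(usize, usize)]) -> Option<Vec<usize>> {
    let mut in_degree = vec![0usize; node_count];
    let mut children: Vec<Vec<usize>> = vec![Vec::new(); node_count];
    for &(parent, child) in edges {
        in_degree[child] += 1;
        children[parent].push(child);
    }
    // Start with every node that has no parents.
    let mut ready: VecDeque<usize> = (0..node_count).filter(|&n| in_degree[n] == 0).collect();
    let mut order = Vec::with_capacity(node_count);
    while let Some(node) = ready.pop_front() {
        order.push(node);
        // "Remove" the node's outgoing edges; children without parents left become ready.
        for &child in &children[node] {
            in_degree[child] -= 1;
            if in_degree[child] == 0 {
                ready.push_back(child);
            }
        }
    }
    // Nodes on (or behind) a cycle never reach in-degree 0.
    if order.len() == node_count {
        Some(order)
    } else {
        None
    }
}

/// Counterpart of petgraph's is_cyclic_directed for this stand-in representation.
pub fn is_cyclic(node_count: usize, edges: &[(usize, usize)]) -> bool {
    toposort(node_count, edges).is_none()
}
```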
Traversing (or, getting the current execution stage)
So now comes the part you’ve been waiting for: Traversing. Or rather, getting the current execution stage.
You see, it's not so much that I want to drain my graph as I pass its nodes, nor that I want to get to the end of it immediately.
Instead, I want to know which nodes (tasks) are executing, or are up for execution.
This is why Dag also implements Iterator.
An iterator lets us ask for the next item in line.
It's a bit of an abuse of the trait, because using this iterator in a for loop will block the entire thread until the graph is completely done.
But calling next whenever we want to know whether action is required on the Dag is handy.
Let's start from the top, namely with a topological sort (thanks, petgraph) to visit each node in the correct order.
It's already explained in the comment above the function, but let's write it out step by step.
The toposort is already in place, returning the index of each node in order.
We also create what we will later return, the stage, as well as the list of downstream tasks of the tasks in the current stage.
Additionally, I added a check at the very beginning of the loop that calls task_instance_is_done(), which returns true if the status is success or failure.
In that case, the node is already a-okay ✅.
Now we check whether the current task is already in the downstream vector.
If it is not, we can add it directly to our stage, because toposort returns the nodes in order: all of a task's parents have been visited before the task itself.
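A dependency-free sketch of this part of the loop, with hypothetical `Node` and `Status` types and the topological order passed in. Run conditions are left out, so a task listed as downstream simply waits. One detail this sketch makes explicit, since the prose leaves it open: every unfinished task, whether it joins the stage or waits, passes its own downstream tasks on. Otherwise, in a chain a → b → c where a is still running, c would not appear in the downstream vector and would be scheduled too early.

```rust
/// Hypothetical execution states of a task instance.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Pending,
    Running,
    Success,
    Failure,
}

/// Hypothetical node payload: only the fields this step reads.
pub struct Node {
    pub task_id: String,
    pub status: Status,
    pub downstream_tasks: Vec<String>,
}

/// A task is done once it reached a final state, successful or not.
fn task_instance_is_done(node: &Node) -> bool {
    matches!(node.status, Status::Success | Status::Failure)
}

/// Walks the nodes in topological order and collects the current stage:
/// every unfinished task that is not (transitively) downstream of an unfinished task.
pub fn current_stage(nodes: &[Node], topo_order: &[usize]) -> Vec<usize> {
    let mut stage = Vec::new();
    let mut downstream: Vec<String> = Vec::new();
    for &index in topo_order {
        let node = &nodes[index];
        if task_instance_is_done(node) {
            continue; // finished tasks neither run nor block their children
        }
        if !downstream.contains(&node.task_id) {
            stage.push(index);
        }
        // Children of any unfinished task must wait, whether it runs now or not.
        downstream.extend(node.downstream_tasks.iter().cloned());
    }
    stage
}
```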
What’s missing now is the logic to determine whether something should be added to the current stage, even if it is part of the downstream dependencies.
So let's match over the run_condition of our node.
The important conditions here are:
- None: execute without caring for dependencies
- OneDone: execute when one parent is done
- OneSuccess: execute when one parent has succeeded
- OneFailed: execute when one parent has failed
The reason behind this is that if all of a task's parents were done, the task would not have been listed in the downstream vector in the first place.
I implemented the logic for all possible cases in the match.
Rust's match is exhaustive, although I probably could have combined all the other conditions into a single _ => () arm.
But I decided it would be nice to have, and I will most likely extend the features of this code later.
For now, let's dive into the match, starting with None.
Simple enough: we push the current node to the stage and add its downstream_tasks to the downstream vector.
Now the arms for OneDone, OneSuccess and OneFailed.
I omitted the remaining conditions because, as I've mentioned, they are not really relevant yet.
Basically, the code for every arm is very similar.
We start off by iterating through the parents using the neighbors_directed() function with the Incoming direction.
This gives us an iterator over the indices of all nodes with edges pointing towards our current node: its parents.
Then we check whether a parent's execution status meets our run condition.
If so, we add the node to the stage and its downstream_tasks to the downstream vector.
Simple as that, we created the current execution stage. Finally, we return the result.
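Putting the loop and the run-condition match together gives roughly the following dependency-free sketch. Plain vectors replace petgraph, each hypothetical `Node` carries a precomputed `parents` list in place of `neighbors_directed(index, Incoming)`, and only the four conditions listed above are modelled. As a design choice, every unfinished task passes its downstream tasks on, so grandchildren of a running task stay blocked.

```rust
/// Hypothetical execution states of a task instance.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status { Pending, Running, Success, Failure }

/// Only the four run conditions discussed in the post.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunCondition { None, OneDone, OneSuccess, OneFailed }

/// Hypothetical node payload.
pub struct Node {
    pub task_id: String,
    pub status: Status,
    pub run_condition: RunCondition,
    pub downstream_tasks: Vec<String>,
    /// Stand-in for neighbors_directed(index, Incoming).
    pub parents: Vec<usize>,
}

fn is_done(status: Status) -> bool {
    matches!(status, Status::Success | Status::Failure)
}

/// Decides whether a task that is already listed as downstream may still
/// join the stage, based on its run condition and its parents' statuses.
fn runs_despite_dependencies(nodes: &[Node], node: &Node) -> bool {
    let mut parent_statuses = node.parents.iter().map(|&p| nodes[p].status);
    match node.run_condition {
        RunCondition::None => true,
        RunCondition::OneDone => parent_statuses.any(is_done),
        RunCondition::OneSuccess => parent_statuses.any(|s| s == Status::Success),
        RunCondition::OneFailed => parent_statuses.any(|s| s == Status::Failure),
    }
}

/// Current stage: unfinished tasks that are either not blocked by an
/// unfinished task or whose run condition lets them start anyway.
pub fn current_stage(nodes: &[Node], topo_order: &[usize]) -> Vec<usize> {
    let mut stage = Vec::new();
    let mut downstream: Vec<String> = Vec::new();
    for &index in topo_order {
        let node = &nodes[index];
        if is_done(node.status) {
            continue;
        }
        let blocked = downstream.contains(&node.task_id);
        if !blocked || runs_despite_dependencies(nodes, node) {
            stage.push(index);
        }
        downstream.extend(node.downstream_tasks.iter().cloned());
    }
    stage
}
```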
By returning None when the vector is empty, we signal the end of the iterator.
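A minimal sketch of the Iterator side, under simplifying assumptions: the hypothetical `Dag` stores only statuses and child lists, its nodes are assumed to already be in topological order, and run conditions are ignored. Because `next` recomputes the stage from the current statuses each time, the scheduler can call it after every status change, and it returns `None` once every task is done.

```rust
/// Hypothetical execution states of a task instance.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status { Pending, Running, Success, Failure }

/// Minimal stand-in Dag: one status and one child list per node, with the
/// nodes assumed to be stored in topological order.
pub struct Dag {
    statuses: Vec<Status>,
    children: Vec<Vec<usize>>,
}

impl Dag {
    pub fn new(statuses: Vec<Status>, children: Vec<Vec<usize>>) -> Self {
        Dag { statuses, children }
    }

    /// Called by the scheduler whenever a task changes state.
    pub fn set_status(&mut self, node: usize, status: Status) {
        self.statuses[node] = status;
    }
}

impl Iterator for Dag {
    /// The current execution stage: indices of tasks that are running or may start.
    type Item = Vec<usize>;

    fn next(&mut self) -> Option<Vec<usize>> {
        let mut blocked = vec![false; self.statuses.len()];
        let mut stage = Vec::new();
        for node in 0..self.statuses.len() {
            if matches!(self.statuses[node], Status::Success | Status::Failure) {
                continue; // done: neither runs nor blocks its children
            }
            if !blocked[node] {
                stage.push(node);
            }
            // Children of an unfinished task have to wait.
            for &child in &self.children[node] {
                blocked[child] = true;
            }
        }
        // An empty stage means every task is done: end the iteration.
        if stage.is_empty() { None } else { Some(stage) }
    }
}
```

While nothing changes, `next` keeps returning the same stage, which is why driving this iterator with a `for` loop would just spin on the current thread instead of making progress.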
Review
We achieved our goal of implementing a workflow graph that yields the tasks that are executing or up for execution. Since I'm new to Rust, I'm not sure if I've taken the best or fanciest path, so I'm open to feedback. I'm also new to writing these kinds of posts, so feedback on the format is also appreciated.
Now that we have a Dag we can ask for the next tasks in line, I will spend some more time on the scheduler. The next post will thus most likely be about the workings of the scheduler and how it uses this implementation of a Dag.
Also, I might come back to this implementation to add more features, refactor it based on more feedback and knowledge, and most definitely add some logging capabilities 📝.