Implement a struct which holds the workflow graph and yields the next values in line for execution.
When writing a workflow management engine, one has to decide on the data structure for the workflow. I went with the most common one: a directed acyclic graph, or DAG for short. I thought about using a more open type, i.e. a cyclic graph, but that has really weird implications for a retry system and for output handling. It would be a cool idea to explore, but it sounds like a pain to implement.
I have not yet written anything about OpenWorkflow, but I will be using its types here. Most of them should be self-explanatory, and I’m sure there will be posts about OpenWorkflow eventually. In the meantime, if you’re interested, check out the protobuf definition in the linked repository.
Implementing a DAG
Luckily for me, there is a very extensive graph library for Rust: petgraph. So first off, I need to create the graph that holds my task instances.
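For readers following along without the repository, here is a minimal, dependency-free sketch of such a graph. The post uses petgraph, presumably with something like DiGraph<TaskInstance, String> (the edge weight being a task_id); to keep this example runnable with only the standard library, it stores nodes in a Vec and edges as (from, to, weight) triples instead. TaskInstance and its fields are hypothetical stand-ins for the OpenWorkflow types.

```rust
/// Hypothetical stand-in for the OpenWorkflow task instance type.
#[derive(Debug, Clone)]
struct TaskInstance {
    task_id: String,
    downstream: Vec<String>,
}

/// Std-only stand-in for a petgraph DiGraph<TaskInstance, String>:
/// nodes live in a Vec (the index is the node id) and every edge is a
/// (from, to, weight) triple.
#[derive(Debug, Default)]
struct Dag {
    nodes: Vec<TaskInstance>,
    edges: Vec<(usize, usize, String)>,
}

impl Dag {
    /// Adds a node and returns its index, like petgraph's add_node.
    fn add_node(&mut self, task: TaskInstance) -> usize {
        self.nodes.push(task);
        self.nodes.len() - 1
    }

    /// Adds a directed edge from `from` to `to` carrying `weight`.
    fn add_edge(&mut self, from: usize, to: usize, weight: String) {
        self.edges.push((from, to, weight));
    }
}

fn main() {
    let mut dag = Dag::default();
    let extract = dag.add_node(TaskInstance {
        task_id: "extract".to_string(),
        downstream: vec!["load".to_string()],
    });
    let load = dag.add_node(TaskInstance {
        task_id: "load".to_string(),
        downstream: vec![],
    });
    dag.add_edge(extract, load, "extract".to_string());
    println!("{} nodes, {} edges", dag.nodes.len(), dag.edges.len());
}
```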
Then I need a way to fill my Dag with a list of tasks parsed from a protobuf message.
Dag implements the
FlowtyError uses SNAFU to define Flowty’s errors.
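SNAFU derives the Display and std::error::Error implementations (plus context selectors) from an annotated enum. As a rough picture of what that saves, this sketch writes those impls by hand for two hypothetical variants; the actual FlowtyError variants are not shown in the post.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error variants; the real FlowtyError may look different.
#[derive(Debug, PartialEq)]
enum FlowtyError {
    /// The task graph contains a cycle involving this task.
    CycleDetected { task_id: String },
    /// A task's duration could not be converted (e.g. it was negative).
    InvalidDuration { task_id: String },
}

// With SNAFU, this impl would come from #[snafu(display(...))] attributes.
impl fmt::Display for FlowtyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FlowtyError::CycleDetected { task_id } => {
                write!(f, "cycle detected at task {task_id}")
            }
            FlowtyError::InvalidDuration { task_id } => {
                write!(f, "invalid duration for task {task_id}")
            }
        }
    }
}

// Display + Debug are all that std::error::Error requires.
impl Error for FlowtyError {}

fn main() {
    let err: Box<dyn Error> = Box::new(FlowtyError::InvalidDuration {
        task_id: "extract".to_string(),
    });
    println!("error: {err}");
}
```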
Now that that’s out of the way, let’s populate the graph. First, we simply create it and iterate over our list of tasks.
Converting a prost_types::Duration into a chrono::Duration is a bit of a pain, since it has to go through std::time::Duration first, but it is what it is.
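A sketch of the first hop of that conversion, assuming a stand-in struct with the same fields as prost_types::Duration (seconds: i64, nanos: i32) so it runs without prost. A negative or out-of-range value cannot be represented by std::time::Duration, so the sketch returns None for it. With chrono, the second hop would be chrono::Duration::from_std, which is fallible as well.

```rust
use std::time::Duration;

/// Hypothetical stand-in with the same fields as prost_types::Duration.
#[derive(Debug, Clone, Copy)]
struct ProtoDuration {
    seconds: i64,
    nanos: i32,
}

/// First hop: protobuf duration -> std::time::Duration.
/// Returns None for negative values or nanos outside 0..1_000_000_000,
/// since std::time::Duration cannot represent them.
fn to_std(d: ProtoDuration) -> Option<Duration> {
    let secs = u64::try_from(d.seconds).ok()?;
    let nanos = u32::try_from(d.nanos).ok()?;
    if nanos >= 1_000_000_000 {
        return None;
    }
    Some(Duration::new(secs, nanos))
}

fn main() {
    let timeout = ProtoDuration { seconds: 90, nanos: 500_000_000 };
    println!("{:?}", to_std(timeout));
    // Negative durations are rejected.
    println!("{:?}", to_std(ProtoDuration { seconds: -5, nanos: 0 }));
}
```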
Now we have a graph full of nodes, but no edges.
To change that, we iterate through all node indices and check their downstream dependencies.
Whenever a downstream task matches a node in our graph, we add an edge carrying the task_id.
For now, any typos and the like in that list of downstream tasks are simply ignored.
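The edge-building step could look like this dependency-free sketch. It maps every task_id to its node index once, then adds an edge (from, to, upstream task_id) for each downstream id that matches a node; unknown ids, such as typos, are skipped, as described above. The lookup table and the names build_edges and TaskInstance are illustrative assumptions, not the post’s code; with petgraph, each tuple would become a graph.add_edge call.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the OpenWorkflow task type.
#[derive(Debug, Clone)]
struct TaskInstance {
    task_id: String,
    downstream: Vec<String>,
}

/// Returns (from, to, weight) edges, where weight is the upstream task_id.
/// Downstream ids that match no node are silently ignored.
fn build_edges(nodes: &[TaskInstance]) -> Vec<(usize, usize, String)> {
    // Map task_id -> node index for constant-time lookups.
    let index: HashMap<&str, usize> = nodes
        .iter()
        .enumerate()
        .map(|(i, t)| (t.task_id.as_str(), i))
        .collect();

    let mut edges = Vec::new();
    for (from, task) in nodes.iter().enumerate() {
        for down in &task.downstream {
            if let Some(&to) = index.get(down.as_str()) {
                edges.push((from, to, task.task_id.clone()));
            }
        }
    }
    edges
}

fn main() {
    let nodes = vec![
        TaskInstance {
            task_id: "extract".to_string(),
            // "lod" is a typo and produces no edge.
            downstream: vec!["load".to_string(), "lod".to_string()],
        },
        TaskInstance { task_id: "load".to_string(), downstream: vec![] },
    ];
    println!("{:?}", build_edges(&nodes));
}
```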
Now we have a directed graph, since that is the
Before returning, let’s check that it’s acyclic.
Nice! Petgraph comes with this functionality built in. Now our Dag is complete.
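In petgraph, this check is available through petgraph::algo::toposort, which returns an Err(Cycle) when the graph has a cycle, and through petgraph::algo::is_cyclic_directed. To show what such a check does, here is a std-only sketch using Kahn’s algorithm (not necessarily the algorithm petgraph uses internally): repeatedly take nodes without remaining incoming edges; if some nodes are never taken, the graph is cyclic.

```rust
use std::collections::VecDeque;

/// Kahn's algorithm over `n` nodes and (from, to) edges.
/// Returns Some(topological order) for a DAG and None if there is a cycle.
fn toposort(n: usize, edges: &[(usize, usize)]) -> Option<Vec<usize>> {
    let mut indegree = vec![0usize; n];
    let mut children = vec![Vec::new(); n];
    for &(from, to) in edges {
        indegree[to] += 1;
        children[from].push(to);
    }
    // Start with every node that has no parents.
    let mut queue: VecDeque<usize> = (0..n).filter(|&i| indegree[i] == 0).collect();
    let mut order = Vec::with_capacity(n);
    while let Some(node) = queue.pop_front() {
        order.push(node);
        for &child in &children[node] {
            indegree[child] -= 1;
            if indegree[child] == 0 {
                queue.push_back(child);
            }
        }
    }
    // Leftover nodes lie on a cycle or downstream of one.
    if order.len() == n {
        Some(order)
    } else {
        None
    }
}

fn main() {
    println!("{:?}", toposort(3, &[(0, 1), (1, 2)])); // Some([0, 1, 2])
    println!("{:?}", toposort(2, &[(0, 1), (1, 0)])); // None
}
```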
Traversing (or, getting the current execution stage)
So now comes the part you’ve been waiting for: Traversing. Or rather, getting the current execution stage.
You see, it’s not so much that I want to drain my graph as I pass its nodes, nor that I want to get to the end of it immediately.
Instead, I want to know which nodes (tasks) are executing, or are up for execution.
Which is why
An iterator lets you ask for the next item in line.
It’s a bit of an abuse of the feature, because using this iterator in a for loop will block the entire thread until the graph is completely done.
Still, calling next whenever we want to know whether action is required on the Dag is handy.
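A minimal sketch of that iterator pattern, with hypothetical Task and Status types and a deliberately simplified stage rule (a task is in the stage if it is not done and all of its parents are done) in place of the run-condition logic described below. The point is the shape: next() returns Some(stage) while work remains and None once everything is done, and the caller polls it once per scheduler tick rather than draining it in a for loop.

```rust
/// Hypothetical task status; only Success and Failure count as done.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    Running,
    Success,
    Failure,
}

/// Hypothetical task: its id, the indices of its parents, and its status.
struct Task {
    task_id: &'static str,
    parents: Vec<usize>,
    status: Status,
}

struct Dag {
    tasks: Vec<Task>,
}

impl Dag {
    fn is_done(&self, i: usize) -> bool {
        matches!(self.tasks[i].status, Status::Success | Status::Failure)
    }
}

impl Iterator for Dag {
    /// The current stage: ids of tasks that are running or ready to run.
    type Item = Vec<&'static str>;

    fn next(&mut self) -> Option<Self::Item> {
        // Simplified rule for this sketch: not done yet, all parents done.
        let stage: Vec<&'static str> = (0..self.tasks.len())
            .filter(|&i| !self.is_done(i))
            .filter(|&i| self.tasks[i].parents.iter().all(|&p| self.is_done(p)))
            .map(|i| self.tasks[i].task_id)
            .collect();
        // An empty stage means every task is done: end the iterator.
        if stage.is_empty() {
            None
        } else {
            Some(stage)
        }
    }
}

fn main() {
    let mut dag = Dag {
        tasks: vec![
            Task { task_id: "extract", parents: vec![], status: Status::Pending },
            Task { task_id: "load", parents: vec![0], status: Status::Pending },
        ],
    };
    // Each call to next() stands in for one scheduler tick.
    while let Some(stage) = dag.next() {
        println!("stage: {stage:?}");
        // Pretend every task in the stage finished successfully.
        for task in dag.tasks.iter_mut() {
            if stage.contains(&task.task_id) {
                task.status = Status::Success;
            }
        }
    }
}
```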
Let’s start from the top, namely with a topological sort (thanks, petgraph) to visit each node in the correct order.
It’s already explained in the comment above the function, but let’s write it out step-by-step.
First comes toposort, which returns the index of each node in topological order.
We also create the two things we fill during the loop: the stage, which we will return later, and the list of downstream tasks of the tasks in the current stage.
Additionally, I added a check at the very beginning of the loop that calls task_instance_is_done(), which returns true if the status is either success or failure.
In that case, the node is already a-okay✅ and needs no further action.
Now we check whether the current task is already in the downstream vector.
If it is not, we can add it directly to our stage, because toposort returns the nodes in order: every parent of a task has been visited before the task itself.
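The steps so far can be sketched without petgraph as follows: order is assumed to be a precomputed topological order (what toposort would return), and each hypothetical Task stores the indices of its children. One deviation is my own assumption: the children of a blocked task are also pushed onto the downstream vector, so that the grandchildren of an unfinished task cannot slip into the stage. Run-condition handling comes next and is left out here.

```rust
/// Hypothetical task status; Success and Failure count as done.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    Running,
    Success,
    Failure,
}

/// Hypothetical task with the indices of its direct children.
struct Task {
    status: Status,
    downstream_tasks: Vec<usize>,
}

fn task_instance_is_done(task: &Task) -> bool {
    matches!(task.status, Status::Success | Status::Failure)
}

/// Collects the current stage. `order` stands in for the output of
/// petgraph's toposort: every parent appears before its children.
fn current_stage(tasks: &[Task], order: &[usize]) -> Vec<usize> {
    let mut stage = Vec::new();
    // Children of unfinished tasks; these have to wait for their parents.
    let mut downstream: Vec<usize> = Vec::new();
    for &node in order {
        let task = &tasks[node];
        // Finished tasks need no action.
        if task_instance_is_done(task) {
            continue;
        }
        if !downstream.contains(&node) {
            // No unfinished parent: the task is running or ready to run.
            stage.push(node);
        }
        // Either way, this task's children have to wait for it.
        downstream.extend(&task.downstream_tasks);
    }
    stage
}

fn main() {
    // a -> b -> c, plus an independent task d
    let mut tasks = vec![
        Task { status: Status::Pending, downstream_tasks: vec![1] },
        Task { status: Status::Pending, downstream_tasks: vec![2] },
        Task { status: Status::Pending, downstream_tasks: vec![] },
        Task { status: Status::Pending, downstream_tasks: vec![] },
    ];
    let order = [0, 1, 2, 3];
    println!("{:?}", current_stage(&tasks, &order)); // [0, 3]
    tasks[0].status = Status::Success;
    println!("{:?}", current_stage(&tasks, &order)); // [1, 3]
}
```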
What’s missing now is the logic that decides whether a task should be added to the current stage even though it is part of the downstream dependencies.
For that, we match over the run_condition of our node.
The important conditions here are:
None, execute without caring about dependencies
OneDone, execute when one parent is done
OneSuccess, execute when one parent has succeeded
OneFailed, execute when one parent has failed
The reason is that a task only ends up in the downstream vector while one of its parents is still part of the current stage; if all parents were done, it would not have been listed there in the first place.
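The four run conditions listed above can be checked against the statuses of a task’s parents like this. RunCondition here holds only those four variants (the post’s match also covers further conditions), and Status is a hypothetical stand-in in which Success and Failure count as done.

```rust
/// Hypothetical task status.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    Running,
    Success,
    Failure,
}

/// Only the four conditions discussed above; the real enum has more.
#[derive(Debug, Clone, Copy)]
enum RunCondition {
    None,
    OneDone,
    OneSuccess,
    OneFailed,
}

/// Decides whether a task in the downstream vector may run anyway,
/// given the statuses of its parents.
fn condition_met(condition: RunCondition, parents: &[Status]) -> bool {
    match condition {
        // Run regardless of the parents.
        RunCondition::None => true,
        RunCondition::OneDone => parents
            .iter()
            .any(|s| matches!(s, Status::Success | Status::Failure)),
        RunCondition::OneSuccess => parents.contains(&Status::Success),
        RunCondition::OneFailed => parents.contains(&Status::Failure),
    }
}

fn main() {
    let parents = [Status::Running, Status::Failure];
    for condition in [
        RunCondition::None,
        RunCondition::OneDone,
        RunCondition::OneSuccess,
        RunCondition::OneFailed,
    ] {
        println!("{condition:?}: {}", condition_met(condition, &parents));
    }
}
```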
I implemented the logic for all possible cases in the match.
Rust’s match is exhaustive, so every variant needs an arm, although I could probably have covered the unused ones with a single _ => () arm.
But I decided to keep them, since I will most likely extend this code later.
For now, let’s dive into the match, starting with the arm for None.
Simple enough: we push the current node to the stage and add its downstream_tasks to the downstream vector.
Now on to the arms for the conditions that depend on the parents.
I left out the remaining conditions because, as I’ve mentioned, they are not really relevant yet.
The code for every arm is basically very similar.
We start off by iterating through the parents using the neighbors_directed() function with the Incoming direction.
This gives us an iterator over the indices of all nodes with edges pointing towards our current node: the parents.
Then we check whether a parent’s execution status meets our run condition.
If it does, we add the node to the stage and its downstream_tasks to the downstream vector.
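A std-only sketch of one of these arms, using OneSuccess as the example. The parents helper plays the role of petgraph’s neighbors_directed(node, Direction::Incoming) by yielding the source of every edge that ends at node; edges are plain (from, to) pairs, and the helper names are hypothetical.

```rust
/// Hypothetical task status.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    Running,
    Success,
    Failure,
}

/// Stand-in for neighbors_directed(node, Incoming): the sources of all
/// edges that point at `node`, i.e. its parents.
fn parents(edges: &[(usize, usize)], node: usize) -> impl Iterator<Item = usize> + '_ {
    edges
        .iter()
        .filter(move |&&(_, to)| to == node)
        .map(|&(from, _)| from)
}

/// The OneSuccess arm: if any parent succeeded, the node joins the stage
/// and its children are added to the downstream vector.
fn one_success_arm(
    node: usize,
    statuses: &[Status],
    edges: &[(usize, usize)],
    stage: &mut Vec<usize>,
    downstream: &mut Vec<usize>,
) {
    if parents(edges, node).any(|p| statuses[p] == Status::Success) {
        stage.push(node);
        // Children of `node` are the targets of its outgoing edges.
        downstream.extend(
            edges
                .iter()
                .filter(|&&(from, _)| from == node)
                .map(|&(_, to)| to),
        );
    }
}

fn main() {
    // 0 -> 2 <- 1, and 2 -> 3
    let edges = [(0, 2), (1, 2), (2, 3)];
    let statuses = [Status::Failure, Status::Success, Status::Pending, Status::Pending];
    let (mut stage, mut downstream) = (Vec::new(), Vec::new());
    one_success_arm(2, &statuses, &edges, &mut stage, &mut downstream);
    println!("stage: {stage:?}, downstream: {downstream:?}"); // stage: [2], downstream: [3]
}
```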
Simple as that, we have created the current execution stage. Finally, we return the result.
By returning None when the vector is empty, we signal the end of the iterator.
We achieved our goal of implementing a workflow graph that yields the tasks that are executing or up for execution. Since I’m new to Rust, I’m not sure whether I’ve taken the best or fanciest path, so I’m open to feedback. I’m also new to writing these kinds of posts, so feedback on the format is appreciated as well.
Now that we have a Dag which we can ask for the next tasks in line, I will spend some more time on the scheduler. The next post will therefore most likely be about the workings of the scheduler and how it uses this implementation of a Dag.
I might also come back to this implementation to add more features, refactor it with more feedback and knowledge, and most definitely add some logging capabilities📝.