Scheduler
Overview
A Scheduler is used to generate the order in which the nodes of a graph are executed. By default, a Scheduler executes nodes in an order determined by the pattern of edges among the nodes in the graph, with each node executed once per PASS through the graph. For example, in a graph in which a node A projects to a node B that projects to a node C, A will execute first, followed by B, and then C in each PASS through the graph. However, a Scheduler can be used to implement more complex patterns of execution by specifying Conditions that determine when and how many times individual nodes execute, and whether and how this depends on the execution of other nodes. Any executable node in a graph can be assigned a Condition, and Conditions can be combined in arbitrary ways to generate any pattern of execution of the nodes in a graph that is logically possible.
Creating a Scheduler
When creating a Scheduler explicitly, the set of nodes to be executed and their order must be specified in the Scheduler’s constructor using one of the following:
a graph specification dictionary in the graph argument - each key of the dictionary must be a node of the graph, and the value of each entry must be a set of zero or more nodes that project directly to the key. The graph must be acyclic; an error is generated if any cycles (e.g., recurrent dependencies) are detected. The Scheduler computes a toposort from the graph that is used as the default order of execution, subject to any Conditions that have been specified (see below).
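The graph specification dictionary maps each node to the set of nodes that project to it, which is the same shape accepted by the standard-library graphlib.TopologicalSorter (Python 3.9+). The sketch below only illustrates the acyclicity requirement with the standard library; it is not how the Scheduler validates its graph, and the helper name check_graph_spec is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def check_graph_spec(graph):
    """Return one valid execution order, or raise ValueError on a cycle.

    `graph` maps each node to the set of nodes that project to it.
    (Hypothetical helper, not part of graph_scheduler.)
    """
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as err:
        # err.args[1] holds the nodes that form the detected cycle
        raise ValueError(f"graph contains a cycle: {err.args[1]}") from err


# A -> B -> C, written as "node: set of senders"
linear = {'A': set(), 'B': {'A'}, 'C': {'B'}}
order = check_graph_spec(linear)

# A and B project to each other, so no valid order exists
recurrent = {'A': {'B'}, 'B': {'A'}}
try:
    check_graph_spec(recurrent)
    cycle_detected = False
except ValueError:
    cycle_detected = True
```

For the linear graph the only valid order is A, B, C; the recurrent graph is rejected.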
Conditions can be added to a Scheduler when it is created by specifying a ConditionSet (a set of Conditions) in the conditions argument of its constructor. Individual Conditions and/or ConditionSets can also be added after the Scheduler has been created, using its add_condition and add_condition_set methods, respectively.
Graph structure Conditions are applied to the Scheduler’s graph in the order in which they are added.
Algorithm
When a Scheduler is created, it constructs a consideration_queue: a list of consideration sets that defines the order in which nodes are eligible to be executed. This is determined by the topological ordering of the graph provided to the Scheduler's constructor, which is then modified by any graph structure conditions that are added to the Scheduler. Each consideration_set is a set of nodes that are eligible to execute at the same time/CONSIDERATION_SET_EXECUTION (i.e., that appear at the same “depth” in a sequence of dependencies, and among which there are no dependencies). The first consideration_set consists of only origin nodes. The second consists of all nodes that receive edges from the nodes in the first consideration_set. The third consists of nodes that receive edges from nodes in the first two consideration sets, and so forth. When the Scheduler is run, it uses the consideration_queue to determine which nodes are eligible to execute in each CONSIDERATION_SET_EXECUTION of a PASS, and then evaluates the Condition associated with each node in the current consideration_set to determine which should actually be assigned for execution.
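The grouping of nodes into consideration sets by dependency depth can be sketched with the standard library alone. This is a minimal illustration, assuming Python 3.9+ for graphlib; build_consideration_queue is a hypothetical name and the Scheduler's own implementation may differ.

```python
from graphlib import TopologicalSorter


def build_consideration_queue(graph):
    """Group nodes into sets by dependency depth.

    `graph` maps each node to the set of nodes that project to it.
    (Hypothetical helper, not part of graph_scheduler.)
    """
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    queue = []
    while sorter.is_active():
        # All nodes whose senders were already placed in earlier sets
        ready = set(sorter.get_ready())
        queue.append(ready)
        sorter.done(*ready)
    return queue


# A and B are origin nodes, both project to C, and C projects to D
graph = {'A': set(), 'B': set(), 'C': {'A', 'B'}, 'D': {'C'}}
queue = build_consideration_queue(graph)
```

Here A and B share the first consideration set because neither depends on the other, while C and D each occupy their own later set.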
Pseudocode:
    consideration_queue <- list(toposort(graph))

    reset TimeScale.ENVIRONMENT_STATE_UPDATE counters
    while TimeScale.ENVIRONMENT_STATE_UPDATE termination conditions are not satisfied
    and TimeScale.ENVIRONMENT_SEQUENCE termination conditions are not satisfied:
        reset TimeScale.PASS counters
        cur_index <- 0

        while TimeScale.ENVIRONMENT_STATE_UPDATE termination conditions are not satisfied
        and TimeScale.ENVIRONMENT_SEQUENCE termination conditions are not satisfied
        and cur_index < len(consideration_queue):

            cur_consideration_set <- consideration_queue[cur_index]

            do:
                cur_consideration_set_has_changed <- False
                for cur_node in cur_consideration_set:
                    if cur_node not in cur_consideration_set_execution
                    and cur_node's Condition is satisfied:

                        cur_consideration_set_has_changed <- True
                        add cur_node to cur_consideration_set_execution
                        increment execution and time counters
            while cur_consideration_set_has_changed

            if cur_consideration_set_execution is not empty or absolute time conditions are used:
                yield cur_consideration_set_execution

            increment cur_index
            increment time counters

        if all execution sets yielded were empty:
            yield an empty execution set
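The pseudocode above can be turned into a small runnable generator. The sketch below is a simplified illustration, not the library's implementation: it covers a single ENVIRONMENT_STATE_UPDATE with an AllHaveRun-style termination, ignores absolute time and the empty-execution-set case, and models Conditions as plain callables. All names (build_queue, every_n_calls, default_condition, run_state_update) are hypothetical.

```python
from graphlib import TopologicalSorter


def build_queue(graph):
    """List of consideration sets, grouped by dependency depth."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    queue = []
    while sorter.is_active():
        ready = set(sorter.get_ready())
        queue.append(ready)
        sorter.done(*ready)
    return queue


def every_n_calls(dependency, n):
    """Toy condition: `dependency` ran n times since the owner last ran."""
    return lambda since_owner_ran: since_owner_ran[dependency] >= n


def default_condition(parents):
    """Toy default: every parent ran at least once since the owner last ran."""
    return lambda since_owner_ran: all(since_owner_ran[p] >= 1 for p in parents)


def run_state_update(graph, conditions, max_passes=1000):
    """Yield execution sets until every node has run at least once."""
    queue = build_queue(graph)
    total_calls = {node: 0 for node in graph}
    # since[owner][other]: how often `other` ran since `owner` last ran
    since = {owner: {other: 0 for other in graph} for owner in graph}

    for _ in range(max_passes):                  # one PASS per iteration
        for consideration_set in queue:
            if all(total_calls.values()):        # termination check
                return
            execution_set = set()
            changed = True
            while changed:                       # the do/while of the pseudocode
                changed = False
                for node in sorted(consideration_set):
                    condition = conditions.get(node) or default_condition(graph[node])
                    if node not in execution_set and condition(since[node]):
                        changed = True
                        execution_set.add(node)
                        total_calls[node] += 1
                        for owner in graph:
                            since[owner][node] += 1
                        since[node] = {other: 0 for other in graph}
            if execution_set:
                yield execution_set


graph = {'A': set(), 'B': {'A'}, 'C': {'B'}}
conditions = {'B': every_n_calls('A', 2), 'C': every_n_calls('B', 3)}
sequence = list(run_state_update(graph, conditions))
```

With these toy conditions, the sketch produces the same sequence as the "basic phasing in a linear process" example in the Examples section: A runs on every pass, B after every second run of A, and C after the third run of B.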
Execution
Note
This section covers normal scheduler execution (Scheduler.mode = SchedulingMode.STANDARD). See Exact Time Mode below for a description of exact time mode.
When a Scheduler is run, it provides a set of nodes that should be run next, based on their dependencies in the graph specification and any Conditions specified in the Scheduler’s constructor. For each call to the run method, the Scheduler sequentially evaluates its consideration sets in their order in the consideration_queue. For each set, it determines which nodes in the set are allowed to execute, based on whether their associated Condition has been met. Any node that does not have a Condition explicitly specified is assigned a Condition that causes it to be executed whenever it is under consideration and all its structural parents have been executed at least once since the node’s last execution. All of the nodes within a consideration_set that are allowed to execute comprise a CONSIDERATION_SET_EXECUTION. These nodes are considered to execute simultaneously.
Note
The ordering of the nodes specified within a CONSIDERATION_SET_EXECUTION is arbitrary (and is irrelevant, as there are no graph dependencies among nodes within the same consideration_set). However, the execution of a node within a CONSIDERATION_SET_EXECUTION may trigger the execution of another node within its consideration_set, as in the example below:
    C
  ↗   ↖
A       B
scheduler.add_condition(B, graph_scheduler.EveryNCalls(A, 2))
scheduler.add_condition(C, graph_scheduler.EveryNCalls(B, 1))
execution sets: [{A}, {A, B}, {C}, ...]
Since there are no graph dependencies between A and B, they may execute in the same CONSIDERATION_SET_EXECUTION. Moreover, A and B are in the same consideration_set. Since B is specified to run every two times A runs, A’s second execution in the second CONSIDERATION_SET_EXECUTION allows B to run within that CONSIDERATION_SET_EXECUTION, rather than waiting for the next PASS.
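The way one node's execution can enable another node in the same consideration set can be sketched as a repeat-until-stable loop. This is a toy model under stated assumptions: conditions are plain callables over call counts, and evaluate_consideration_set is a hypothetical helper rather than a library function.

```python
def evaluate_consideration_set(visit_order, conditions, calls):
    """Return the nodes of one consideration set that execute together.

    `visit_order` fixes the (otherwise arbitrary) order in which nodes are
    checked. `calls` counts how often each node has run and is updated in place.
    """
    execution_set = set()
    changed = True
    while changed:
        # Re-check the whole set whenever a node was added, because that
        # node's execution may have satisfied another node's condition.
        changed = False
        for node in visit_order:
            if node not in execution_set and conditions[node](calls):
                execution_set.add(node)
                calls[node] += 1
                changed = True
    return execution_set


calls = {'A': 0, 'B': 0}
conditions = {
    'A': lambda calls: True,
    # Toy stand-in for "B runs once for every two runs of A"
    'B': lambda calls: calls['A'] >= 2 * (calls['B'] + 1),
}

# B is deliberately visited before A: the result does not depend on the order
first = evaluate_consideration_set(['B', 'A'], conditions, calls)
second = evaluate_consideration_set(['B', 'A'], conditions, calls)
```

On the second evaluation B is initially not ready, A runs for the second time, and the repeated check then admits B into the same execution set.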
For each CONSIDERATION_SET_EXECUTION, the Scheduler evaluates whether any specified termination Conditions have been met, and terminates if so. Otherwise, it returns the set of nodes that should be executed in the current CONSIDERATION_SET_EXECUTION. Each subsequent call to the run method returns the set of nodes in the following CONSIDERATION_SET_EXECUTION.
Processing of all of the consideration_sets in the consideration_queue constitutes a PASS of execution, over which every node in the graph has been considered for execution. Subsequent calls to the run method cycle back through the consideration_queue, evaluating the consideration_sets in the same order as previously. Different subsets of nodes within the same consideration_set may be assigned to execute on each PASS, since different Conditions may be satisfied.
The Scheduler continues to make PASSes through the consideration_queue until a termination Condition is satisfied. If no termination Conditions are specified, by default the Scheduler terminates an ENVIRONMENT_STATE_UPDATE when every node has been specified for execution at least once (corresponding to the AllHaveRun Condition). However, other termination Conditions can be specified that may cause the Scheduler to terminate an ENVIRONMENT_STATE_UPDATE earlier or later (e.g., when the Condition for a particular node or set of nodes is met).
Termination Conditions
Termination conditions are basic Conditions that specify when the open-ended units of time - ENVIRONMENT_STATE_UPDATE and ENVIRONMENT_SEQUENCE - have ended. By default, the termination condition for an ENVIRONMENT_STATE_UPDATE is AllHaveRun, which is satisfied when all nodes have run at least once within the environment state update, and the termination condition for an ENVIRONMENT_SEQUENCE is when all of its constituent environment state updates have terminated. Graph structure conditions cannot be used as termination conditions.
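To make the effect of different termination conditions concrete, the sketch below replays a fixed sequence of execution sets and reports after how many of them a given predicate would end the ENVIRONMENT_STATE_UPDATE. The predicates all_have_run and after_n_calls are toy stand-ins written for this illustration, not the library's AllHaveRun and AfterNCalls classes, and steps_until is a hypothetical helper.

```python
from collections import Counter


def all_have_run(calls):
    """Toy predicate: every node has run at least once."""
    return all(count >= 1 for count in calls.values())


def after_n_calls(node, n):
    """Toy predicate: `node` has run at least n times."""
    return lambda calls: calls[node] >= n


def steps_until(history, nodes, termination):
    """Number of execution sets consumed before `termination` holds, or None."""
    calls = Counter({node: 0 for node in nodes})
    for step, execution_set in enumerate(history, start=1):
        calls.update(execution_set)  # each node in the set ran once more
        if termination(calls):
            return step
    return None


# Execution sets in the order A, A, B repeated three times, then C
history = [{'A'}, {'A'}, {'B'}, {'A'}, {'A'}, {'B'}, {'A'}, {'A'}, {'B'}, {'C'}]
nodes = ['A', 'B', 'C']

default_steps = steps_until(history, nodes, all_have_run)
early_steps = steps_until(history, nodes, after_n_calls('B', 2))
```

The default-style predicate needs all ten execution sets because C runs last, whereas terminating after B's second call stops after six.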
Absolute Time
The scheduler supports scheduling of models of real-time systems in two modes, both of which involve mapping real-time values to Time. The default mode is the most compatible with standard scheduling, but can cause some unexpected behavior in certain cases because it is inexact. The consideration queue remains intact, but as a result, actions specified by fixed times of absolute-time-based conditions (start and end of TimeInterval, and t of TimeTermination) may not occur at exactly the time specified. The simplest example of this situation involves a linear graph with two nodes:
>>> import graph_scheduler
>>> graph = {'A': set(), 'B': {'A'}}
>>> scheduler = graph_scheduler.Scheduler(graph=graph)
>>> scheduler.add_condition('A', graph_scheduler.TimeInterval(start=10))
>>> scheduler.add_condition('B', graph_scheduler.TimeInterval(start=10))
In standard mode, A and B are in different consideration sets, and so can never execute at the same time. At most one of A and B will start exactly at t=10ms, with the other starting at its next consideration afterward. Many similar cases exist, and while some may be solvable, the general problem is not simple. Exact Time Mode therefore exists as an alternative for these cases, though it comes with its own drawbacks.
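The timing offset described above can be reproduced with a toy model. The sketch assumes that exactly one consideration set is evaluated per time step and that each step lasts one time unit (1 ms); it does not call the library, and first_start_times is a hypothetical helper.

```python
def first_start_times(consideration_queue, start_time, horizon=100):
    """Toy model: one consideration set is evaluated per time step.

    `start_time` maps each node to the earliest time at which it may run.
    Returns the time step at which each node first executes.
    """
    started = {}
    for t in range(horizon):
        # Cycle through the queue, one consideration set per time step
        current = consideration_queue[t % len(consideration_queue)]
        for node in current:
            if node not in started and t >= start_time[node]:
                started[node] = t
    return started


start_time = {'A': 10, 'B': 10}

# Standard-style queue: A and B sit in different consideration sets
standard = first_start_times([{'A'}, {'B'}], start_time)

# Exact-time-style queue: all nodes share a single consideration set
exact = first_start_times([{'A', 'B'}], start_time)
```

In this model A starts at t=10 but B is not considered again until t=11, while placing both nodes in one consideration set lets both start at t=10.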
Note
Due to issues with floating-point precision, absolute time values in conditions and Time are limited to 8 decimal places. If more precision is needed, use fractions, where possible, or smaller units (e.g. microseconds instead of milliseconds).
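The floating-point issue behind this limit is easy to see in plain Python. The sketch below uses only the standard library and illustrates the arithmetic; it makes no claim about which numeric types the Scheduler's conditions accept.

```python
from fractions import Fraction

# 0.1 ms + 0.2 ms does not equal 0.3 ms exactly in binary floating point
float_sum = 0.1 + 0.2
exact_match = float_sum == 0.3

# Rounding both sides to 8 decimal places hides the representation error
rounded_match = round(float_sum, 8) == round(0.3, 8)

# Exact rational arithmetic avoids the error altogether
fraction_match = Fraction(1, 10) + Fraction(2, 10) == Fraction(3, 10)

# Switching to a smaller unit (microseconds) keeps the values integral
microsecond_match = 100 + 200 == 300
```

Only the direct float comparison fails; the rounded, fractional, and integer-unit comparisons all hold.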
Exact Time Mode
When Scheduler.mode is SchedulingMode.EXACT_TIME, the scheduler is capable of handling examples like the one above. In this mode, all nodes in the scheduler’s graph become members of the same consideration set, and may be executed at the same time for every consideration set execution, subject to the conditions specified. As a result, the guarantees of standard scheduling may not apply - that is, that all parent nodes get a chance to execute before their children, and that there exist no data dependencies (edges) between nodes in the same execution set. In exact time mode, all nodes will be in one [unordered] execution set. An ordering may be inferred from the original graph, however, using the indices in the original consideration queue.
Additionally, non-absolute conditions like EveryNCalls may behave unexpectedly in some cases.
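Inferring an order for an unordered execution set from the original consideration queue can be sketched as a sort keyed on each node's queue position. The data below is hand-written for illustration; on a real Scheduler the corresponding mapping is documented as consideration_queue_indices in the Class Reference.

```python
# Hypothetical original consideration queue for the graph A -> B -> C
original_queue = [{'A'}, {'B'}, {'C'}]

# Map every node to the index of the consideration set that contains it
indices = {
    node: position
    for position, consideration_set in enumerate(original_queue)
    for node in consideration_set
}

# An unordered execution set, as produced in exact time mode
execution_set = {'C', 'A', 'B'}

# Senders sort before receivers; ties are broken by name for determinism
ordered = sorted(execution_set, key=lambda node: (indices[node], str(node)))
```

Nodes that share a queue position have no dependency between them, so any tie-break among them is acceptable.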
Examples
Please see Condition for a list of all supported Conditions and their behavior.
Basic phasing in a linear process:
>>> import graph_scheduler
>>> graph = {'A': set(), 'B': {'A'}, 'C': {'B'}}
>>> scheduler = graph_scheduler.Scheduler(graph=graph)
>>> # implicit condition of Always for A
>>> scheduler.add_condition('B', graph_scheduler.EveryNCalls('A', 2))
>>> scheduler.add_condition('C', graph_scheduler.EveryNCalls('B', 3))
>>> # implicit AllHaveRun Termination condition
>>> execution_sequence = list(scheduler.run())
>>> execution_sequence
[{'A'}, {'A'}, {'B'}, {'A'}, {'A'}, {'B'}, {'A'}, {'A'}, {'B'}, {'C'}]
Alternate basic phasing in a linear process:
>>> graph = {'A': set(), 'B': {'A'}}
>>> scheduler = graph_scheduler.Scheduler(graph=graph)
>>> scheduler.add_condition(
...     'A',
...     graph_scheduler.Any(
...         graph_scheduler.AtPass(0),
...         graph_scheduler.EveryNCalls('B', 2)
...     )
... )
>>> scheduler.add_condition(
...     'B',
...     graph_scheduler.Any(
...         graph_scheduler.EveryNCalls('A', 1),
...         graph_scheduler.EveryNCalls('B', 1)
...     )
... )
>>> termination_conds = {
...     graph_scheduler.TimeScale.ENVIRONMENT_STATE_UPDATE: graph_scheduler.AfterNCalls('B', 4, time_scale=graph_scheduler.TimeScale.ENVIRONMENT_STATE_UPDATE)
... }
>>> execution_sequence = list(scheduler.run(termination_conds=termination_conds))
>>> execution_sequence
[{'A'}, {'B'}, {'B'}, {'A'}, {'B'}, {'B'}]
Basic phasing in two processes:
>>> graph = {'A': set(), 'B': set(), 'C': {'A', 'B'}}
>>> scheduler = graph_scheduler.Scheduler(graph=graph)
>>> scheduler.add_condition('A', graph_scheduler.EveryNPasses(1))
>>> scheduler.add_condition('B', graph_scheduler.EveryNCalls('A', 2))
>>> scheduler.add_condition(
...     'C',
...     graph_scheduler.Any(
...         graph_scheduler.AfterNCalls('A', 3),
...         graph_scheduler.AfterNCalls('B', 3)
...     )
... )
>>> termination_conds = {
...     graph_scheduler.TimeScale.ENVIRONMENT_STATE_UPDATE: graph_scheduler.AfterNCalls('C', 4, time_scale=graph_scheduler.TimeScale.ENVIRONMENT_STATE_UPDATE)
... }
>>> execution_sequence = list(scheduler.run(termination_conds=termination_conds))
>>> execution_sequence
[{'A'}, {'A', 'B'}, {'A'}, {'C'}, {'A', 'B'}, {'C'}, {'A'}, {'C'}, {'A', 'B'}, {'C'}]
Class Reference
- class graph_scheduler.scheduler.Scheduler(graph, conditions=None, termination_conds=None, default_execution_id=None, mode=SchedulingMode.STANDARD, default_absolute_time_unit=<Quantity(1, 'millisecond')>)
Generates an order of execution for nodes in a graph or graph specification dictionary, possibly determined by a set of Conditions.
- Parameters:
graph (Dict[object: set(object)], networkx.DiGraph) – a directed acyclic graph (DAG). Specified as either:
- a graph specification dictionary: each entry of the dictionary must be an object, and the value of each entry must be a set of zero or more objects that project directly to the key.
- a networkx.DiGraph
conditions (ConditionSet) – set of Conditions that specify when individual nodes in graph execute and any dependencies among them.
mode (SchedulingMode[STANDARD|EXACT_TIME] : SchedulingMode.STANDARD) – sets the mode of scheduling: standard or exact time
default_absolute_time_unit (pint.Quantity : 1ms) – if not otherwise determined by any absolute conditions, specifies the absolute duration of a CONSIDERATION_SET_EXECUTION
- conditions
the set of Conditions the Scheduler uses when running
- Type:
- default_execution_id
the execution_id to use if none is supplied to run; a unique identifier that allows multiple independent schedulings. Must be hashable.
- Type:
object
- execution_list
the full history of consideration set executions the Scheduler has produced
- Type:
list
- consideration_queue
a list form of the Scheduler’s toposort ordering of its nodes
- Type:
list
- consideration_queue_indices
a dict mapping nodes to their position in the
consideration_queue
- Type:
dict
- termination_conds
a mapping from TimeScales to Conditions that, when met, terminate the execution of the specified TimeScale. On set, update only for the TimeScales specified in the argument.
- Type:
Dict[TimeScale: Condition]
- mode
sets the mode of scheduling: standard or exact time
- Type:
- Default:
- default_absolute_time_unit
if not otherwise determined by any absolute conditions, specifies the absolute duration of a
CONSIDERATION_SET_EXECUTION
- Type:
- Default:
1ms
- end_environment_sequence(execution_id=NotImplemented)
Signals that an ENVIRONMENT_SEQUENCE has completed
- Parameters:
execution_id (optional) – Defaults to Scheduler.default_execution_id
- add_graph_edge(sender, receiver)
Adds an edge to the graph from sender to receiver. Equivalent to add_condition(sender, AddEdgeTo(receiver)).
- Parameters:
sender (Hashable) – sender of the new edge
receiver (Hashable) – receiver of the new edge
- Returns:
the new condition added to implement the edge
- Return type:
- remove_graph_edge(sender, receiver)
Removes an edge from the graph from sender to receiver if it exists. Equivalent to add_condition(receiver, RemoveEdgeFrom(sender)).
- Parameters:
sender (Hashable) – sender of the edge to be removed
receiver (Hashable) – receiver of the edge to be removed
- Returns:
the new condition added to remove the edge
- Return type:
- add_condition(owner, condition)
Adds a basic or graph structure Condition to the Scheduler.
If condition is basic, it will overwrite the current basic Condition for owner, if present. If you want to add multiple basic Conditions to a single owner, instead add a single Composite Condition to accurately specify the desired behavior.
If condition is structural, it will be applied on top of Scheduler.graph in the order it is added.
- Parameters:
owner (node) – specifies the node with which the condition should be associated. condition will govern the execution behavior of owner
condition (ConditionBase) – specifies the Condition, associated with the owner, to be added to the ConditionSet.
- add_condition_set(conditions)
Adds a set of basic or graph structure Conditions (in the form of a dict or another ConditionSet) to the Scheduler.
Any basic Condition added here will overwrite the current basic Condition for a given owner, if present. If you want to add multiple basic Conditions to a single owner, instead add a single Composite Condition to accurately specify the desired behavior.
Any structural Condition added here will be applied on top of Scheduler.graph in the order they are returned by iteration over conditions.
- Parameters:
conditions (dict[node: Condition], ConditionSet) – specifies the collection of Conditions to be added to this ConditionSet,
- if a dict is provided:
each entry should map an owner node (the node whose execution behavior will be governed) to a Condition
- remove_condition(owner_or_condition)
Removes the condition specified as or owned by owner_or_condition.
- Parameters:
owner_or_condition (Union[Hashable, ConditionBase]) – Either a condition or the owner of a condition
- Return type:
Optional[ConditionBase]
- Returns:
The condition removed, or None if no condition removed
- Raises:
ConditionError –
- when owner_or_condition is an owner and it owns multiple conditions
- when owner_or_condition is a condition and its owner is None
- run(termination_conds=None, execution_id=None, base_execution_id=None, skip_environment_state_update_time_increment=False, **kwargs)
run is a Python generator that, when iterated over, provides the next CONSIDERATION_SET_EXECUTION at each iteration
- property graph: Dict[Hashable, Set[Hashable]]
The current graph used by this Scheduler, which is modified by any graph structure conditions added
- property consideration_queue: List[Set[Hashable]]
The ordered list of sets of nodes in the graph, by the order in which they will be checked to ensure that all senders have a chance to run before their receivers
- property consideration_queue_indices: Dict[Hashable, int]
A dictionary mapping the graph’s nodes to their position in the original consideration queue. This is the same as the consideration queue when not using SchedulingMode.EXACT_TIME.