Scheduler

Overview

A Scheduler is used to generate the order in which the nodes of a graph are executed. By default, a Scheduler executes nodes in an order determined by the pattern of edges among the nodes in the graph, with each node executed once per PASS through the graph. For example, in a graph in which a node A projects to a node B that projects to a node C, A will execute first followed by B, and then C in each PASS through the graph. However, a Scheduler can be used to implement more complex patterns of execution, by specifying Conditions that determine when and how many times individual nodes execute, and whether and how this depends on the execution of other nodes. Any executable node in a graph can be assigned a Condition, and Conditions can be combined in arbitrary ways to generate any pattern of execution of the nodes in a graph that is logically possible.

Creating a Scheduler

When creating a Scheduler explicitly, the set of nodes to be executed and their order must be specified in the Scheduler’s constructor using one of the following:

  • a graph specification dictionary in the graph argument - each entry of the dictionary must be a node of a graph, and the value of each entry must be a set of zero or more nodes that project directly to the key. The graph must be acyclic; an error is generated if any cycles (e.g., recurrent dependencies) are detected. The Scheduler computes a toposort from the graph that is used as the default order of executions, subject to any Conditions that have been specified (see below).
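The acyclicity requirement can be checked before a graph specification dictionary is passed to a Scheduler. The sketch below is not part of graph_scheduler: it assumes Python 3.9 or later and uses the standard library's graphlib, whose TopologicalSorter accepts the same node-to-senders mapping. The helper name is_acyclic is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def is_acyclic(graph):
    """Return True if the node -> senders mapping contains no cycles."""
    try:
        # prepare() raises CycleError if the graph contains a cycle
        TopologicalSorter(graph).prepare()
    except CycleError:
        return False
    return True


# A projects to B, which projects to C: a valid specification
linear = {'A': set(), 'B': {'A'}, 'C': {'B'}}
# A and B project to each other: a recurrent dependency
recurrent = {'A': {'B'}, 'B': {'A'}}

linear_ok = is_acyclic(linear)
recurrent_ok = is_acyclic(recurrent)
print(linear_ok, recurrent_ok)
```

Here linear passes the check and recurrent does not, which corresponds to the case in which the Scheduler would report an error.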

Conditions can be added to a Scheduler when it is created by specifying a ConditionSet (a set of Conditions) in the conditions argument of its constructor. Individual Conditions and/or ConditionSets can also be added after the Scheduler has been created, using its add_condition and add_condition_set methods, respectively.

Graph structure Conditions are applied to the Scheduler’s graph in the order in which they are added.

Algorithm

When a Scheduler is created, it constructs a consideration_queue: a list of consideration sets that defines the order in which nodes are eligible to be executed. This is determined by the topological ordering of the graph provided to the Scheduler's constructor, which is then modified by any graph structure conditions that are added to the Scheduler. Each consideration_set is a set of nodes that are eligible to execute at the same time/CONSIDERATION_SET_EXECUTION (i.e., that appear at the same “depth” in a sequence of dependencies, and among which there are no dependencies). The first consideration_set consists of only origin nodes. The second consists of all nodes that receive edges from the nodes in the first consideration_set. The third consists of nodes that receive edges from nodes in the first two consideration sets, and so forth. When the Scheduler is run, it uses the consideration_queue to determine which nodes are eligible to execute in each CONSIDERATION_SET_EXECUTION of a PASS, and then evaluates the Condition associated with each node in the current consideration_set to determine which should actually be assigned for execution.
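The layering described above can be sketched with the standard library alone. This is an illustration of the idea, not the Scheduler's actual implementation: it assumes Python 3.9 or later, and build_consideration_queue is a hypothetical helper name. It ignores graph structure conditions entirely.

```python
from graphlib import TopologicalSorter


def build_consideration_queue(graph):
    """Group nodes into layers; each layer depends only on earlier layers."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on a cyclic graph
    queue = []
    while sorter.is_active():
        # nodes whose senders have all been placed in earlier layers
        ready = set(sorter.get_ready())
        queue.append(ready)
        sorter.done(*ready)
    return queue


# A and B are origin nodes, both project to C, and C projects to D
graph = {'A': set(), 'B': set(), 'C': {'A', 'B'}, 'D': {'C'}}
queue = build_consideration_queue(graph)
# three consideration sets: {A, B}, then {C}, then {D}
print(queue)
```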

Pseudocode:

consideration_queue <- list(toposort(graph))

reset TimeScale.ENVIRONMENT_STATE_UPDATE counters
while TimeScale.ENVIRONMENT_STATE_UPDATE termination conditions are not satisfied
and TimeScale.ENVIRONMENT_SEQUENCE termination conditions are not satisfied:
    reset TimeScale.PASS counters
    cur_index <- 0

    while TimeScale.ENVIRONMENT_STATE_UPDATE termination conditions are not satisfied
    and TimeScale.ENVIRONMENT_SEQUENCE termination conditions are not satisfied
    and cur_index < len(consideration_queue):

        cur_consideration_set <- consideration_queue[cur_index]
        do:
            cur_consideration_set_has_changed <- False
            for cur_node in cur_consideration_set:
                if  cur_node not in cur_consideration_set_execution
                    and cur_node's Condition is satisfied:

                    cur_consideration_set_has_changed <- True
                    add cur_node to cur_consideration_set_execution
                    increment execution and time counters
        while cur_consideration_set_has_changed

        if cur_consideration_set_execution is not empty or absolute time conditions are used:
            yield cur_consideration_set_execution

        increment cur_index
        increment time counters

    if all execution sets yielded were empty:
        yield an empty execution set
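The pseudocode above can be turned into a small, runnable simulation. The sketch below is a simplified stand-in rather than the library's code: it handles a single ENVIRONMENT_STATE_UPDATE, hard-codes the default AllHaveRun termination, omits absolute time, and represents Conditions as plain callables. The names run_state_update, always, every_n_calls and the calls table are all hypothetical.

```python
def run_state_update(consideration_queue, conditions, max_passes=1000):
    """Yield execution sets until every node has run at least once."""
    nodes = [n for cs in consideration_queue for n in cs]
    # calls[a][b]: how many times a has run since b last ran
    calls = {a: {b: 0 for b in nodes} for a in nodes}
    has_run = set()

    def terminated():
        # stand-in for the default AllHaveRun termination condition
        return len(has_run) == len(nodes)

    for _ in range(max_passes):  # each iteration is one PASS
        if terminated():
            return
        yielded_any = False
        for consideration_set in consideration_queue:
            if terminated():
                break
            execution_set = set()
            changed = True
            while changed:  # repeat until no further node becomes eligible
                changed = False
                for node in consideration_set:
                    if node not in execution_set and conditions[node](calls, node):
                        changed = True
                        execution_set.add(node)
                        has_run.add(node)
                        for other in nodes:
                            calls[other][node] = 0   # node has consumed these calls
                            calls[node][other] += 1  # node's run is visible to others
            if execution_set:
                yielded_any = True
                yield execution_set
        if not yielded_any:
            yield set()


def always(calls, owner):
    return True


def every_n_calls(dependency, n):
    # satisfied once dependency has run n times since owner last ran
    return lambda calls, owner: calls[dependency][owner] >= n


queue = [{'A'}, {'B'}, {'C'}]
conditions = {'A': always, 'B': every_n_calls('A', 2), 'C': every_n_calls('B', 3)}
sequence = list(run_state_update(queue, conditions))
print(sequence)
```

With these inputs the simulation yields A twice for every B and B three times before C, the same sequence as the "Basic phasing in a linear process" example under Examples.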

Execution

Note

This section covers normal scheduler execution (Scheduler.mode = SchedulingMode.STANDARD). See Exact Time Mode below for a description of exact time mode.

When a Scheduler is run, it provides a set of nodes that should be run next, based on their dependencies in the graph specification and on any Conditions specified in the Scheduler’s constructor or added afterward. For each call to the run method, the Scheduler sequentially evaluates its consideration sets in their order in the consideration_queue. For each set, it determines which nodes in the set are allowed to execute, based on whether their associated Condition has been met. Any node that does not have a Condition explicitly specified is assigned a Condition that causes it to be executed whenever it is under consideration and all its structural parents have been executed at least once since the node’s last execution. All of the nodes within a consideration_set that are allowed to execute constitute a CONSIDERATION_SET_EXECUTION. These nodes are considered to execute simultaneously.
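The default Condition described above can be read as a simple predicate over per-node run counts. The following is a minimal sketch under that reading, not the library's implementation; default_condition_satisfied and the runs_since table (how many times each sender has run since the node last ran) are hypothetical names.

```python
def default_condition_satisfied(node, senders, runs_since):
    """True if every sender of node has run at least once since node last ran."""
    return all(runs_since[sender][node] >= 1 for sender in senders[node])


# same shape as a graph specification dictionary: node -> set of senders
senders = {'A': set(), 'B': {'A'}}
# runs_since[x][y]: times x has run since y last ran (hypothetical bookkeeping)
runs_since = {'A': {'A': 0, 'B': 0}, 'B': {'A': 0, 'B': 0}}

origin_ok = default_condition_satisfied('A', senders, runs_since)  # no senders
before = default_condition_satisfied('B', senders, runs_since)     # A has not run
runs_since['A']['B'] += 1                                          # A runs once
after = default_condition_satisfied('B', senders, runs_since)
print(origin_ok, before, after)
```

Under this reading, an origin node has no senders to wait for and is eligible whenever it is considered, while B becomes eligible only after A has run.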

Note

The ordering of the nodes specified within a CONSIDERATION_SET_EXECUTION is arbitrary (and is irrelevant, as there are no graph dependencies among nodes within the same consideration_set). However, the execution of a node within a CONSIDERATION_SET_EXECUTION may trigger the execution of another node within its consideration_set, as in the example below:

   C
  ↗ ↖
 A   B

scheduler.add_condition(B, graph_scheduler.EveryNCalls(A, 2))
scheduler.add_condition(C, graph_scheduler.EveryNCalls(B, 1))

execution sets: [{A}, {A, B}, {C}, ...]

Since there are no graph dependencies between A and B, they may execute in the same CONSIDERATION_SET_EXECUTION. Moreover, A and B are in the same consideration_set. Since B is specified to run every two times A runs, A’s second execution in the second CONSIDERATION_SET_EXECUTION allows B to run within that CONSIDERATION_SET_EXECUTION, rather than waiting for the next PASS.

For each CONSIDERATION_SET_EXECUTION, the Scheduler evaluates whether any specified termination Conditions have been met, and terminates if so. Otherwise, it returns the set of nodes that should be executed in the current CONSIDERATION_SET_EXECUTION. Each subsequent call to the run method returns the set of nodes in the following CONSIDERATION_SET_EXECUTION.

Processing of all of the consideration_sets in the consideration_queue constitutes a PASS of execution, over which every node in the graph has been considered for execution. Subsequent calls to the run method cycle back through the consideration_queue, evaluating the consideration_sets in the same order as previously. Different subsets of nodes within the same consideration_set may be assigned to execute on each PASS, since different Conditions may be satisfied.

The Scheduler continues to make PASSes through the consideration_queue until a termination Condition is satisfied. If no termination Conditions are specified, by default the Scheduler terminates an ENVIRONMENT_STATE_UPDATE when every node has been specified for execution at least once (corresponding to the AllHaveRun Condition). However, other termination Conditions can be specified that cause the Scheduler to terminate an ENVIRONMENT_STATE_UPDATE earlier or later (e.g., when the Condition for a particular node or set of nodes is met).

Termination Conditions

Termination conditions are basic Conditions that specify when the open-ended units of time - ENVIRONMENT_STATE_UPDATE and ENVIRONMENT_SEQUENCE - have ended. By default, the termination condition for an ENVIRONMENT_STATE_UPDATE is AllHaveRun, which is satisfied when all nodes have run at least once within the environment state update, and the termination condition for an ENVIRONMENT_SEQUENCE is when all of its constituent environment state updates have terminated. Graph structure conditions cannot be used as termination conditions.

Absolute Time

The scheduler supports scheduling of models of real-time systems in two modes, both of which involve mapping real-time values to Time. The default mode is the most compatible with standard scheduling, but can cause some unexpected behavior in certain cases because it is inexact. The consideration queue remains intact, but as a result, actions specified by fixed times of absolute-time-based conditions (start and end of TimeInterval, and t of TimeTermination) may not occur at exactly the time specified. The simplest example of this situation involves a linear graph with two nodes:

>>> import graph_scheduler

>>> graph = {'A': set(), 'B': {'A'}}
>>> scheduler = graph_scheduler.Scheduler(graph=graph)

>>> scheduler.add_condition('A', graph_scheduler.TimeInterval(start=10))
>>> scheduler.add_condition('B', graph_scheduler.TimeInterval(start=10))

In standard mode, A and B are in different consideration sets, and so can never execute at the same time. At most one of A and B will start exactly at t=10ms, with the other starting at its next consideration afterward. There are many such cases, and while the problem may be solvable in some of them, it is not simple in general. Exact Time Mode therefore exists as an alternative for these cases, though it comes with its own drawbacks.
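The timing offset can be illustrated with a toy model. The sketch below rests on several assumptions that are not taken from the library's code: time starts at 0, each consideration set execution advances time by exactly one unit, and a start time is treated as satisfied once the current time reaches it. The function name first_execution_times is hypothetical.

```python
def first_execution_times(consideration_queue, start_times, unit=1):
    """Return the first tick at which each node is considered at or after its start."""
    first = {}
    t = 0
    # every node in start_times is assumed to appear in consideration_queue
    while len(first) < len(start_times):
        for consideration_set in consideration_queue:
            for node in consideration_set:
                if node not in first and t >= start_times[node]:
                    first[node] = t
            t += unit  # assumed: one unit per consideration set execution
    return first


start_times = {'A': 10, 'B': 10}

# two consideration sets, as in the standard-mode example above
standard = first_execution_times([{'A'}, {'B'}], start_times)
# a single shared consideration set, for comparison
single_set = first_execution_times([{'A', 'B'}], start_times)
print(standard, single_set)
```

In this model A is considered on even ticks and B on odd ticks, so A starts at 10 and B at 11. With one shared consideration set both start at 10.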

Note

Due to issues with floating-point precision, absolute time values in conditions and Time are limited to 8 decimal places. If more precision is needed, use fractions, where possible, or smaller units (e.g. microseconds instead of milliseconds).
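The floating-point issue behind this limit can be seen with the standard library alone. The snippet is a general Python illustration, assuming "fractions" refers to exact rational values such as fractions.Fraction; it does not show which value types a particular Condition accepts.

```python
from fractions import Fraction

# three steps of 0.1 accumulated as floats and as exact fractions
float_total = sum([0.1] * 3)
exact_total = sum([Fraction(1, 10)] * 3)

float_is_exact = (float_total == 0.3)                     # binary rounding error
fraction_is_exact = (exact_total == Fraction(3, 10))      # exact arithmetic
rounded_match = (round(float_total, 8) == round(0.3, 8))  # equal at 8 places
print(float_is_exact, fraction_is_exact, rounded_match)
```

The float sum differs from 0.3 in its last bits, agrees with it once rounded to 8 decimal places, and the fraction sum is exact.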

Exact Time Mode

When Scheduler.mode is SchedulingMode.EXACT_TIME, the scheduler is capable of handling examples like the one above. In this mode, all nodes in the scheduler’s graph become members of the same consideration set, and may be executed at the same time for every consideration set execution, subject to the conditions specified. As a result, the guarantees of standard scheduling may not apply: namely, that all parent nodes get a chance to execute before their children, and that no data dependencies (edges) exist between nodes in the same execution set. In exact time mode, all nodes will be in one [unordered] execution set. An ordering may, however, be inferred from the original graph, using the indices in the original consideration queue. Additionally, non-absolute conditions like EveryNCalls may behave unexpectedly in some cases.
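One way to recover such an ordering is to sort each execution set by the nodes' positions in the original consideration queue. The sketch below uses a hand-written index mapping with hypothetical values in place of a real Scheduler's consideration_queue_indices, and order_execution_set is a hypothetical helper.

```python
def order_execution_set(execution_set, consideration_queue_indices):
    """Sort nodes by their position in the original consideration queue."""
    return sorted(execution_set, key=lambda node: consideration_queue_indices[node])


# hypothetical indices for a linear graph A -> B -> C
indices = {'A': 0, 'B': 1, 'C': 2}
ordered = order_execution_set({'C', 'A', 'B'}, indices)
print(ordered)
```

Nodes that share an index have no dependency between them in the original graph, so their relative order in the result carries no meaning.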

Examples

Please see Condition for a list of all supported Conditions and their behavior.

  • Basic phasing in a linear process:

    >>> import graph_scheduler
    
    >>> graph = {'A': set(), 'B': {'A'}, 'C': {'B'}}
    >>> scheduler = graph_scheduler.Scheduler(graph=graph)
    
    >>> # implicit condition of Always for A
    >>> scheduler.add_condition('B', graph_scheduler.EveryNCalls('A', 2))
    >>> scheduler.add_condition('C', graph_scheduler.EveryNCalls('B', 3))
    
    >>> # implicit AllHaveRun Termination condition
    >>> execution_sequence = list(scheduler.run())
    >>> execution_sequence
    [{'A'}, {'A'}, {'B'}, {'A'}, {'A'}, {'B'}, {'A'}, {'A'}, {'B'}, {'C'}]
    
  • Alternate basic phasing in a linear process:

    >>> graph = {'A': set(), 'B': {'A'}}
    >>> scheduler = graph_scheduler.Scheduler(graph=graph)
    
    >>> scheduler.add_condition(
    ...     'A',
    ...     graph_scheduler.Any(
    ...         graph_scheduler.AtPass(0),
    ...         graph_scheduler.EveryNCalls('B', 2)
    ...     )
    ... )
    
    >>> scheduler.add_condition(
    ...     'B',
    ...     graph_scheduler.Any(
    ...         graph_scheduler.EveryNCalls('A', 1),
    ...         graph_scheduler.EveryNCalls('B', 1)
    ...     )
    ... )
    >>> termination_conds = {
    ...     graph_scheduler.TimeScale.ENVIRONMENT_STATE_UPDATE: graph_scheduler.AfterNCalls('B', 4, time_scale=graph_scheduler.TimeScale.ENVIRONMENT_STATE_UPDATE)
    ... }
    >>> execution_sequence = list(scheduler.run(termination_conds=termination_conds))
    >>> execution_sequence
    [{'A'}, {'B'}, {'B'}, {'A'}, {'B'}, {'B'}]
    
  • Basic phasing in two processes:

    >>> graph = {'A': set(), 'B': set(), 'C': {'A', 'B'}}
    >>> scheduler = graph_scheduler.Scheduler(graph=graph)
    
    >>> scheduler.add_condition('A', graph_scheduler.EveryNPasses(1))
    >>> scheduler.add_condition('B', graph_scheduler.EveryNCalls('A', 2))
    >>> scheduler.add_condition(
    ...     'C',
    ...     graph_scheduler.Any(
    ...         graph_scheduler.AfterNCalls('A', 3),
    ...         graph_scheduler.AfterNCalls('B', 3)
    ...     )
    ... )
    >>> termination_conds = {
    ...     graph_scheduler.TimeScale.ENVIRONMENT_STATE_UPDATE: graph_scheduler.AfterNCalls('C', 4, time_scale=graph_scheduler.TimeScale.ENVIRONMENT_STATE_UPDATE)
    ... }
    >>> execution_sequence = list(scheduler.run(termination_conds=termination_conds))
    >>> execution_sequence 
    [{'A'}, {'A', 'B'}, {'A'}, {'C'}, {'A', 'B'}, {'C'}, {'A'}, {'C'}, {'A', 'B'}, {'C'}]
    

Class Reference

class graph_scheduler.scheduler.Scheduler(graph, conditions=None, termination_conds=None, default_execution_id=None, mode=SchedulingMode.STANDARD, default_absolute_time_unit=<Quantity(1, 'millisecond')>)

Generates an order of execution for nodes in a graph or graph specification dictionary, possibly determined by a set of Conditions.

Parameters:
  • graph (Dict[object: set(object)], networkx.DiGraph) –

    a directed acyclic graph (DAG). Specified as either:
    • a graph specification dictionary: each entry of the dictionary must be an object, and the value of each entry must be a set of zero or more objects that project directly to the key.
    • a networkx.DiGraph

  • conditions (ConditionSet) – set of Conditions that specify when individual nodes in graph execute and any dependencies among them.

  • mode (SchedulingMode[STANDARD|EXACT_TIME] : SchedulingMode.STANDARD) – sets the mode of scheduling: standard or exact time

  • default_absolute_time_unit (pint.Quantity : 1ms) – if not otherwise determined by any absolute conditions, specifies the absolute duration of a CONSIDERATION_SET_EXECUTION

conditions

the set of Conditions the Scheduler uses when running

Type:

ConditionSet

default_execution_id

the execution_id to use if none is supplied to run; a unique identifier that allows multiple schedulings to run independently. Must be hashable.

Type:

object

execution_list

the full history of consideration set executions the Scheduler has produced

Type:

list

consideration_queue

a list form of the Scheduler’s toposort ordering of its nodes

Type:

list

consideration_queue_indices

a dict mapping nodes to their position in the consideration_queue

Type:

dict

termination_conds

a mapping from TimeScales to Conditions that, when met, terminate the execution of the specified TimeScale. On set, update only for the TimeScales specified in the argument.

Type:

Dict[TimeScale: Condition]

mode

sets the mode of scheduling: standard or exact time

Type:

SchedulingMode

Default:

SchedulingMode.STANDARD

default_absolute_time_unit

if not otherwise determined by any absolute conditions, specifies the absolute duration of a CONSIDERATION_SET_EXECUTION

Type:

pint.Quantity

Default:

1ms

end_environment_sequence(execution_id=NotImplemented)

Signals that an ENVIRONMENT_SEQUENCE has completed

Parameters:

execution_id (optional) – Defaults to Scheduler.default_execution_id

add_graph_edge(sender, receiver)

Adds an edge to the graph from sender to receiver. Equivalent to add_condition(sender, AddEdgeTo(receiver)).

Parameters:
  • sender (Hashable) – sender of the new edge

  • receiver (Hashable) – receiver of the new edge

Returns:

the new condition added to implement the edge

Return type:

AddEdgeTo

remove_graph_edge(sender, receiver)

Removes an edge from the graph from sender to receiver if it exists. Equivalent to add_condition(receiver, RemoveEdgeFrom(sender)).

Parameters:
  • sender (Hashable) – sender of the edge to be removed

  • receiver (Hashable) – receiver of the edge to be removed

Returns:

the new condition added to remove the edge

Return type:

RemoveEdgeFrom
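In terms of a graph specification dictionary (each node mapped to the set of its senders), the effect of the two edge methods above can be pictured as follows. This is a sketch of the resulting graph only, with hypothetical helper names with_edge and without_edge; it does not create Conditions or touch a Scheduler.

```python
import copy


def with_edge(graph, sender, receiver):
    """Return a copy of graph with an edge from sender to receiver."""
    new_graph = copy.deepcopy(graph)
    new_graph.setdefault(sender, set())
    new_graph.setdefault(receiver, set()).add(sender)
    return new_graph


def without_edge(graph, sender, receiver):
    """Return a copy of graph without the sender -> receiver edge, if present."""
    new_graph = copy.deepcopy(graph)
    new_graph.get(receiver, set()).discard(sender)
    return new_graph


graph = {'A': set(), 'B': {'A'}, 'C': set()}
added = with_edge(graph, 'B', 'C')        # C now receives from B
removed = without_edge(added, 'A', 'B')   # B no longer receives from A
print(added, removed)
```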

add_condition(owner, condition)

Adds a basic or graph structure Condition to the Scheduler.

If condition is basic, it will overwrite the current basic Condition for owner, if present. If you want to add multiple basic Conditions to a single owner, instead add a single Composite Condition to accurately specify the desired behavior.

If condition is structural, it will be applied on top of Scheduler.graph in the order it is added.

Parameters:
  • owner (node) – specifies the node with which the condition should be associated. condition will govern the execution behavior of owner

  • condition (ConditionBase) – specifies the Condition, associated with the owner to be added to the ConditionSet.

add_condition_set(conditions)

Adds a set of basic or graph structure Conditions (in the form of a dict or another ConditionSet) to the Scheduler.

Any basic Condition added here will overwrite the current basic Condition for a given owner, if present. If you want to add multiple basic Conditions to a single owner, instead add a single Composite Condition to accurately specify the desired behavior.

Structural Conditions added here are applied on top of Scheduler.graph in the order in which they are returned by iteration over conditions.

Parameters:

conditions (dict[node: Condition], ConditionSet) – specifies the collection of Conditions to be added to this ConditionSet. If a dict is provided, each entry should map an owner node (the node whose execution behavior will be governed) to a Condition.

remove_condition(owner_or_condition)

Removes the condition specified as or owned by owner_or_condition.

Parameters:

owner_or_condition (Union[Hashable, ConditionBase]) – Either a condition or the owner of a condition

Return type:

Optional[ConditionBase]

Returns:

The condition removed, or None if no condition removed

Raises:

ConditionError

  • when owner_or_condition is an owner and it owns multiple conditions

  • when owner_or_condition is a condition and its owner is None

run(termination_conds=None, execution_id=None, base_execution_id=None, skip_environment_state_update_time_increment=False, **kwargs)

run is a Python generator that, when iterated over, provides the next CONSIDERATION_SET_EXECUTION at each iteration

Parameters:

termination_conds (dict) – a mapping from TimeScales to Conditions that, when met, terminate the execution of the specified TimeScale

property graph: Dict[Hashable, Set[Hashable]]

The current graph used by this Scheduler, which is modified by any graph structure conditions added

property consideration_queue: List[Set[Hashable]]

The ordered list of sets of nodes in the graph, by the order in which they will be checked to ensure that all senders have a chance to run before their receivers

property consideration_queue_indices: Dict[Hashable, int]

A dictionary mapping the graph’s nodes to their position in the original consideration queue. When not using SchedulingMode.EXACT_TIME, these positions match those in the current consideration queue.

class graph_scheduler.scheduler.SchedulingMode(value)

STANDARD

Standard Mode

EXACT_TIME

Exact time Mode