Multiple Identical Nodes

When the same node needs to be executed more than once in a single pipeline, use the Pipe model’s source_run_index and destination_run_index attributes to tell the executions apart.

As an example of a pipeline that runs multiple identical nodes, we will create the following pipeline:

[Figure: diagram of the pipeline (multiple_identical_nodes.png)]

We will assume that the following three nodes exist:

  • addition: Adds two numbers (x and y) and returns “result”.
  • power: Raises base to the power of exp. In this case we have a “square” node, where exp is preconfigured as 2.
  • norm: Calculates the norm of the provided vector (x) using NumPy’s linalg.norm() method.

In the diagram, each node shows its run index in its bottom right corner, which distinguishes it from other executions of the same node.

To create the pipeline, we will first create a pipeline specification dictionary:

multiple_identical_nodes.py
ADDITION_NODE = {
    "analysis_version": "addition",
    "configuration": {},
}
SQUARE_NODE = {
    "analysis_version": "power",
    "configuration": {"exp": 2},
}
NORM_NODE = {
    "analysis_version": "norm",
    "configuration": {},
}
PIPELINE = {
    "title": "Multiple Identical Nodes",
    "description": "Simple pipeline with identical nodes.",
    "pipes": [
        # Addition #0 [result] --> Power #1 [base]
        {
            "source": ADDITION_NODE,
            "source_run_index": 0,
            "source_port": "result",
            "destination": SQUARE_NODE,
            "destination_run_index": 1,
            "destination_port": "base",
        },
        # Power #1 [result] --> Addition #2 [x]
        {
            "source": SQUARE_NODE,
            "source_run_index": 1,
            "source_port": "result",
            "destination": ADDITION_NODE,
            "destination_run_index": 2,
            "destination_port": "x",
        },
        # Addition #1 [result] --> Addition #2 [y]
        {
            "source": ADDITION_NODE,
            "source_run_index": 1,
            "source_port": "result",
            "destination": ADDITION_NODE,
            "destination_run_index": 2,
            "destination_port": "y",
        },
        # Power #0 [result] --> Norm #0 [x[0]]
        {
            "source": SQUARE_NODE,
            "source_run_index": 0,
            "source_port": "result",
            "destination": NORM_NODE,
            "destination_run_index": 0,
            "destination_port": "x",
            "index": 0,
        },
        # Addition #2 [result] --> Norm #0 [x[1]]
        {
            "source": ADDITION_NODE,
            "source_run_index": 2,
            "source_port": "result",
            "destination": NORM_NODE,
            "destination_run_index": 0,
            "destination_port": "x",
            "index": 1,
        },
    ],
}

Note that we also used the Pipe model’s index attribute to pass two outputs as a single list (vector) to the norm node’s x parameter.
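To make the role of index more concrete, here is a minimal sketch of how indexed pipe outputs could be gathered into one list. The collect_indexed_inputs() helper and the shape of the outputs dictionary are hypothetical and used only for illustration; this is not how django_analyses implements it internally.

```python
# Hypothetical illustration only; not django_analyses' actual implementation.
def collect_indexed_inputs(pipes, outputs):
    """Build keyword arguments for one destination execution.

    Pipes that share a destination port and carry an "index" are merged
    into a single list, ordered by that index.
    """
    kwargs = {}
    indexed = {}
    for pipe in pipes:
        source_key = (pipe["source"], pipe["source_run_index"])
        value = outputs[source_key][pipe["source_port"]]
        port = pipe["destination_port"]
        if pipe.get("index") is None:
            kwargs[port] = value
        else:
            indexed.setdefault(port, {})[pipe["index"]] = value
    for port, items in indexed.items():
        kwargs[port] = [items[i] for i in sorted(items)]
    return kwargs


# The two pipes feeding norm #0, deliberately listed out of order.
pipes_to_norm = [
    {"source": "addition", "source_run_index": 2, "source_port": "result",
     "destination_port": "x", "index": 1},
    {"source": "power", "source_run_index": 0, "source_port": "result",
     "destination_port": "x", "index": 0},
]
# Assumed outputs of the two upstream executions.
outputs = {
    ("power", 0): {"result": 1.0},
    ("addition", 2): {"result": 6.0},
}
norm_inputs = collect_indexed_inputs(pipes_to_norm, outputs)
print(norm_inputs)
```

The order of the resulting list follows the index values, not the order in which the pipes are listed.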

To run the pipeline, we need to specify both x and y for the first two addition executions (#0 and #1), as well as base for the first power execution (#0), because no pipe feeds these executions.
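One way to see which executions need user-provided inputs is to look for executions that no pipe points to. The following is a small sketch using a compact stand-in for the pipes defined above; the PIPES tuples and the entry_executions() helper are assumptions made for this illustration and are not part of django_analyses.

```python
# Compact stand-in for the pipe definitions:
# (source, source_run_index, destination, destination_run_index)
PIPES = [
    ("addition", 0, "power", 1),
    ("power", 1, "addition", 2),
    ("addition", 1, "addition", 2),
    ("power", 0, "norm", 0),
    ("addition", 2, "norm", 0),
]


def entry_executions(pipes):
    """Return the (analysis, run_index) pairs that no pipe feeds into."""
    sources = {(src, src_idx) for src, src_idx, _, _ in pipes}
    destinations = {(dst, dst_idx) for _, _, dst, dst_idx in pipes}
    return sorted(sources - destinations)


entry_points = entry_executions(PIPES)
print(entry_points)
```

The result lists addition #0, addition #1 and power #0, which are exactly the executions that receive inputs in the session below.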

>>> from django_analyses.models import AnalysisVersion
>>> from django_analyses.models import Node
>>> from django_analyses.models import Pipeline
>>> from django_analyses.pipeline_runner import PipelineRunner

# Import the pipeline specification dictionary created above
>>> from multiple_identical_nodes import PIPELINE
# Create the pipeline
>>> pipeline = Pipeline.objects.from_dict(PIPELINE)

# Fetch the analysis versions used as keys in the inputs dictionary
>>> addition = AnalysisVersion.objects.get(analysis__title="addition")
>>> power = AnalysisVersion.objects.get(analysis__title="power")
>>> addition_inputs = [{"x": 1, "y": 1}, {"x": 1, "y": 1}]
>>> power_inputs = [{"base": 1}]
>>> inputs = {addition: addition_inputs, power: power_inputs}
>>> runner = PipelineRunner(pipeline=pipeline)
>>> results = runner.run(inputs=inputs)
power v1.0 (#0)
Inputs:
{'base': 1}
...
Outputs:
{'result': 1.0}
────────────────────

addition v1.0 (#0)
Inputs:
{'x': 1, 'y': 1}
...
Outputs:
{'result': 2.0}
────────────────────
────────────────────
power v1.0 (#1)
Inputs:
{'base': 2.0}
...
Outputs:
{'result': 4.0}
────────────────────
────────────────────
addition v1.0 (#1)
Inputs:
{'x': 1, 'y': 1}
...
Outputs:
{'result': 2.0}
────────────────────
────────────────────
addition v1.0 (#2)
Inputs:
{'y': 2.0, 'x': 4.0}
...
Outputs:
{'result': 6.0}
────────────────────
────────────────────
norm vNumPy:1.18 (#0)
Inputs:
{'x': [1.0, 6.0]}
...
Outputs:
{'norm': 6.08276253029822}
────────────────────
done!
>>> norm = AnalysisVersion.objects.get(analysis__title="norm")
>>> results[norm][0].get_output("norm")
6.082762530298219
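As a sanity check, the same data flow can be traced by hand in plain Python. This sketch uses stand-in functions rather than the real nodes, and it assumes norm computes the Euclidean norm, which is the default of NumPy’s linalg.norm() for a one-dimensional vector.

```python
import math


# Plain-Python stand-ins for the three analyses (assumptions for illustration).
def addition(x, y):
    return float(x + y)


def power(base, exp=2):
    return float(base ** exp)


def norm(x):
    # Euclidean norm of a 1-D vector.
    return math.sqrt(sum(v * v for v in x))


# Each variable name carries the run index of the execution it mirrors.
power_0 = power(base=1)                         # user-provided input
addition_0 = addition(x=1, y=1)                 # user-provided inputs
power_1 = power(base=addition_0)                # Addition #0 --> Power #1
addition_1 = addition(x=1, y=1)                 # user-provided inputs
addition_2 = addition(x=power_1, y=addition_1)  # Power #1, Addition #1 --> Addition #2
norm_0 = norm([power_0, addition_2])            # Power #0, Addition #2 --> Norm #0
print(norm_0)
```

The intermediate values (2.0, 4.0, 6.0) match the outputs printed by the runner, and the final value is the square root of 37.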