Creating Workflow Blocks

Workflows blocks development requires an understanding of the Workflow Ecosystem. Before diving deeper into the details, let's summarize the required knowledge:

  • Understanding of Workflow execution, in particular:
    • What is the relation of Workflow blocks and steps in Workflow definition
    • How Workflow blocks and their manifests are used by the Workflows Compiler
    • What is the dimensionality level of batch-oriented data passing through Workflow
    • How the Execution Engine interacts with step, regarding its inputs and outputs
    • What is the nature and role of Workflow kinds
    • Understanding how pydantic works

Environment Setup

Creating a Workflow block is simply a matter of defining a Python class that implements a specific interface. We recommend following these steps as part of the standard development process:

  1. Set up the conda environment and install main dependencies of inference, as described in the inference contributor guide.
  2. Familiarize yourself with the organization of the Workflows codebase.
  3. Create a minimalistic block -- start by implementing a simple block manifest and basic logic.
  4. Add the block to the plugin -- once your block is created, add it to the list of blocks exported from the plugin.
  5. Iterate and refine your block -- continue developing and running your block until you are satisfied with the results.

Running Your Blocks Using Workflows UI

We recommend running the inference server with a mounted volume:

docker run -p 9001:9001 \
   -v ./inference:/app/inference \
   roboflow/roboflow-inference-server-cpu:latest

and connecting your local server to Roboflow UI to quickly run previews.

Prototypes

To create a Workflow block you need some imports from the core of the Workflows library:

from inference.core.workflows.execution_engine.entities.base import (
    Batch,
    OutputDefinition,
    WorkflowImageData,
)

from inference.core.workflows.prototypes.block import (
    BlockResult,
    WorkflowBlock,
    WorkflowBlockManifest,
)

from inference.core.workflows.execution_engine.entities.types import *

The most important are:

  • WorkflowBlock -- base class for your block
  • WorkflowBlockManifest -- base class for block manifest
Warning

You may have noticed that we recommend importing the Batch and WorkflowImageData classes, which are fundamental components used when constructing building blocks. For a deeper understanding of how these classes fit into the overall architecture, refer to the Data Representations page.

Block Manifest

A manifest defines a prototype for step declaration that can be placed in a Workflow definition. It:

  • Uses pydantic to power syntax parsing of Workflows definitions.
  • Defines Data Bindings: specifies which fields are selectors for data flowing through the workflow.
  • Describes Block Outputs: outlines the outputs that the block will produce.
  • Specifies Dimensionality: details the properties related to input and output dimensionality.
  • Ensures Compatibility: dictates the compatibility with different Execution Engine versions. See versioning.

Scaffolding for Manifest

from typing import Literal
from inference.core.workflows.prototypes.block import (
    WorkflowBlockManifest,
)

class ImagesSimilarityManifest(WorkflowBlockManifest):
    type: Literal["my_plugin/images_similarity@v1"]
    name: str

This defines two special fields: type (the pydantic type discriminator for the Compiler) and name (the unique step identifier).

Adding Inputs

from inference.core.workflows.execution_engine.entities.types import (
    Selector,
    IMAGE_KIND,
)

class ImagesSimilarityManifest(WorkflowBlockManifest):
    type: Literal["my_plugin/images_similarity@v1"]
    name: str
    image_1: Selector(kind=[IMAGE_KIND]) = Field(
        description="First image to calculate similarity",
    )
    image_2: Selector(kind=[IMAGE_KIND]) = Field(
        description="Second image to calculate similarity",
    )

Adding Parameters

similarity_threshold: Union[
    float,
    Selector(kind=[FLOAT_ZERO_TO_ONE_KIND]),
] = Field(
    default=0.4,
    description="Threshold to assume that images are similar",
)

Declaring Block Outputs

@classmethod
def describe_outputs(cls) -> List[OutputDefinition]:
    return [
      OutputDefinition(
        name="images_match",
        kind=[BOOLEAN_KIND],
      ),
    ]

@classmethod
def get_execution_engine_compatibility(cls) -> Optional[str]:
    return ">=1.3.0,<2.0.0"

Definition of Block Class

class ImagesSimilarityBlock(WorkflowBlock):

    def __init__(self):
        self._sift = cv2.SIFT_create()
        self._matcher = cv2.FlannBasedMatcher(dict(algorithm=1, trees=5), dict(checks=50))

    @classmethod
    def get_manifest(cls) -> Type[WorkflowBlockManifest]:
        return ImagesSimilarityManifest

    def run(
        self,
        image_1: WorkflowImageData,
        image_2: WorkflowImageData,
        similarity_threshold: float,
    ) -> BlockResult:
        image_1_gray = cv2.cvtColor(image_1.numpy_image, cv2.COLOR_BGR2GRAY)
        image_2_gray = cv2.cvtColor(image_2.numpy_image, cv2.COLOR_BGR2GRAY)
        kp_1, des_1 = self._sift.detectAndCompute(image_1_gray, None)
        kp_2, des_2 = self._sift.detectAndCompute(image_2_gray, None)
        matches = self._matcher.knnMatch(des_1, des_2, k=2)
        good_matches = []
        for m, n in matches:
            if m.distance < similarity_threshold * n.distance:
                good_matches.append(m)
        return {
            "images_match": len(good_matches) > 0,
        }

Key points:

  • The __init__ constructor initializes state that lives through consecutive invocations of run(...).
  • The run(...) method uses WorkflowImageData abstraction with numpy_image property to get np.ndarray.
  • The result dictionary keys must match the names of outputs declared in the manifest.

Exposing Block in a Plugin

Add the block class to the list returned from your plugin's load_blocks(...) function:

from my_plugin.images_similarity.v1 import ImagesSimilarityBlock

def load_blocks():
    return [ImagesSimilarityBlock]

Advanced Topics

Blocks Processing Batches of Inputs

Sometimes, performance of your block may benefit if all input data is processed at once as batch. This may happen for models running on GPU. To enable this, implement get_parameters_accepting_batches(...) in your manifest:

@classmethod
def get_parameters_accepting_batches(cls) -> bool:
    return ["image_1", "image_2"]

Then the run(...) method receives Batch[WorkflowImageData] instead of WorkflowImageData, and you iterate over batch elements.

Flow-Control Blocks

Flow-control blocks are blocks that declare compatibility with step selectors in their manifest. They cannot register outputs and return FlowControl objects instead, specifying which next steps should be executed for a given batch element or the whole workflow execution.

For detailed examples and more advanced topics, see the inference repository source code.