Data Representations in Workflows

Many frameworks enforce standard data types for developers to work with, and the Workflows ecosystem is no exception. A kind in a Workflow is a high-level abstraction of the data being passed through. When building Workflow blocks, you also need to know the concrete data types that are provided to the WorkflowBlock.run(...) method.

Batch

When a Workflow block declares batch processing, it uses a special container type called Batch. All batch-oriented parameters are wrapped with Batch[X], where X is the data type:

from inference.core.workflows.execution_engine.entities.base import Batch
from inference.core.workflows.prototypes.block import BlockResult

def run(self, x: Batch[int], y: Batch[float]) -> BlockResult:
    pass

The Batch type functions similarly to a Python list, with one key difference: it is read-only. You cannot modify its elements, nor can you add or remove elements. However, several useful operations are available:

Iteration Through Elements

def iterate(batch: Batch[int]) -> None:
    for element in batch:
        print(element)

Zipping Multiple Batches

Note

The Execution Engine ensures that batches provided to the run method are of equal size, preventing any loss of elements due to unequal batch sizes during iteration.

def zip_batches(batch_1: Batch[int], batch_2: Batch[int]) -> None:
    for element_1, element_2 in zip(batch_1, batch_2):
        print(element_1, element_2)
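
The equal-size guarantee matters because Python's built-in zip stops at the shortest input without raising an error. The sketch below uses plain lists as stand-ins for Batch objects. zip_equal is a hypothetical helper, not part of the Workflows API, and it only illustrates what could go wrong if batch sizes were allowed to differ:

```python
from typing import List, Sequence, Tuple


def zip_equal(first: Sequence[int], second: Sequence[int]) -> List[Tuple[int, int]]:
    """Pair elements positionally, refusing inputs of different length."""
    if len(first) != len(second):
        raise ValueError(
            f"size mismatch: {len(first)} vs {len(second)} elements"
        )
    return list(zip(first, second))


# Plain zip silently drops the unmatched trailing element.
truncated = list(zip([1, 2, 3], [10, 20]))
print(truncated)  # [(1, 10), (2, 20)]

# With equal sizes, every element is paired.
print(zip_equal([1, 2, 3], [10, 20, 30]))  # [(1, 10), (2, 20), (3, 30)]
```

Inside a block's run method no such check should be needed, since the Execution Engine already supplies batches of equal size.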

Getting Batch Element Indices

def discover_indices(batch: Batch[int]) -> None:
    for index in batch.indices:
        print(index)  # e.g., (0,) for 1D batch, (1, 3) for 2D nested batch

Iterating with Both Elements and Their Indices

def iterate_with_indices(batch: Batch[int]) -> None:
    for index, element in batch.iter_with_indices():
        print(index, element)
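
To experiment with these operations without installing inference, a small stand-in can mimic the behaviour described in this section. SketchBatch below is a hypothetical class written for illustration, not the library's Batch implementation. It assumes that len() and positional access work as they do for a list, and it mirrors only the indices property and the iter_with_indices() method shown above:

```python
from typing import Generic, Iterator, List, Sequence, Tuple, TypeVar

T = TypeVar("T")
Index = Tuple[int, ...]


class SketchBatch(Generic[T]):
    """Hypothetical stand-in that mimics the read-only behaviour described above."""

    def __init__(self, content: Sequence[T], indices: Sequence[Index]) -> None:
        if len(content) != len(indices):
            raise ValueError("content and indices must have the same length")
        # Tuples keep the stored data immutable from the outside.
        self._content: Tuple[T, ...] = tuple(content)
        self._indices: Tuple[Index, ...] = tuple(indices)

    @property
    def indices(self) -> List[Index]:
        # Return a copy so callers cannot alter the internal state.
        return list(self._indices)

    def __len__(self) -> int:
        return len(self._content)

    def __getitem__(self, position: int) -> T:
        return self._content[position]

    def __iter__(self) -> Iterator[T]:
        return iter(self._content)

    def iter_with_indices(self) -> Iterator[Tuple[Index, T]]:
        return zip(self._indices, self._content)


batch = SketchBatch(content=[10, 20], indices=[(0,), (1,)])
doubled = {index: element * 2 for index, element in batch.iter_with_indices()}
print(doubled)  # {(0,): 20, (1,): 40}

try:
    batch[0] = 99  # no __setitem__ is defined, so item assignment is rejected
except TypeError as error:
    print("read-only:", error)
```

Keying results by index, as in doubled, is one way to keep each output associated with the batch element that produced it.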

WorkflowImageData

WorkflowImageData is a dataclass that encapsulates an image along with its metadata, providing useful methods to manipulate the image representation within a Workflow block.

from inference.core.workflows.execution_engine.entities.base import WorkflowImageData

def operate_on_image(workflow_image: WorkflowImageData) -> None:
    # Getting an np.ndarray representing the image.
    numpy_image = workflow_image.numpy_image

    # Getting base64-encoded JPEG representation of the image.
    base64_image: str = workflow_image.base64_image

    # Converting the image into a format compatible with inference models.
    inference_format = workflow_image.to_inference_format()

    # Accessing metadata related to the parent image.
    parent_metadata = workflow_image.parent_metadata

    # Retrieving VideoMetadata object.
    video_metadata = workflow_image.video_metadata
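
Because base64_image is a plain string, it can be turned back into raw JPEG bytes with the standard library alone. The following is a minimal sketch: decode_base64_jpeg is a hypothetical helper, and the payload is a few placeholder bytes rather than a real image. The only format fact it relies on is that JPEG data begins with the bytes FF D8 (the start-of-image marker):

```python
import base64

JPEG_START_OF_IMAGE = b"\xff\xd8"


def decode_base64_jpeg(encoded: str) -> bytes:
    """Decode a base64 string and check that the payload starts like a JPEG."""
    raw = base64.b64decode(encoded)
    if not raw.startswith(JPEG_START_OF_IMAGE):
        raise ValueError("payload does not start with the JPEG SOI marker")
    return raw


# Placeholder bytes standing in for a real encoded image (not a decodable picture).
fake_jpeg = JPEG_START_OF_IMAGE + b"\xff\xe0" + b"\x00" * 8
encoded = base64.b64encode(fake_jpeg).decode("ascii")

assert decode_base64_jpeg(encoded) == fake_jpeg
```

In a block, the string from workflow_image.base64_image would take the place of encoded.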

To preserve metadata while transforming images:

def transform_image(image: WorkflowImageData) -> WorkflowImageData:
    # some_transformation is a placeholder for any function that returns an np.ndarray.
    transformed_image = some_transformation(image.numpy_image)
    return WorkflowImageData.copy_and_replace(
        origin_image_data=image,
        numpy_image=transformed_image,
    )

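For cropping operations, which increase dimensionality because a single image can yield several crops, create each crop so that it stays linked to its origin image: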

result_crop = WorkflowImageData.create_crop(
    origin_image_data=image,
    crop_identifier=crop_id,
    cropped_image=cropped_image,
    offset_x=x_min,
    offset_y=y_min,
)
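
The offset arguments can be pictured with a small coordinate example. This sketch assumes that offset_x and offset_y are the position of the crop's top-left corner in the origin image, which is what passing x_min and y_min above suggests. Nested lists stand in for np.ndarray, and both helper functions are hypothetical:

```python
from typing import List, Tuple

Image = List[List[int]]  # rows of pixel values; a stand-in for np.ndarray


def crop_with_offsets(
    image: Image, x_min: int, y_min: int, x_max: int, y_max: int
) -> Tuple[Image, int, int]:
    """Cut out rows y_min..y_max and columns x_min..x_max (exclusive upper bounds)."""
    cropped = [row[x_min:x_max] for row in image[y_min:y_max]]
    return cropped, x_min, y_min


def to_parent_coordinates(
    x: int, y: int, offset_x: int, offset_y: int
) -> Tuple[int, int]:
    """Translate a point inside the crop back into the original image."""
    return x + offset_x, y + offset_y


# 4x4 image whose pixel value encodes its position: value = 10 * row + column.
image = [[10 * row + column for column in range(4)] for row in range(4)]
cropped, offset_x, offset_y = crop_with_offsets(
    image, x_min=1, y_min=2, x_max=3, y_max=4
)
print(cropped)  # [[21, 22], [31, 32]]

# Pixel (x=1, y=0) in the crop is pixel (x=2, y=2) in the original.
parent_x, parent_y = to_parent_coordinates(1, 0, offset_x, offset_y)
assert image[parent_y][parent_x] == cropped[0][1]
```

Keeping the offsets alongside the crop is what allows results computed on the crop to be mapped back onto the origin image.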

VideoMetadata

Warning

The video_metadata kind is deprecated. The VideoMetadata representation became a member of WorkflowImageData in Execution Engine v1.2.0 (inference release v0.23.0), where it is available as the video_metadata attribute.

VideoMetadata is a dataclass that provides the following metadata about a video frame and its video source:

from inference.core.workflows.execution_engine.entities.base import VideoMetadata

def inspect_video_metadata(video_metadata: VideoMetadata) -> None:
    print(video_metadata.video_identifier)  # Identifier string for video
    print(video_metadata.frame_number)      # Sequential number of the frame
    print(video_metadata.frame_timestamp)   # The timestamp of video frame
    print(video_metadata.fps)               # FPS value (optional)
    print(video_metadata.measured_fps)      # Measured FPS of live stream (optional)
    print(video_metadata.comes_from_video_file)  # Flag for video file vs stream
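
Since fps and measured_fps are both optional, code that consumes them needs a fallback. The sketch below shows one possible approach using a hypothetical VideoMetadataSketch dataclass that copies only the field names listed above. The field types, the preference for fps over measured_fps, and the helper estimate_elapsed_seconds are assumptions made for illustration. The result is approximate, because the text above does not say whether frame numbering starts at 0 or 1:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class VideoMetadataSketch:
    """Hypothetical stand-in with the fields listed above (types are assumed)."""

    video_identifier: str
    frame_number: int
    frame_timestamp: datetime
    fps: Optional[float] = None
    measured_fps: Optional[float] = None
    comes_from_video_file: Optional[bool] = None


def estimate_elapsed_seconds(metadata: VideoMetadataSketch) -> Optional[float]:
    """Estimate the frame's position in seconds, or None if no frame rate is known."""
    # Prefer the declared FPS and fall back to the measured one.
    rate = metadata.fps if metadata.fps else metadata.measured_fps
    if not rate:
        return None
    return metadata.frame_number / rate


from_file = VideoMetadataSketch(
    video_identifier="clip.mp4",
    frame_number=60,
    frame_timestamp=datetime(2024, 1, 1),
    fps=30.0,
    comes_from_video_file=True,
)
print(estimate_elapsed_seconds(from_file))  # 2.0
```

Returning None rather than raising lets a caller decide how to handle sources that report no frame rate at all.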