Running Workflows

There are several ways to run a Workflow, including:

  • Request to the HTTP API (Roboflow Hosted API or a self-hosted inference server) running the Workflows Execution Engine
  • Video processing using InferencePipeline
  • The inference Python package, where you can use the Workflows Execution Engine directly in your Python app

HTTP API Request

This way of running Workflows is ideal for clients who:

  • Want to use Workflows as a stand-alone, independent part of their systems.
  • Maintain their main applications in languages other than Python.
  • Prefer to offload compute-heavy tasks to dedicated servers.

Roboflow offers a hosted HTTP API that clients can use without needing their own infrastructure. Alternatively, the inference server (which can run Workflows) can be set up on-site if needed.

Running Workflows with the Roboflow Hosted API has several limitations:

  • Workflow runtime is limited to 20 seconds.
  • The response payload is limited to 6 MB. This means that some blocks — especially visualization blocks — may fail if you use too many of them or if the input images are too large.

Integrating via HTTP is simple: just send a request to the server. You can do this with an HTTP client library in your preferred programming language, leverage our Inference SDK in Python, or even use cURL.

To run a workflow created in the Roboflow app with cURL, use the following command:

curl --location 'https://detect.roboflow.com/infer/workflows/<your-workspace-name>/<your-workflow-id>' \
    --header 'Content-Type: application/json' \
    --data '{
    "api_key": "<YOUR-API-KEY>",
    "inputs": {
        "image": {"type": "url", "value": "https://your-image-url"},
        "parameter": "some-value"
    }
}'

Please note that:

  • <your-workspace-name>, <your-workflow-id>, <YOUR-API-KEY> must be replaced with actual values valid for your Roboflow account.
  • The keys of the inputs dictionary are dictated by your Workflow definition; the names will differ depending on the parameters you declare.
  • The values of the inputs dictionary also depend on your Workflow definition. Inputs declared as WorkflowImage have a special structure: a dictionary with type and value keys. With cURL, type can be either url or base64, with value adjusted accordingly.
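If you prefer Python over cURL, the same request can be assembled with the standard library plus the popular requests package. The sketch below builds the request payload with a base64-encoded local image, mirroring the cURL example above; the endpoint URL, input names, and placeholder values are assumptions you must replace to match your own Workflow.

```python
import base64


def build_workflow_payload(api_key: str, image_path: str, parameter: str) -> dict:
    """Build the JSON body for a Workflows HTTP API request.

    Images can be passed by URL or as base64; here we base64-encode a
    local file so no publicly accessible URL is needed. The "parameter"
    input is hypothetical - use the input names your Workflow declares.
    """
    with open(image_path, "rb") as f:
        image_b64 = base64.b64encode(f.read()).decode("utf-8")
    return {
        "api_key": api_key,
        "inputs": {
            "image": {"type": "base64", "value": image_b64},
            "parameter": parameter,
        },
    }


# To actually send the request (requires the `requests` package and valid credentials):
# import requests
# url = "https://detect.roboflow.com/infer/workflows/<your-workspace-name>/<your-workflow-id>"
# response = requests.post(url, json=build_workflow_payload("<YOUR-API-KEY>", "image.jpg", "some-value"))
# print(response.json())
```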

Video Processing Using InferencePipeline

For use cases involving video files or streams, we recommend using InferencePipeline, which can run Workflows on each video frame.

This option is ideal for clients who:

  • Need low-latency, high-throughput video processing.
  • Design workflows with single-frame processing times that meet real-time requirements (though complex workflows might not be suitable for real-time processing).

from inference import InferencePipeline
from inference.core.interfaces.camera.entities import VideoFrame

def my_sink(result: dict, video_frame: VideoFrame):
    print(result)


pipeline = InferencePipeline.init_with_workflow(
    api_key="<YOUR-API-KEY>",
    workspace_name="<your-workspace-name>",
    workflow_id="<your-workflow-id>",
    video_reference=0,
    on_prediction=my_sink,
    image_input_name="image",
)
pipeline.start()
pipeline.join()

Please note that:

  • <your-workspace-name>, <your-workflow-id>, <YOUR-API-KEY> must be replaced with actual values valid for your Roboflow account.
  • Your Workflow must accept video frames under the image parameter.

Note

Make sure you have inference or inference-gpu package installed in your Python environment.
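The on_prediction sink receives one result dictionary per frame, keyed by the names declared in your Workflow's outputs. A minimal sketch of a sink that counts detections, assuming a hypothetical output named predictions exposing a list-like of detections (adjust the key to match your own Workflow):

```python
def counting_sink(result: dict, video_frame) -> int:
    """Example sink: report how many detections the Workflow produced
    for a single frame. Assumes an output key named "predictions";
    returns the count to make the behavior easy to verify."""
    predictions = result.get("predictions") or []
    count = len(predictions)
    print(f"Frame contained {count} detections")
    return count
```

Pass counting_sink as on_prediction in InferencePipeline.init_with_workflow; the pipeline calls it once per processed frame.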

Batch Processing Using inference-cli

inference-cli is a command-line wrapper library around inference. You can use it to process your data with Workflows without writing a single line of code: point it at the data to be processed, select your Workflow, and specify where the results should be saved.

inference workflows process-images-directory \
    -i {your_input_directory} \
    -o {your_output_directory} \
    --workspace_name {your-roboflow-workspace-url} \
    --workflow_id {your-workflow-id} \
    --api-key {your_roboflow_api_key}

Note

Make sure you have inference or inference-cli package installed in your Python environment.

Workflows in Python Package

The Workflows Compiler and Execution Engine are bundled with the inference package. Running Workflows directly may be ideal for clients who:

  • Maintain their applications in Python.
  • Accept running resource-heavy computations directly in their app.
  • Want to avoid additional latency and errors related to sending HTTP requests.
  • Expect full control over Workflow execution.

from inference.core.env import MAX_ACTIVE_MODELS, WORKFLOWS_MAX_CONCURRENT_STEPS
from inference.core.managers.base import ModelManager
from inference.core.managers.decorators.fixed_size_cache import WithFixedSizeCache
from inference.core.registries.roboflow import RoboflowModelRegistry
from inference.core.workflows.core_steps.common.entities import StepExecutionMode
from inference.core.workflows.execution_engine.core import ExecutionEngine
from inference.models.utils import ROBOFLOW_MODEL_TYPES

model_registry = RoboflowModelRegistry(ROBOFLOW_MODEL_TYPES)
model_manager = ModelManager(model_registry=model_registry)
model_manager = WithFixedSizeCache(model_manager, max_size=MAX_ACTIVE_MODELS)

OBJECT_DETECTION_WORKFLOW = {
    "version": "1.0",
    "inputs": [
        {"type": "WorkflowImage", "name": "image"},
        {"type": "WorkflowParameter", "name": "model_id"},
        {"type": "WorkflowParameter", "name": "confidence", "default_value": 0.3},
    ],
    "steps": [
        {
            "type": "RoboflowObjectDetectionModel",
            "name": "detection",
            "image": "$inputs.image",
            "model_id": "$inputs.model_id",
            "confidence": "$inputs.confidence",
        }
    ],
    "outputs": [
        {"type": "JsonField", "name": "result", "selector": "$steps.detection.*"}
    ],
}

workflow_init_parameters = {
    "workflows_core.model_manager": model_manager,
    "workflows_core.api_key": "<YOUR-API-KEY>",
    "workflows_core.step_execution_mode": StepExecutionMode.LOCAL,
}

execution_engine = ExecutionEngine.init(
    workflow_definition=OBJECT_DETECTION_WORKFLOW,
    init_parameters=workflow_init_parameters,
    max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
)

result = execution_engine.run(
    runtime_parameters={
        "image": [your_image],
        "model_id": "rfdetr-small",
    }
)
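execution_engine.run() returns a list with one entry per input image; each entry is a dictionary keyed by the names in the Workflow's outputs section (here, result). A small sketch of unpacking that shape, using fabricated data in place of a real run:

```python
def summarize_results(results: list) -> list:
    """Flatten Execution Engine output into the per-image "result" payloads.

    Each element of `results` corresponds to one input image and maps
    output names (from the Workflow's "outputs" section) to values.
    """
    return [entry["result"] for entry in results]


# Illustrative shape only; a real run returns actual model predictions here.
fake_run_output = [
    {"result": {"predictions": [{"class": "car", "confidence": 0.91}]}},
    {"result": {"predictions": []}},
]
summaries = summarize_results(fake_run_output)
print(summaries[0]["predictions"][0]["class"])
```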