Inference Pipeline

The Inference Pipeline interface is designed for streaming and is the best choice for real-time use cases. It is an asynchronous interface that can consume many different video sources, including local devices (such as webcams), RTSP streams, and video files. With this interface, you define the source of a video stream and the sinks that consume its predictions.

Quickstart

To use fine-tuned models with Inference, you will need a Roboflow API key. If you don't already have a Roboflow account, sign up for a free Roboflow account. Then, retrieve your API key from the Roboflow dashboard.

export ROBOFLOW_API_KEY=<your api key>

Install Inference:

pip install inference

Create an Inference Pipeline:

from inference import InferencePipeline
from inference.core.interfaces.stream.sinks import render_boxes

api_key = "YOUR_ROBOFLOW_API_KEY"

pipeline = InferencePipeline.init(
    model_id="rfdetr-large",
    video_reference="https://storage.googleapis.com/com-roboflow-marketing/inference/people-walking.mp4",
    on_prediction=render_boxes,
    api_key=api_key,
)

pipeline.start()
pipeline.join()

What is video reference?

Inference Pipelines can consume many different types of video streams:

  • Device Id (integer): Providing an integer instructs a pipeline to stream video from a local device, like a webcam. Typically, built-in webcams show up as device 0.
  • Video File (string): Providing the path to a video file will result in the pipeline reading every frame from the file.
  • Video URL (string): Providing a video URL is equivalent to providing a video file path; the pipeline reads frames from the remote file.
  • RTSP URL (string): Providing an RTSP URL will result in the pipeline streaming frames from an RTSP stream.
  • List: A list of any of the above values.
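For example, each of the following is a valid `video_reference` value (the file path and URLs below are hypothetical):

```python
# Each of these is a valid `video_reference` (hypothetical sources):
webcam = 0                                                 # local device id
video_file = "./my_video.mp4"                              # local video file
rtsp_stream = "rtsp://user:pass@192.168.0.10:554/stream"   # RTSP stream URL
multiple_sources = [webcam, video_file, rtsp_stream]       # several sources at once
```
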

How the InferencePipeline works

InferencePipeline spins up a video-source consumer thread for each provided video reference. Frames from the sources are collected by a video multiplexer. on_prediction(...) can run in SEQUENTIAL mode (one element at a time) or in BATCH mode (all batch elements at once), controlled by the sink_mode parameter.
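The difference between the two modes can be sketched with a toy dispatcher (plain Python, not the library's implementation):

```python
def dispatch(on_prediction, predictions, video_frames, batch_mode=False):
    """Toy illustration of SEQUENTIAL vs BATCH sink dispatch."""
    if batch_mode:
        # BATCH: the sink receives all batch elements in a single call.
        on_prediction(predictions, video_frames)
    else:
        # SEQUENTIAL: the sink is called once per element.
        for prediction, frame in zip(predictions, video_frames):
            on_prediction(prediction, frame)

calls = []
dispatch(lambda p, f: calls.append(p), [{"a": 1}, {"b": 2}], ["f1", "f2"])
# calls == [{"a": 1}, {"b": 2}]: one call per element
dispatch(lambda p, f: calls.append(p), [{"a": 1}, {"b": 2}], ["f1", "f2"], batch_mode=True)
# the last call appended the whole list at once
```
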

For static video files, InferencePipeline processes every frame by default. For streams, frames can be dropped from the buffers in favour of always processing the most recent data.

To enhance stability when processing streams, video sources are automatically re-connected if connectivity is lost during processing.

Custom inference logic

InferencePipeline supports running custom inference logic. Pass a custom callable as on_video_frame:

import os
import json
from inference.core.interfaces.camera.entities import VideoFrame
from inference import InferencePipeline
from typing import Any, List

TARGET_DIR = "./my_predictions"

class MyModel:

  def __init__(self, weights_path: str):
    self._model = your_model_loader(weights_path)

  def infer(self, video_frames: List[VideoFrame]) -> List[Any]:
    return self._model([v.image for v in video_frames])

def save_prediction(prediction: dict, video_frame: VideoFrame) -> None:
  # Open in write mode; without "w" the file would be opened for reading.
  with open(os.path.join(TARGET_DIR, f"{video_frame.frame_id}.json"), "w") as f:
    json.dump(prediction, f)

my_model = MyModel("./my_model.pt")

pipeline = InferencePipeline.init_with_custom_logic(
  video_reference="./my_video.mp4",
  on_video_frame=my_model.infer,
  on_prediction=save_prediction,
)

pipeline.start()
pipeline.join()

InferencePipeline with Roboflow Workflows

InferencePipeline can also run Roboflow Workflows:

from inference import InferencePipeline
from inference.core.interfaces.camera.entities import VideoFrame
from inference.core.interfaces.stream.sinks import render_boxes

def workflows_sink(
    predictions: dict,
    video_frame: VideoFrame,
) -> None:
    render_boxes(
        predictions["predictions"][0],
        video_frame,
        display_statistics=True,
    )

workflow_specification = {
    "specification": {
        "version": "1.0",
        "inputs": [
            {"type": "InferenceImage", "name": "image"},
        ],
        "steps": [
            {
                "type": "ObjectDetectionModel",
                "name": "step_1",
                "image": "$inputs.image",
                "model_id": "rfdetr-small",
                "confidence": 0.5,
            }
        ],
        "outputs": [
            {"type": "JsonField", "name": "predictions", "selector": "$steps.step_1.*"},
        ],
    }
}

pipeline = InferencePipeline.init_with_workflow(
    video_reference="./my_video.mp4",
    workflow_specification=workflow_specification,
    on_prediction=workflows_sink,
    image_input_name="image",
    video_metadata_input_name="video_metadata"
)

pipeline.start()
pipeline.join()

You can also initialize InferencePipeline with a workflow registered in the Roboflow App:

pipeline = InferencePipeline.init_with_workflow(
    video_reference="./my_video.mp4",
    workspace_name="<your_workspace>",
    workflow_id="<your_workflow_id>",
    on_prediction=workflows_sink,
)
Note

You can profile Workflow execution inside InferencePipeline by exporting the environment variable ENABLE_WORKFLOWS_PROFILING=True. You can also tune the number of frames kept in the profiler buffer via WORKFLOWS_PROFILER_BUFFER_SIZE. The init_with_workflow(...) method also accepts a profiling_directory parameter.
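For example, before starting the pipeline (the buffer size below is an illustrative value):

```shell
export ENABLE_WORKFLOWS_PROFILING=True
export WORKFLOWS_PROFILER_BUFFER_SIZE=64
```
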

Sinks

Sinks define what an Inference Pipeline should do with each prediction. A sink is a function with the following signature:

from typing import Union, List, Optional
from inference.core.interfaces.camera.entities import VideoFrame

def on_prediction(
    predictions: Union[dict, List[Optional[dict]]],
    video_frame: Union[VideoFrame, List[Optional[VideoFrame]]],
) -> None:
    # In BATCH mode, predictions and video_frame are parallel lists;
    # wrap single items so the same loop also handles SEQUENTIAL mode.
    if not isinstance(predictions, list):
        predictions, video_frame = [predictions], [video_frame]
    for prediction, frame in zip(predictions, video_frame):
        if prediction is None:
            continue
        # SOME PROCESSING

Built-in Sinks

  • render_boxes(...) -- Visualizes predictions and overlays them on a stream using Supervision annotators.
  • UDPSink(...) -- Broadcasts predictions over a UDP port.
  • multi_sink(...) -- Combines multiple sinks so multiple actions happen on a single inference result.
  • VideoFileSink(...) -- Saves annotated frames to a video file.
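Combining sinks is a simple fan-out: every sink receives the same inference result. The sketch below mimics that pattern with toy sinks and functools.partial; it is an illustration of the idea, not the signature of the library's multi_sink(...):

```python
from functools import partial

def fan_out(predictions, video_frame, sinks):
    # Toy multi-sink behaviour: call each sink in order with the same result.
    for sink in sinks:
        sink(predictions, video_frame)

received = []
save_sink = lambda p, f: received.append(("saved", p))
print_sink = lambda p, f: received.append(("printed", p))

# Bind the list of sinks so the result is a valid on_prediction callable.
combined = partial(fan_out, sinks=[save_sink, print_sink])
combined({"x": 1}, None)
# received == [("saved", {"x": 1}), ("printed", {"x": 1})]
```
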

Model Weights Download

Model weights are downloaded automatically the first time you run inference. You can pre-download weights:

from inference import get_model
get_model("rfdetr-base")

Other Pipeline Configuration

Inference Pipelines are highly configurable:

  • max_fps: Used to set the maximum rate of frame processing.
  • confidence: Confidence threshold used for inference.
  • iou_threshold: IoU threshold used for inference.
  • video_source_properties: Optional dictionary of properties to configure the video source, corresponding to cv2 VideoCapture properties.

from inference import InferencePipeline
pipeline = InferencePipeline.init(
    ...,
    max_fps=10,
    confidence=0.75,
    iou_threshold=0.4,
    video_source_properties={
        "frame_width": 1920.0,
        "frame_height": 1080.0,
        "fps": 30.0,
    },
)