Inference Pipeline
The InferencePipeline interface is designed for streaming and is the recommended route for real-time use cases. It is an asynchronous interface that can consume many different video sources, including local devices (such as webcams), RTSP video streams, and video files. With this interface, you define the source of a video stream and the sinks that process its results.
Quickstart
To use fine-tuned models with Inference, you will need a Roboflow API key. If you don't already have a Roboflow account, sign up for a free Roboflow account. Then, retrieve your API key from the Roboflow dashboard.
export ROBOFLOW_API_KEY=<your api key>
Install Inference:
pip install inference
Create an Inference Pipeline:
from inference import InferencePipeline
from inference.core.interfaces.stream.sinks import render_boxes
api_key = "YOUR_ROBOFLOW_API_KEY"
pipeline = InferencePipeline.init(
    model_id="rfdetr-large",
    video_reference="https://storage.googleapis.com/com-roboflow-marketing/inference/people-walking.mp4",
    on_prediction=render_boxes,
    api_key=api_key,
)
pipeline.start()
pipeline.join()
What is a video reference?
Inference Pipelines can consume many different types of video streams:
- Device ID (integer): Providing an integer instructs a pipeline to stream video from a local device, like a webcam. Typically, built-in webcams show up as device 0.
- Video File (string): Providing the path to a video file will result in the pipeline reading every frame from the file.
- Video URL (string): Providing a video URL is equivalent to providing a video file path.
- RTSP URL (string): Providing an RTSP URL will result in the pipeline streaming frames from an RTSP stream.
- List: A list of any of the above values.
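To illustrate how these reference types are distinguished, here is a small, hypothetical helper (not part of the Inference API) that labels a reference according to the rules above:

```python
from typing import Union


def classify_video_reference(reference: Union[int, str]) -> str:
    # An integer selects a local capture device, e.g. 0 for a built-in webcam.
    if isinstance(reference, int):
        return "device"
    # RTSP URLs are treated as live streams.
    if reference.startswith("rtsp://"):
        return "rtsp_stream"
    # HTTP(S) URLs are read like video files.
    if reference.startswith(("http://", "https://")):
        return "video_url"
    # Anything else is assumed to be a local file path.
    return "video_file"


print(classify_video_reference(0))                        # device
print(classify_video_reference("rtsp://cam.local/feed"))  # rtsp_stream
print(classify_video_reference("./my_video.mp4"))         # video_file
```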
How the InferencePipeline works
InferencePipeline spins up a video-source consumer thread for each provided video reference. Frames from the sources are grabbed by a video multiplexer. The on_prediction(...) sink may work in SEQUENTIAL mode (one element at a time) or BATCH mode (all batch elements at once), controlled by the sink_mode parameter.
For static video files, InferencePipeline processes all frames by default. For streams, it is possible to drop frames from the buffers in favour of always processing the most recent data.
To enhance stability when processing streams, video sources are automatically re-connected if connectivity is lost during processing.
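The SEQUENTIAL vs BATCH distinction can be sketched in plain Python. The real dispatch lives inside InferencePipeline; the string mode values and the dispatch_to_sink helper below are illustrative assumptions, not library code:

```python
from typing import Any, Callable, List


def dispatch_to_sink(
    sink: Callable[..., None],
    predictions: List[dict],
    frames: List[Any],
    sink_mode: str = "SEQUENTIAL",
) -> None:
    if sink_mode == "SEQUENTIAL":
        # One (prediction, frame) pair per sink call.
        for prediction, frame in zip(predictions, frames):
            sink(prediction, frame)
    else:
        # BATCH: the whole batch is handed to the sink in a single call.
        sink(predictions, frames)


calls = []
dispatch_to_sink(lambda p, f: calls.append(p), [{"a": 1}, {"a": 2}], ["f1", "f2"])
# calls == [{"a": 1}, {"a": 2}]
```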
Custom inference logic
InferencePipeline supports running custom inference logic. Pass a custom callable as on_video_frame:
import os
import json
from inference.core.interfaces.camera.entities import VideoFrame
from inference import InferencePipeline
from typing import Any, List
TARGET_DIR = "./my_predictions"
class MyModel:

    def __init__(self, weights_path: str):
        self._model = your_model_loader(weights_path)

    def infer(self, video_frames: List[VideoFrame]) -> List[Any]:
        return self._model([v.image for v in video_frames])


def save_prediction(prediction: dict, video_frame: VideoFrame) -> None:
    with open(os.path.join(TARGET_DIR, f"{video_frame.frame_id}.json"), "w") as f:
        json.dump(prediction, f)
my_model = MyModel("./my_model.pt")
pipeline = InferencePipeline.init_with_custom_logic(
    video_reference="./my_video.mp4",
    on_video_frame=my_model.infer,
    on_prediction=save_prediction,
)
pipeline.start()
pipeline.join()
InferencePipeline with Roboflow Workflows
InferencePipeline can also run Roboflow Workflows:
from inference import InferencePipeline
from inference.core.interfaces.camera.entities import VideoFrame
from inference.core.interfaces.stream.sinks import render_boxes
def workflows_sink(
    predictions: dict,
    video_frame: VideoFrame,
) -> None:
    render_boxes(
        predictions["predictions"][0],
        video_frame,
        display_statistics=True,
    )
workflow_specification = {
    "specification": {
        "version": "1.0",
        "inputs": [
            {"type": "InferenceImage", "name": "image"},
        ],
        "steps": [
            {
                "type": "ObjectDetectionModel",
                "name": "step_1",
                "image": "$inputs.image",
                "model_id": "rfdetr-small",
                "confidence": 0.5,
            }
        ],
        "outputs": [
            {"type": "JsonField", "name": "predictions", "selector": "$steps.step_1.*"},
        ],
    }
}
pipeline = InferencePipeline.init_with_workflow(
    video_reference="./my_video.mp4",
    workflow_specification=workflow_specification,
    on_prediction=workflows_sink,
    image_input_name="image",
    video_metadata_input_name="video_metadata",
)
pipeline.start()
pipeline.join()
You can initialise InferencePipeline with a workflow registered in the Roboflow App:
pipeline = InferencePipeline.init_with_workflow(
    video_reference="./my_video.mp4",
    workspace_name="<your_workspace>",
    workflow_id="<your_workflow_id>",
    on_prediction=workflows_sink,
)
You can profile Workflow execution inside InferencePipeline by exporting the environment variable ENABLE_WORKFLOWS_PROFILING=True. Additionally, you can tune the number of frames kept in the profiler buffer via WORKFLOWS_PROFILER_BUFFER_SIZE. The init_with_workflow(...) method also accepts a profiling_directory parameter.
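For example, assuming a POSIX shell (the buffer size below is an illustrative value, not a default):

```shell
export ENABLE_WORKFLOWS_PROFILING=True
# Number of frames kept in the profiler buffer (example value).
export WORKFLOWS_PROFILER_BUFFER_SIZE=64
```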
Sinks
Sinks define what an Inference Pipeline should do with each prediction. A sink is a function with the following signature:
from typing import Union, List, Optional
from inference.core.interfaces.camera.entities import VideoFrame
def on_prediction(
    predictions: Union[dict, List[Optional[dict]]],
    video_frame: Union[VideoFrame, List[Optional[VideoFrame]]],
) -> None:
    # Normalise single results to lists so the loop works in both sink modes.
    if not isinstance(predictions, list):
        predictions, video_frame = [predictions], [video_frame]
    for prediction, frame in zip(predictions, video_frame):
        if prediction is None:
            continue
        # SOME PROCESSING
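As a concrete sketch, here is a minimal custom sink that counts detections per frame. It assumes a Roboflow-style result dict with a "predictions" list of detection dicts; treat the exact schema as an assumption:

```python
from typing import Any, List, Optional, Union


def count_detections(prediction: dict) -> int:
    # A Roboflow-style detection result keeps its boxes under the "predictions" key.
    return len(prediction.get("predictions", []))


def count_detections_sink(
    predictions: Union[dict, List[Optional[dict]]],
    video_frame: Any,
) -> None:
    # Normalise to a list so the sink works in SEQUENTIAL and BATCH modes alike.
    if not isinstance(predictions, list):
        predictions = [predictions]
    for prediction in predictions:
        if prediction is None:
            continue
        print(f"detections: {count_detections(prediction)}")


count_detections_sink({"predictions": [{"class": "person"}, {"class": "car"}]}, None)
# prints "detections: 2"
```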
Built-in Sinks
- render_boxes(...): Visualizes predictions and overlays them on a stream using Supervision annotators.
- UDPSink(...): Broadcasts predictions over a UDP port.
- multi_sink(...): Combines multiple sinks so multiple actions happen on a single inference result.
- VideoFileSink(...): Saves annotated frames to a video file.
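The idea behind multi_sink-style composition can be sketched in plain Python. The combine_sinks helper below is a standalone illustration, not the library's implementation:

```python
from typing import Any, Callable, List


def combine_sinks(sinks: List[Callable[[dict, Any], None]]) -> Callable[[dict, Any], None]:
    # Returns a single sink that fans each result out to every registered sink.
    def combined(predictions: dict, video_frame: Any) -> None:
        for sink in sinks:
            sink(predictions, video_frame)
    return combined


log: List[str] = []
sink_a = lambda p, f: log.append("a")
sink_b = lambda p, f: log.append("b")
combined = combine_sinks([sink_a, sink_b])
combined({}, None)
# log == ["a", "b"]
```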
Model Weights Download
Model weights are downloaded automatically the first time you run inference. You can pre-download weights:
from inference import get_model
get_model("rfdetr-base")
Other Pipeline Configuration
Inference Pipelines are highly configurable:
- max_fps: Sets the maximum rate of frame processing.
- confidence: Confidence threshold used for inference.
- iou_threshold: IoU threshold used for inference.
- video_source_properties: Optional dictionary of properties to configure the video source, corresponding to cv2 VideoCapture properties.
from inference import InferencePipeline
pipeline = InferencePipeline.init(
    ...,
    max_fps=10,
    confidence=0.75,
    iou_threshold=0.4,
    video_source_properties={
        "frame_width": 1920.0,
        "frame_height": 1080.0,
        "fps": 30.0,
    },
)