Stream Management API
This feature has been integrated into the core inference server. To enable it, start the server with the environment variable `ENABLE_STREAM_API=True`. Note that the API endpoints have changed, so client code will need to be updated; the core capabilities, however, remain the same.
We require a Roboflow Enterprise License to use this in production. See LICENSE.txt for details.
Overview
This feature is designed for users who need to run inference on online video streams. It extends the `inference.Stream()` and `InferencePipeline()` interfaces with a management layer: an HTTP interface that lets users remotely manage the state of inference pipelines.
This functionality is useful in a variety of scenarios, including but not limited to:
- Performing inference across multiple online video streams simultaneously.
- Executing inference on multiple devices that necessitate coordination.
- Establishing a monitoring layer to oversee video processing based on the `inference` package.
How to Run
In Docker - Using Docker Compose
CPU-based Devices
```bash
docker compose -f ./docker/dockerfiles/stream-management-api.compose-cpu.yaml up
```
GPU-based Devices
```bash
docker compose -f ./docker/dockerfiles/stream-management-api.compose-gpu.yaml up
```
Jetson Devices (JetPack 5.1.1)
```bash
docker-compose -f ./docker/dockerfiles/stream-management-api.compose-jetson.5.1.1.yaml up
```
In Docker - Running Containers Separately
CPU-based Devices
```bash
docker run -d --name stream_manager --network host roboflow/roboflow-inference-stream-manager-cpu:latest
docker run -d --name stream_management_api --network host roboflow/roboflow-inference-stream-management-api:latest
```
GPU-based Devices
```bash
docker run -d --name stream_manager --network host --runtime nvidia roboflow/roboflow-inference-stream-manager-gpu:latest
docker run -d --name stream_management_api --network host roboflow/roboflow-inference-stream-management-api:latest
```
Jetson Devices (JetPack 5.1.1)
```bash
docker run -d --name stream_manager --network host --runtime nvidia roboflow/roboflow-inference-stream-manager-jetson-5.1.1:latest
docker run -d --name stream_management_api --network host roboflow/roboflow-inference-stream-management-api:latest
```
Configuration Parameters
Stream Management API
- `STREAM_MANAGER_HOST`: hostname of the stream manager container (replace with the container name if `--network host` is not used, or with the remote hostname when targeting a remote machine)
- `STREAM_MANAGER_PORT`: port used to communicate with the stream manager (must match the port exposed by the stream manager container)
Stream Manager
- `PORT`: port at which the server will be running
- Mount a volume under the container's `/tmp/cache` to enable persistent storage of models and faster inference pipeline initialization
- Camera device connectivity must be enabled at the container level
Bare-metal Deployment
```bash
python -m inference.enterprise.stream_management.manager.app  # runs the manager
python -m inference.enterprise.stream_management.api.app      # runs the management API
```
How to Integrate
After starting the `roboflow-inference-stream-management-api` container, the HTTP API is available at `http://127.0.0.1:8080` (assuming the default configuration is used).
You can fetch the OpenAPI specification with:
```bash
wget http://127.0.0.1:8080/openapi.json
```
Example Python Client
```python
import requests
from typing import Optional

URL = "http://127.0.0.1:8080"


def list_pipelines() -> dict:
    response = requests.get(f"{URL}/list_pipelines")
    return response.json()


def get_pipeline_status(pipeline_id: str) -> dict:
    response = requests.get(f"{URL}/status/{pipeline_id}")
    return response.json()


def pause_pipeline(pipeline_id: str) -> dict:
    response = requests.post(f"{URL}/pause/{pipeline_id}")
    return response.json()


def resume_pipeline(pipeline_id: str) -> dict:
    response = requests.post(f"{URL}/resume/{pipeline_id}")
    return response.json()


def terminate_pipeline(pipeline_id: str) -> dict:
    response = requests.post(f"{URL}/terminate/{pipeline_id}")
    return response.json()


def initialise_pipeline(
    video_reference: str,
    model_id: str,
    api_key: str,
    sink_host: str,
    sink_port: int,
    max_fps: Optional[int] = None,
) -> dict:
    response = requests.post(
        f"{URL}/initialise",
        json={
            "type": "init",
            "sink_configuration": {
                "type": "udp_sink",
                "host": sink_host,
                "port": sink_port,
            },
            "video_reference": video_reference,
            "model_id": model_id,
            "api_key": api_key,
            "max_fps": max_fps,
        },
    )
    return response.json()
```
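The client above configures a `udp_sink`, so predictions are delivered as UDP datagrams to the given host and port. A minimal listener can be sketched as follows; the assumption that each datagram carries a single JSON-serialised message is ours, not a documented guarantee:

```python
import json
import socket


def receive_predictions(port: int, host: str = "0.0.0.0", max_messages: int = 1) -> list:
    """Collect prediction messages emitted by a udp_sink."""
    messages = []
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((host, port))
        while len(messages) < max_messages:
            # Each datagram is assumed to hold one serialised message.
            data, _ = sock.recvfrom(65536)
            messages.append(json.loads(data.decode("utf-8")))
    return messages
```

Run the listener on the machine you passed as `sink_host` when initialising the pipeline, bound to the matching `sink_port`.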
The model ID takes the form `<project_id>/<version_id>`. You can find both values by following the guide on workspace and project IDs.
Command Reference
init Command
```json
{
  "type": "init",
  "model_id": "some/1",
  "video_reference": "rtsp://192.168.0.1:554",
  "sink_configuration": {
    "type": "udp_sink",
    "host": "192.168.0.3",
    "port": 9999
  },
  "api_key": "YOUR-API-KEY",
  "max_fps": 16,
  "model_configuration": {
    "type": "object-detection",
    "class_agnostic_nms": true,
    "confidence": 0.5,
    "iou_threshold": 0.4,
    "max_candidates": 300,
    "max_detections": 3000
  }
}
```
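Rather than hand-writing the JSON above, the payload can be assembled programmatically. The `build_init_command()` helper below is a hypothetical convenience, not part of the package; it only mirrors the schema shown above:

```python
from typing import Optional


def build_init_command(
    model_id: str,
    video_reference: str,
    sink_host: str,
    sink_port: int,
    api_key: str,
    max_fps: Optional[int] = None,
    model_configuration: Optional[dict] = None,
) -> dict:
    """Assemble an init command payload matching the documented schema."""
    command = {
        "type": "init",
        "model_id": model_id,
        "video_reference": video_reference,
        "sink_configuration": {
            "type": "udp_sink",
            "host": sink_host,
            "port": sink_port,
        },
        "api_key": api_key,
    }
    # Optional fields are omitted entirely when not provided.
    if max_fps is not None:
        command["max_fps"] = max_fps
    if model_configuration is not None:
        command["model_configuration"] = model_configuration
    return command
```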
terminate Command
```json
{
  "type": "terminate",
  "pipeline_id": "my_pipeline"
}
```
pause Command
```json
{
  "type": "mute",
  "pipeline_id": "my_pipeline"
}
```
resume Command
```json
{
  "type": "resume",
  "pipeline_id": "my_pipeline"
}
```
status Command
```json
{
  "type": "status",
  "pipeline_id": "my_pipeline"
}
```
Important Notes
- When calling `initialise_pipeline()`, the `video_reference` and `sink_configuration` must be specified such that every resource (video file / camera device) and URI (stream reference, sink reference) is reachable from the Stream Manager environment. For instance, inside Docker containers `localhost` resolves to the container's own localhost, not the localhost of the machine hosting the container.
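For example, if the Stream Manager runs in a container and the consumer of predictions lives on the hosting machine, point the sink at the host's LAN address rather than `127.0.0.1` (which would deliver datagrams inside the container); the address below is illustrative:

```json
"sink_configuration": {
  "type": "udp_sink",
  "host": "192.168.0.3",
  "port": 9999
}
```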