Stream Management API

Warning

This feature has been integrated into the core inference-server. To enable it, start the server with the environment variable ENABLE_STREAM_API=True. Note that the API endpoints have changed, so client code will need to be updated; the core capabilities, however, remain the same.

Warning

A Roboflow Enterprise License is required to use this feature in production. See LICENSE.txt for details.

Overview

This feature is designed for users who need inference on online video streams. It enhances the inference.Stream() and InferencePipeline() interfaces with a management layer: an HTTP interface through which users can remotely initialise, pause, resume, query, and terminate inference pipelines.

This functionality is useful in scenarios such as:

  • Performing inference across multiple online video streams simultaneously.
  • Executing inference on multiple devices that require coordination.
  • Establishing a monitoring layer to oversee video processing built on the inference package.

How to Run

In Docker - Using Docker Compose

CPU-based Devices

docker compose -f ./docker/dockerfiles/stream-management-api.compose-cpu.yaml up

GPU-based Devices

docker compose -f ./docker/dockerfiles/stream-management-api.compose-gpu.yaml up

Jetson Devices (JetPack 5.1.1)

docker-compose -f ./docker/dockerfiles/stream-management-api.compose-jetson.5.1.1.yaml up

In Docker - Running Containers Separately

CPU-based Devices

docker run -d --name stream_manager --network host roboflow/roboflow-inference-stream-manager-cpu:latest
docker run -d --name stream_management_api --network host roboflow/roboflow-inference-stream-management-api:latest

GPU-based Devices

docker run -d --name stream_manager --network host --runtime nvidia roboflow/roboflow-inference-stream-manager-gpu:latest
docker run -d --name stream_management_api --network host roboflow/roboflow-inference-stream-management-api:latest

Jetson Devices (JetPack 5.1.1)

docker run -d --name stream_manager --network host --runtime nvidia roboflow/roboflow-inference-stream-manager-jetson-5.1.1:latest
docker run -d --name stream_management_api --network host roboflow/roboflow-inference-stream-management-api:latest

Configuration Parameters

Stream Management API

  • STREAM_MANAGER_HOST — hostname of the Stream Manager container (replace with the container name if --network host is not used, or with the remote machine's address when connecting to a remote host)
  • STREAM_MANAGER_PORT — port used to communicate with the Stream Manager (must match the port the Stream Manager container listens on)

Stream Manager

  • PORT — port on which the server will run
  • Mount a volume at the container's /tmp/cache to persist models and speed up inference pipeline initialisation
  • Camera device access must be enabled at the container level (for example, by passing the device through to the container)

Bare-metal Deployment

python -m inference.enterprise.stream_management.manager.app  # runs manager
python -m inference.enterprise.stream_management.api.app  # runs management API

How to Integrate

After running the roboflow-inference-stream-management-api container, the HTTP API is available at http://127.0.0.1:8080 (assuming the default configuration).

You can call wget http://127.0.0.1:8080/openapi.json to retrieve the OpenAPI specification.

Example Python Client

import requests
from typing import Optional

URL = "http://127.0.0.1:8080"

def list_pipelines() -> dict:
    response = requests.get(f"{URL}/list_pipelines")
    response.raise_for_status()  # fail fast on HTTP errors instead of decoding an error body
    return response.json()

def get_pipeline_status(pipeline_id: str) -> dict:
    response = requests.get(f"{URL}/status/{pipeline_id}")
    response.raise_for_status()
    return response.json()

def pause_pipeline(pipeline_id: str) -> dict:
    response = requests.post(f"{URL}/pause/{pipeline_id}")
    response.raise_for_status()
    return response.json()

def resume_pipeline(pipeline_id: str) -> dict:
    response = requests.post(f"{URL}/resume/{pipeline_id}")
    response.raise_for_status()
    return response.json()

def terminate_pipeline(pipeline_id: str) -> dict:
    response = requests.post(f"{URL}/terminate/{pipeline_id}")
    response.raise_for_status()
    return response.json()

def initialise_pipeline(
    video_reference: str,
    model_id: str,
    api_key: str,
    sink_host: str,
    sink_port: int,
    max_fps: Optional[int] = None,
) -> dict:
    response = requests.post(
        f"{URL}/initialise",
        json={
            "type": "init",
            "sink_configuration": {
                "type": "udp_sink",
                "host": sink_host,
                "port": sink_port,
            },
            "video_reference": video_reference,
            "model_id": model_id,
            "api_key": api_key,
            "max_fps": max_fps,
        },
    )
    response.raise_for_status()
    return response.json()
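Pipelines initialised with a udp_sink push predictions to the configured host and port. Below is a minimal listener sketch; it assumes each prediction arrives as a single JSON-encoded UDP datagram (verify the serialization against your inference version), and the function name is illustrative, not part of the package:

```python
import json
import socket
from typing import Iterator, Optional

def listen_for_predictions(
    host: str = "0.0.0.0", port: int = 9999, timeout: Optional[float] = None
) -> Iterator[dict]:
    # Bind a UDP socket and yield decoded prediction payloads as they arrive.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((host, port))
        if timeout is not None:
            sock.settimeout(timeout)  # raise socket.timeout if the stream goes quiet
        while True:
            datagram, _ = sock.recvfrom(65535)  # a UDP datagram fits in one recvfrom
            yield json.loads(datagram.decode("utf-8"))
```

A consumer would then simply iterate: for payload in listen_for_predictions(port=9999): ... — with host and port matching the sink_configuration passed to initialise_pipeline().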

Note

The model ID is a string of the form <project_id>/<version_id>. You can find both values by following the guide on workspace and project IDs.
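For example, with a hypothetical project ID and version number:

```python
project_id = "soccer-players-detection"  # hypothetical project ID from your workspace
version_id = 2                           # version number of the trained model
model_id = f"{project_id}/{version_id}"
print(model_id)  # soccer-players-detection/2
```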

Command Reference

init Command

{
  "type": "init",
  "model_id": "some/1",
  "video_reference": "rtsp://192.168.0.1:554",
  "sink_configuration": {
    "type": "udp_sink",
    "host": "192.168.0.3",
    "port": 9999
  },
  "api_key": "YOUR-API-KEY",
  "max_fps": 16,
  "model_configuration": {
    "type": "object-detection",
    "class_agnostic_nms": true,
    "confidence": 0.5,
    "iou_threshold": 0.4,
    "max_candidates": 300,
    "max_detections": 3000
  }
}
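The same command can be assembled in Python before sending it to the /initialise endpoint. The field names below mirror the JSON above, and max_fps and model_configuration are treated as optional; the helper itself is illustrative, not part of the package:

```python
from typing import Optional

def build_init_command(
    model_id: str,
    video_reference: str,
    sink_host: str,
    sink_port: int,
    api_key: str,
    max_fps: Optional[int] = None,
    model_configuration: Optional[dict] = None,
) -> dict:
    # Assemble an "init" command payload matching the schema shown above.
    command = {
        "type": "init",
        "model_id": model_id,
        "video_reference": video_reference,
        "sink_configuration": {
            "type": "udp_sink",
            "host": sink_host,
            "port": sink_port,
        },
        "api_key": api_key,
    }
    # Omit optional fields entirely rather than sending nulls.
    if max_fps is not None:
        command["max_fps"] = max_fps
    if model_configuration is not None:
        command["model_configuration"] = model_configuration
    return command
```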

terminate Command

{
  "type": "terminate",
  "pipeline_id": "my_pipeline"
}

pause Command

{
  "type": "mute",
  "pipeline_id": "my_pipeline"
}

resume Command

{
  "type": "resume",
  "pipeline_id": "my_pipeline"
}

status Command

{
  "type": "status",
  "pipeline_id": "my_pipeline"
}

Important Notes

  • When calling initialise_pipeline(), the video_reference and sink_configuration must point to resources (video files, camera devices) or URIs (stream references, sink references) that are reachable from the Stream Manager's environment. For instance, inside a Docker container, localhost resolves to the container itself, not to the machine hosting the container.
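One practical consequence: when the Stream Manager runs in a container and the sink listener runs on the host, pass the host's LAN address rather than 127.0.0.1 as sink_host. A small sketch of a common technique for discovering that address (falling back to loopback when no route is available):

```python
import socket

def local_ip() -> str:
    # Determine the address of the interface used for outbound traffic.
    # connect() on a UDP socket sends no packets; it only selects a route.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect(("8.8.8.8", 80))
            return sock.getsockname()[0]
    except OSError:
        return "127.0.0.1"  # no usable route; fall back to loopback
```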