Compare commits

..

5 Commits

Author SHA1 Message Date
Josh Hawkins
f589a60cde add description to api endpoint 2026-01-22 11:11:08 -06:00
Josh Hawkins
b2ceb15db4 frontend 2026-01-22 11:07:26 -06:00
Josh Hawkins
b569f30820 tests 2026-01-22 10:48:20 -06:00
Josh Hawkins
db485ddafa remove frame extraction logic 2026-01-22 10:48:08 -06:00
Josh Hawkins
a6c5f4a82b use latest preview frame for latest image when camera is offline 2026-01-22 10:42:23 -06:00
8 changed files with 300 additions and 199 deletions

View File

@@ -1,12 +1,10 @@
"""Chat and LLM tool calling APIs."""
import base64
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
import cv2
from fastapi import APIRouter, Body, Depends, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
@@ -89,28 +87,6 @@ def get_tool_definitions() -> List[Dict[str, Any]]:
"required": [],
},
},
{
"type": "function",
"function": {
"name": "get_live_context",
"description": (
"Get the current detection information for a camera: objects being tracked, "
"zones, timestamps. Use this to understand what is visible in the live view. "
"Call this when the user has included a live image (via include_live_image) or "
"when answering questions about what is happening right now on a specific camera."
),
"parameters": {
"type": "object",
"properties": {
"camera": {
"type": "string",
"description": "Camera name to get live context for.",
},
},
"required": ["camera"],
},
},
},
]
@@ -231,98 +207,6 @@ async def execute_tool(
)
async def _execute_get_live_context(
request: Request,
camera: str,
allowed_cameras: List[str],
) -> Dict[str, Any]:
if camera not in allowed_cameras:
return {
"error": f"Camera '{camera}' not found or access denied",
}
if camera not in request.app.frigate_config.cameras:
return {
"error": f"Camera '{camera}' not found",
}
try:
frame_processor = request.app.detected_frames_processor
camera_state = frame_processor.camera_states.get(camera)
if camera_state is None:
return {
"error": f"Camera '{camera}' state not available",
}
tracked_objects_dict = {}
with camera_state.current_frame_lock:
tracked_objects = camera_state.tracked_objects.copy()
frame_time = camera_state.current_frame_time
for obj_id, tracked_obj in tracked_objects.items():
obj_dict = tracked_obj.to_dict()
if obj_dict.get("frame_time") == frame_time:
tracked_objects_dict[obj_id] = {
"label": obj_dict.get("label"),
"zones": obj_dict.get("current_zones", []),
"sub_label": obj_dict.get("sub_label"),
"stationary": obj_dict.get("stationary", False),
}
return {
"camera": camera,
"timestamp": frame_time,
"detections": list(tracked_objects_dict.values()),
}
except Exception as e:
logger.error(f"Error executing get_live_context: {e}", exc_info=True)
return {
"error": f"Error getting live context: {str(e)}",
}
async def _get_live_frame_image_url(
request: Request,
camera: str,
allowed_cameras: List[str],
) -> Optional[str]:
"""
Fetch the current live frame for a camera as a base64 data URL.
Returns None if the frame cannot be retrieved. Used when include_live_image
is set to attach the image to the first user message.
"""
if (
camera not in allowed_cameras
or camera not in request.app.frigate_config.cameras
):
return None
try:
frame_processor = request.app.detected_frames_processor
if camera not in frame_processor.camera_states:
return None
frame = frame_processor.get_current_frame(camera, {})
if frame is None:
return None
height, width = frame.shape[:2]
max_dimension = 1024
if height > max_dimension or width > max_dimension:
scale = max_dimension / max(height, width)
frame = cv2.resize(
frame,
(int(width * scale), int(height * scale)),
interpolation=cv2.INTER_AREA,
)
_, img_encoded = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
b64 = base64.b64encode(img_encoded.tobytes()).decode("utf-8")
return f"data:image/jpeg;base64,{b64}"
except Exception as e:
logger.debug("Failed to get live frame for %s: %s", camera, e)
return None
async def _execute_tool_internal(
tool_name: str,
arguments: Dict[str, Any],
@@ -347,11 +231,6 @@ async def _execute_tool_internal(
except (json.JSONDecodeError, AttributeError) as e:
logger.warning(f"Failed to extract tool result: {e}")
return {"error": "Failed to parse tool result"}
elif tool_name == "get_live_context":
camera = arguments.get("camera")
if not camera:
return {"error": "Camera parameter is required"}
return await _execute_get_live_context(request, camera, allowed_cameras)
else:
return {"error": f"Unknown tool: {tool_name}"}
@@ -398,43 +277,13 @@ async def chat_completion(
current_datetime = datetime.now(timezone.utc)
current_date_str = current_datetime.strftime("%Y-%m-%d")
current_time_str = current_datetime.strftime("%H:%M:%S %Z")
cameras_info = []
config = request.app.frigate_config
for camera_id in allowed_cameras:
if camera_id not in config.cameras:
continue
camera_config = config.cameras[camera_id]
friendly_name = (
camera_config.friendly_name
if camera_config.friendly_name
else camera_id.replace("_", " ").title()
)
cameras_info.append(f" - {friendly_name} (ID: {camera_id})")
cameras_section = ""
if cameras_info:
cameras_section = (
"\n\nAvailable cameras:\n"
+ "\n".join(cameras_info)
+ "\n\nWhen users refer to cameras by their friendly name (e.g., 'Back Deck Camera'), use the corresponding camera ID (e.g., 'back_deck_cam') in tool calls."
)
live_image_note = ""
if body.include_live_image:
live_image_note = (
f"\n\nThe first user message includes a live image from camera "
f"'{body.include_live_image}'. Use get_live_context for that camera to get "
"current detection details (objects, zones) to aid in understanding the image."
)
system_prompt = f"""You are a helpful assistant for Frigate, a security camera NVR system. You help users answer questions about their cameras, detected objects, and events.
Current date and time: {current_date_str} at {current_time_str} (UTC)
When users ask questions about "today", "yesterday", "this week", etc., use the current date above as reference.
When searching for objects or events, use ISO 8601 format for dates (e.g., {current_date_str}T00:00:00Z for the start of today).
Always be accurate with time calculations based on the current date provided.{cameras_section}{live_image_note}"""
Always be accurate with time calculations based on the current date provided."""
conversation.append(
{
@@ -443,7 +292,6 @@ Always be accurate with time calculations based on the current date provided.{ca
}
)
first_user_message_seen = False
for msg in body.messages:
msg_dict = {
"role": msg.role,
@@ -453,22 +301,6 @@ Always be accurate with time calculations based on the current date provided.{ca
msg_dict["tool_call_id"] = msg.tool_call_id
if msg.name:
msg_dict["name"] = msg.name
if (
msg.role == "user"
and not first_user_message_seen
and body.include_live_image
):
first_user_message_seen = True
image_url = await _get_live_frame_image_url(
request, body.include_live_image, allowed_cameras
)
if image_url:
msg_dict["content"] = [
{"type": "text", "text": msg.content},
{"type": "image_url", "image_url": {"url": image_url}},
]
conversation.append(msg_dict)
tool_iterations = 0

View File

@@ -32,10 +32,3 @@ class ChatCompletionRequest(BaseModel):
le=10,
description="Maximum number of tool call iterations (default: 5)",
)
include_live_image: Optional[str] = Field(
default=None,
description=(
"If set, the current live frame from this camera is attached to the first "
"user message as multimodal content. Use with get_live_context for detection info."
),
)

View File

@@ -42,6 +42,7 @@ from frigate.const import (
PREVIEW_FRAME_TYPE,
)
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment
from frigate.output.preview import get_most_recent_preview_frame
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.file import get_event_thumbnail_bytes
from frigate.util.image import get_image_from_recording
@@ -125,7 +126,9 @@ async def camera_ptz_info(request: Request, camera_name: str):
@router.get(
"/{camera_name}/latest.{extension}", dependencies=[Depends(require_camera_access)]
"/{camera_name}/latest.{extension}",
dependencies=[Depends(require_camera_access)],
description="Returns the latest frame from the specified camera in the requested format (jpg, png, webp). Falls back to preview frames if the camera is offline.",
)
async def latest_frame(
request: Request,
@@ -159,20 +162,37 @@ async def latest_frame(
or 10
)
is_offline = False
if frame is None or datetime.now().timestamp() > (
frame_processor.get_current_frame_time(camera_name) + retry_interval
):
if request.app.camera_error_image is None:
error_image = glob.glob(
os.path.join(INSTALL_DIR, "frigate/images/camera-error.jpg")
)
last_frame_time = frame_processor.get_current_frame_time(camera_name)
preview_path = get_most_recent_preview_frame(
camera_name, before=last_frame_time
)
if len(error_image) > 0:
request.app.camera_error_image = cv2.imread(
error_image[0], cv2.IMREAD_UNCHANGED
if preview_path:
logger.debug(f"Using most recent preview frame for {camera_name}")
frame = cv2.imread(preview_path, cv2.IMREAD_UNCHANGED)
if frame is not None:
is_offline = True
if frame is None or not is_offline:
logger.debug(
f"No live or preview frame available for {camera_name}. Using error image."
)
if request.app.camera_error_image is None:
error_image = glob.glob(
os.path.join(INSTALL_DIR, "frigate/images/camera-error.jpg")
)
frame = request.app.camera_error_image
if len(error_image) > 0:
request.app.camera_error_image = cv2.imread(
error_image[0], cv2.IMREAD_UNCHANGED
)
frame = request.app.camera_error_image
height = int(params.height or str(frame.shape[0]))
width = int(height * frame.shape[1] / frame.shape[0])
@@ -194,14 +214,18 @@ async def latest_frame(
frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
_, img = cv2.imencode(f".{extension.value}", frame, quality_params)
headers = {
"Cache-Control": "no-store" if not params.store else "private, max-age=60",
}
if is_offline:
headers["X-Frigate-Offline"] = "true"
return Response(
content=img.tobytes(),
media_type=extension.get_mime_type(),
headers={
"Cache-Control": "no-store"
if not params.store
else "private, max-age=60",
},
headers=headers,
)
elif (
camera_name == "birdseye"

View File

@@ -216,14 +216,7 @@ class LlamaCppClient(GenAIClient):
"finish_reason": "error",
}
except requests.exceptions.RequestException as e:
error_detail = str(e)
if hasattr(e, "response") and e.response is not None:
try:
error_body = e.response.text
error_detail = f"{str(e)} - Response: {error_body[:500]}"
except Exception:
pass
logger.warning("llama.cpp returned an error: %s", error_detail)
logger.warning("llama.cpp returned an error: %s", str(e))
return {
"content": None,
"tool_calls": None,

View File

@@ -57,6 +57,51 @@ def get_cache_image_name(camera: str, frame_time: float) -> str:
)
def get_most_recent_preview_frame(camera: str, before: float = None) -> str | None:
"""Get the most recent preview frame for a camera."""
if not os.path.exists(PREVIEW_CACHE_DIR):
return None
try:
# files are named preview_{camera}-{timestamp}.webp
# we want the largest timestamp that is less than or equal to before
preview_files = [
f
for f in os.listdir(PREVIEW_CACHE_DIR)
if f.startswith(f"preview_{camera}-")
and f.endswith(f".{PREVIEW_FRAME_TYPE}")
]
if not preview_files:
return None
# sort by timestamp in descending order
# filenames are like preview_front-1712345678.901234.webp
preview_files.sort(reverse=True)
if before is None:
return os.path.join(PREVIEW_CACHE_DIR, preview_files[0])
for file_name in preview_files:
try:
# Extract timestamp: preview_front-1712345678.901234.webp
# Split by dash and extension
timestamp_part = file_name.split("-")[-1].split(
f".{PREVIEW_FRAME_TYPE}"
)[0]
timestamp = float(timestamp_part)
if timestamp <= before:
return os.path.join(PREVIEW_CACHE_DIR, file_name)
except (ValueError, IndexError):
continue
return None
except Exception as e:
logger.error(f"Error searching for most recent preview frame: {e}")
return None
class FFMpegConverter(threading.Thread):
"""Convert a list of still frames into a vfr mp4."""

View File

@@ -0,0 +1,107 @@
import os
import shutil
from unittest.mock import MagicMock
import cv2
import numpy as np
from frigate.output.preview import PREVIEW_CACHE_DIR, PREVIEW_FRAME_TYPE
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
class TestHttpLatestFrame(BaseTestHttp):
def setUp(self):
super().setUp([])
self.app = super().create_app()
self.app.detected_frames_processor = MagicMock()
if os.path.exists(PREVIEW_CACHE_DIR):
shutil.rmtree(PREVIEW_CACHE_DIR)
os.makedirs(PREVIEW_CACHE_DIR)
def tearDown(self):
if os.path.exists(PREVIEW_CACHE_DIR):
shutil.rmtree(PREVIEW_CACHE_DIR)
super().tearDown()
def test_latest_frame_fallback_to_preview(self):
camera = "front_door"
# 1. Mock frame processor to return None (simulating offline/missing frame)
self.app.detected_frames_processor.get_current_frame.return_value = None
# Return a timestamp that is after our dummy preview frame
self.app.detected_frames_processor.get_current_frame_time.return_value = (
1234567891.0
)
# 2. Create a dummy preview file
dummy_frame = np.zeros((180, 320, 3), np.uint8)
cv2.putText(
dummy_frame,
"PREVIEW",
(50, 50),
cv2.FONT_HERSHEY_SIMPLEX,
1,
(255, 255, 255),
2,
)
preview_path = os.path.join(
PREVIEW_CACHE_DIR, f"preview_{camera}-1234567890.0.{PREVIEW_FRAME_TYPE}"
)
cv2.imwrite(preview_path, dummy_frame)
with AuthTestClient(self.app) as client:
response = client.get(f"/{camera}/latest.webp")
assert response.status_code == 200
assert response.headers.get("X-Frigate-Offline") == "true"
# Verify we got an image (webp)
assert response.headers.get("content-type") == "image/webp"
def test_latest_frame_no_fallback_when_live(self):
camera = "front_door"
# 1. Mock frame processor to return a live frame
dummy_frame = np.zeros((180, 320, 3), np.uint8)
self.app.detected_frames_processor.get_current_frame.return_value = dummy_frame
self.app.detected_frames_processor.get_current_frame_time.return_value = (
2000000000.0 # Way in the future
)
with AuthTestClient(self.app) as client:
response = client.get(f"/{camera}/latest.webp")
assert response.status_code == 200
assert "X-Frigate-Offline" not in response.headers
def test_latest_frame_stale_falls_back_to_preview(self):
camera = "front_door"
# 1. Mock frame processor to return a stale frame
dummy_frame = np.zeros((180, 320, 3), np.uint8)
self.app.detected_frames_processor.get_current_frame.return_value = dummy_frame
# Return a timestamp that is after our dummy preview frame, but way in the past
self.app.detected_frames_processor.get_current_frame_time.return_value = 1000.0
# 2. Create a dummy preview file
preview_path = os.path.join(
PREVIEW_CACHE_DIR, f"preview_{camera}-999.0.{PREVIEW_FRAME_TYPE}"
)
cv2.imwrite(preview_path, dummy_frame)
with AuthTestClient(self.app) as client:
response = client.get(f"/{camera}/latest.webp")
assert response.status_code == 200
assert response.headers.get("X-Frigate-Offline") == "true"
def test_latest_frame_no_preview_found(self):
camera = "front_door"
# 1. Mock frame processor to return None
self.app.detected_frames_processor.get_current_frame.return_value = None
# 2. No preview file created
with AuthTestClient(self.app) as client:
response = client.get(f"/{camera}/latest.webp")
# Should fall back to camera-error.jpg (which might not exist in test env, but let's see)
# If camera-error.jpg is not found, it returns 500 "Unable to get valid frame" in latest_frame
# OR it uses request.app.camera_error_image if already loaded.
# Since we didn't provide camera-error.jpg, it might 500 if glob fails or return 500 if frame is None.
assert response.status_code in [200, 500]
assert "X-Frigate-Offline" not in response.headers

View File

@@ -0,0 +1,80 @@
import os
import shutil
import unittest
from frigate.output.preview import (
PREVIEW_CACHE_DIR,
PREVIEW_FRAME_TYPE,
get_most_recent_preview_frame,
)
class TestPreviewLoader(unittest.TestCase):
def setUp(self):
if os.path.exists(PREVIEW_CACHE_DIR):
shutil.rmtree(PREVIEW_CACHE_DIR)
os.makedirs(PREVIEW_CACHE_DIR)
def tearDown(self):
if os.path.exists(PREVIEW_CACHE_DIR):
shutil.rmtree(PREVIEW_CACHE_DIR)
def test_get_most_recent_preview_frame_missing(self):
self.assertIsNone(get_most_recent_preview_frame("test_camera"))
def test_get_most_recent_preview_frame_exists(self):
camera = "test_camera"
# create dummy preview files
for ts in ["1000.0", "2000.0", "1500.0"]:
with open(
os.path.join(
PREVIEW_CACHE_DIR, f"preview_{camera}-{ts}.{PREVIEW_FRAME_TYPE}"
),
"w",
) as f:
f.write(f"test_{ts}")
expected_path = os.path.join(
PREVIEW_CACHE_DIR, f"preview_{camera}-2000.0.{PREVIEW_FRAME_TYPE}"
)
self.assertEqual(get_most_recent_preview_frame(camera), expected_path)
def test_get_most_recent_preview_frame_before(self):
camera = "test_camera"
# create dummy preview files
for ts in ["1000.0", "2000.0"]:
with open(
os.path.join(
PREVIEW_CACHE_DIR, f"preview_{camera}-{ts}.{PREVIEW_FRAME_TYPE}"
),
"w",
) as f:
f.write(f"test_{ts}")
# Test finding frame before or at 1500
expected_path = os.path.join(
PREVIEW_CACHE_DIR, f"preview_{camera}-1000.0.{PREVIEW_FRAME_TYPE}"
)
self.assertEqual(
get_most_recent_preview_frame(camera, before=1500.0), expected_path
)
# Test finding frame before or at 999
self.assertIsNone(get_most_recent_preview_frame(camera, before=999.0))
def test_get_most_recent_preview_frame_other_camera(self):
camera = "test_camera"
other_camera = "other_camera"
with open(
os.path.join(
PREVIEW_CACHE_DIR, f"preview_{other_camera}-3000.0.{PREVIEW_FRAME_TYPE}"
),
"w",
) as f:
f.write("test")
self.assertIsNone(get_most_recent_preview_frame(camera))
def test_get_most_recent_preview_frame_no_directory(self):
shutil.rmtree(PREVIEW_CACHE_DIR)
self.assertIsNone(get_most_recent_preview_frame("test_camera"))

View File

@@ -81,6 +81,11 @@ export default function LivePlayer({
const internalContainerRef = useRef<HTMLDivElement | null>(null);
const cameraName = useCameraFriendlyName(cameraConfig);
// player is showing on a dashboard if containerRef is not provided
const inDashboard = containerRef?.current == null;
// stats
const [stats, setStats] = useState<PlayerStatsType>({
@@ -408,6 +413,28 @@ export default function LivePlayer({
/>
</div>
{offline && inDashboard && (
<>
<div className="absolute inset-0 rounded-lg bg-black/50 md:rounded-2xl" />
<div className="absolute inset-0 left-1/2 top-1/2 flex -translate-x-1/2 -translate-y-1/2 items-center justify-center">
<div className="flex flex-col items-center justify-center gap-2 rounded-lg bg-background/50 p-3 text-center">
<div className="text-md">{t("streamOffline.title")}</div>
<TbExclamationCircle className="size-6" />
<p className="text-center text-sm">
<Trans
ns="components/player"
values={{
cameraName: cameraName,
}}
>
streamOffline.desc
</Trans>
</p>
</div>
</div>
</>
)}
{offline && !showStillWithoutActivity && cameraEnabled && (
<div className="absolute inset-0 left-1/2 top-1/2 flex h-96 w-96 -translate-x-1/2 -translate-y-1/2">
<div className="flex flex-col items-center justify-center rounded-lg bg-background/50 p-5">