Files
zoneminder/src/zm_decoder_thread.cpp
Isaac Connor 5153dc68ee fix: flush decoder_queue on decoder thread exit to avoid stale latency offset across reconnect
The decoder thread holds a raw AVCodecContext* obtained from
camera->getVideoCodecContext() and pushes packet locks into Monitor's
decoder_queue for each send_packet() that hasn't yet been matched by a
receive_frame(). Monitor::PrimeCapture() used to call camera->PrimeCapture()
(which Close()s the camera and frees the codec context) without first
stopping the decoder thread. Two problems followed:

1. Use-after-free race between the decoder thread and the camera teardown.
2. The stale decoder_queue entries survived the reconnect. The new codec
   context produced frames in send order, so each new frame was matched
   against the oldest stale entry instead of the later packet it was
   actually decoded from.
   The net effect was a permanent N-packet offset between capture and
   decode (~92 packets observed in the field). Analysis blocks on
   !packet->decoded for those packets, so the packetqueue saturates at
   max_video_packet_count and stays there, spamming the "max video packets
   in the queue" warning forever.

Fix:
- Stop+Join the decoder in Monitor::PrimeCapture() before tearing down the
  codec context.
- Add Monitor::flushDecoderQueue() which marks in-flight packets decoded,
  notifies waiters, and clears the queue.
- Call it at the end of DecoderThread::Run() so any Stop()+Join() (including
  the existing one in Pause()) naturally releases stale entries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 07:38:58 -04:00

52 lines
1.5 KiB
C++

#include "zm_decoder_thread.h"
#include "zm_monitor.h"
#include "zm_signal.h"
DecoderThread::DecoderThread(Monitor *monitor) :
monitor_(monitor), terminate_(false) {
thread_ = std::thread(&DecoderThread::Run, this);
}
// Requests termination of the worker and waits for it to finish, so the
// std::thread member is never destroyed while still joinable.
DecoderThread::~DecoderThread() {
  Stop();
  if (!thread_.joinable()) {
    return;  // Nothing left to wait for.
  }
  thread_.join();
}
// (Re)starts the worker. Any worker that is still joinable is told to stop
// and joined first, then a fresh thread running Run() is created.
void DecoderThread::Start() {
  // Ask a possibly still-running worker to leave its loop ...
  Stop();
  // ... and wait for it: assigning to a joinable std::thread would call
  // std::terminate().
  if (thread_.joinable()) {
    thread_.join();
  }

  // Clear the stop request only after the old worker is gone, so it cannot
  // miss it.
  terminate_ = false;
  thread_ = std::thread(&DecoderThread::Run, this);
}
// Requests termination by raising terminate_, which Run() polls.
// Does not wait for the worker; pair with Join() for that.
void DecoderThread::Stop() {
  terminate_ = true;
}
// Waits for the worker thread to finish. No-op when there is no joinable
// thread (e.g. it has already been joined).
void DecoderThread::Join() {
  if (!thread_.joinable()) {
    return;
  }
  thread_.join();
}
void DecoderThread::Run() {
Debug(2, "DecoderThread::Run() for %d", monitor_->Id());
while (!(terminate_ or zm_terminate)) {
if (!monitor_->Decode()) {
if (!(terminate_ or zm_terminate)) {
// We wait on the packetqueue condition variable instead of sleeping.
// This allows us to wake up immediately when new packets are queued.
Microseconds wait_for = monitor_->Active() ? Microseconds(ZM_SAMPLE_RATE) : Microseconds(ZM_SUSPENDED_RATE);
Debug(2, "Waiting for %" PRId64 "us", int64(wait_for.count()));
monitor_->GetPacketQueue()->wait_for(wait_for);
}
}
}
// Release any packets we sent to the codec but never received frames for.
// The codec context is about to be (or has been) torn down for Pause /
// reconnect; leaving stale entries would create a permanent latency
// offset against the next codec context on resume.
monitor_->flushDecoderQueue();
}