From 0882a3ad1ef54d13e0f170a29eb253d83cb9cdd2 Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Thu, 12 Feb 2026 23:00:59 -0500 Subject: [PATCH] fix: resolve Event::Run thread hang preventing zmc clean shutdown Event::Run could block indefinitely in PacketQueue methods during normal event closing (closeEvent from analysis thread), because their wait predicates only check deleting/zm_terminate, not Event's terminate_ flag. Three changes fix this: - get_packet_no_wait: return immediately when iterator at end instead of blocking on condition variable (makes it truly non-blocking) - Event::Run: use increment_it(wait=false) since deletePacket can advance the iterator to end() during AddPacket_ without the queue lock - Event::Stop: call packetqueue->notify_all() to wake timed waits so Run() checks terminate_ promptly Co-Authored-By: Claude Opus 4.6 --- src/zm_event.cpp | 28 ++++++++++++++-------------- src/zm_event.h | 1 + src/zm_monitor.cpp | 2 ++ src/zm_packetqueue.cpp | 7 ++----- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/zm_event.cpp b/src/zm_event.cpp index f2ecb73ea..ecae6985f 100644 --- a/src/zm_event.cpp +++ b/src/zm_event.cpp @@ -188,19 +188,21 @@ Event::Event( } Event::~Event() { + Debug(1, "~Event %" PRIu64 ": calling Stop", id); Stop(); if (thread_.joinable()) { - Debug(1, "Joining event thread"); - // Should be. Issuing the stop and then getting the lock + Debug(1, "~Event %" PRIu64 ": joining Run thread", id); thread_.join(); + Debug(1, "~Event %" PRIu64 ": Run thread joined", id); } + Debug(1, "~Event %" PRIu64 ": freeing packetqueue iterator", id); packetqueue->free_it(packetqueue_it); /* Close the video file */ // We close the videowriter first, because if we finish the event, we might try to view the file, but we aren't done writing it yet. if (videoStore != nullptr) { - Debug(4, "Deleting video store"); + Debug(1, "~Event %" PRIu64 ": deleting video store", id); delete videoStore; videoStore = nullptr; int result = rename(video_incomplete_path.c_str(), video_path.c_str()); @@ -262,6 +264,7 @@ Event::~Event() { sql = stringtf("UPDATE Storage SET DiskSpace = DiskSpace + %" PRIu64 " WHERE Id=%u", video_size, storage->Id()); dbQueue.push(std::move(sql)); } + Debug(1, "~Event %" PRIu64 ": complete", id); } // Event::~Event() void Event::createNotes(std::string ¬es) { @@ -683,6 +686,7 @@ bool Event::SetPath(Storage *storage) { } // end bool Event::SetPath void Event::Run() { + Debug(1, "Event::Run %" PRIu64 ": starting setup", id); Storage *storage = monitor->getStorage(); if (!SetPath(storage)) { // Try another @@ -772,6 +776,7 @@ void Event::Run() { // The idea is to process the queue no matter what so that all packets get processed. // We only break if the queue is empty + Debug(1, "Event::Run %" PRIu64 ": entering packet loop", id); while (!terminate_ and !zm_terminate) { ZMPacketLock packet_lock = packetqueue->get_packet_no_wait(packetqueue_it); std::shared_ptr packet = packet_lock.packet_; @@ -779,16 +784,12 @@ void Event::Run() { if (!packet->decoded) { Debug(1, "Not decoded"); packet_lock.unlock(); - // Wait on packetqueue condition instead of blind sleep — wakes immediately - // when decoder sets decoded=true and calls packetqueue.notify_all() packetqueue->wait_for(Microseconds(ZM_SAMPLE_RATE)); continue; } if (!packet->analyzed) { Debug(1, "Not analyzed"); packet_lock.unlock(); - // Wait on packetqueue condition instead of blind sleep — wakes immediately - // when analysis sets analyzed=true and calls packetqueue.notify_all() packetqueue->wait_for(Microseconds(ZM_SAMPLE_RATE)); continue; } @@ -800,7 +801,6 @@ void Event::Run() { if (monitor->GetOptVideoWriter() == Monitor::PASSTHROUGH) { if (!save_jpegs) { Debug(1, "Deleting image data for %d", packet->image_index); - // Don't need raw images anymore delete packet->image; packet->image = nullptr; } @@ -810,16 +810,16 @@ void Event::Run() { delete packet->analysis_image; packet->analysis_image = nullptr; } - } // end if packet->image - // Important not to increment it until after we are done with the packet because clearPackets checks for iterators pointing to it. - packetqueue->increment_it(packetqueue_it, true); + } + // Use wait=false: deletePacket may have advanced our iterator to end() + // while we were in AddPacket_ without the queue lock. + packetqueue->increment_it(packetqueue_it, false); } else { if (terminate_ or zm_terminate) return; - // Wait on packetqueue condition instead of blind sleep — wakes immediately - // when a packet is queued or a packet lock becomes available packetqueue->wait_for(Microseconds(10000)); - } // end if packet_lock + } } // end while + Debug(1, "Event::Run %" PRIu64 ": exiting, terminate_=%d zm_terminate=%d", id, terminate_.load(), zm_terminate); } // end Run() int Event::MonitorId() const { diff --git a/src/zm_event.h b/src/zm_event.h index 3034638f4..38cee8a6d 100644 --- a/src/zm_event.h +++ b/src/zm_event.h @@ -150,6 +150,7 @@ class Event { void Stop() { terminate_ = true; + packetqueue->notify_all(); } bool Stopped() const { return terminate_; } diff --git a/src/zm_monitor.cpp b/src/zm_monitor.cpp index e52d845b4..103f79821 100644 --- a/src/zm_monitor.cpp +++ b/src/zm_monitor.cpp @@ -3269,7 +3269,9 @@ void Monitor::closeEvent() { close_event_thread = std::thread([](Event *e, const std::string &command) { int64_t event_id = e->Id(); int monitor_id = e->MonitorId(); + Debug(1, "close_event_thread: deleting event %" PRId64, event_id); delete e; + Debug(1, "close_event_thread: event %" PRId64 " deleted", event_id); if (!command.empty()) { if (fork() == 0) { diff --git a/src/zm_packetqueue.cpp b/src/zm_packetqueue.cpp index c14fb45dc..d82ab50b3 100644 --- a/src/zm_packetqueue.cpp +++ b/src/zm_packetqueue.cpp @@ -475,11 +475,8 @@ ZMPacketLock PacketQueue::get_packet_no_wait(packetqueue_iterator *it) { std::addressof(*it), (*it == pktQueue.end())); if (deleting or zm_terminate) return ZMPacketLock(); - if ((*it == pktQueue.end()) and !(deleting or zm_terminate)) { - Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end())); - condition.wait(lck, [&]{ return (*it != pktQueue.end()) || deleting || zm_terminate; }); - } - if ((*it == pktQueue.end()) or deleting or zm_terminate) return ZMPacketLock(); + if (*it == pktQueue.end()) + return ZMPacketLock(); std::shared_ptr p = *(*it); ZMPacketLock packet_lock(p);