From ed66f1b1a81214794af22a889d9919351f8cea20 Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Fri, 17 Jun 2022 17:23:25 -0400 Subject: [PATCH 1/4] Make AnalysisThread::Stop wait for the thread to stop --- src/zm_analysis_thread.cpp | 6 +++++- src/zm_analysis_thread.h | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/zm_analysis_thread.cpp b/src/zm_analysis_thread.cpp index a6f1afcc2..1bc417091 100644 --- a/src/zm_analysis_thread.cpp +++ b/src/zm_analysis_thread.cpp @@ -11,7 +11,6 @@ AnalysisThread::AnalysisThread(Monitor *monitor) : AnalysisThread::~AnalysisThread() { Stop(); - if (thread_.joinable()) thread_.join(); } void AnalysisThread::Start() { @@ -21,6 +20,11 @@ void AnalysisThread::Start() { thread_ = std::thread(&AnalysisThread::Run, this); } +void AnalysisThread::Stop() { + terminate_ = true; + if (thread_.joinable()) thread_.join(); +} + void AnalysisThread::Run() { while (!(terminate_ or zm_terminate)) { // Some periodic updates are required for variable capturing framerate diff --git a/src/zm_analysis_thread.h b/src/zm_analysis_thread.h index f949aa729..c90bb2acb 100644 --- a/src/zm_analysis_thread.h +++ b/src/zm_analysis_thread.h @@ -15,7 +15,7 @@ class AnalysisThread { AnalysisThread(AnalysisThread &&rhs) = delete; void Start(); - void Stop() { terminate_ = true; } + void Stop(); bool Stopped() const { return terminate_; } private: From 3c8b590925f51bc1fef912e668287bacf0fdb73e Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Fri, 17 Jun 2022 17:23:33 -0400 Subject: [PATCH 2/4] Make DecoderThread::Stop wait for the thread to stop --- src/zm_decoder_thread.cpp | 7 ++++++- src/zm_decoder_thread.h | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/zm_decoder_thread.cpp b/src/zm_decoder_thread.cpp index 0ffeadfc5..5c1033985 100644 --- a/src/zm_decoder_thread.cpp +++ b/src/zm_decoder_thread.cpp @@ -10,7 +10,6 @@ DecoderThread::DecoderThread(Monitor *monitor) : DecoderThread::~DecoderThread() { Stop(); - if (thread_.joinable()) thread_.join(); } void DecoderThread::Start() { @@ -18,6 +17,12 @@ void DecoderThread::Start() { terminate_ = false; thread_ = std::thread(&DecoderThread::Run, this); } + +void DecoderThread::Stop() { + terminate_ = true; + if (thread_.joinable()) thread_.join(); +} + void DecoderThread::Run() { Debug(2, "DecoderThread::Run() for %d", monitor_->Id()); diff --git a/src/zm_decoder_thread.h b/src/zm_decoder_thread.h index 62b2f0d23..4fb8ea453 100644 --- a/src/zm_decoder_thread.h +++ b/src/zm_decoder_thread.h @@ -15,7 +15,7 @@ class DecoderThread { DecoderThread(DecoderThread &&rhs) = delete; void Start(); - void Stop() { terminate_ = true; } + void Stop(); private: void Run(); From aeca49d427dd6c3cca1a6d2c25ee3159a37d07df Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Fri, 17 Jun 2022 17:24:14 -0400 Subject: [PATCH 3/4] Stop the packetque first before stopping threads that may be in it. Only call clearPackets for video packets --- src/zm_monitor.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/zm_monitor.cpp b/src/zm_monitor.cpp index 7855b8073..9ade875ae 100644 --- a/src/zm_monitor.cpp +++ b/src/zm_monitor.cpp @@ -2215,9 +2215,9 @@ bool Monitor::Analyse() { shared_data->state = state = IDLE; } // end if ( trigger_data->trigger_state != TRIGGER_OFF ) - packetqueue.clearPackets(snap); if (snap->codec_type == AVMEDIA_TYPE_VIDEO) { + packetqueue.clearPackets(snap); // Only do these if it's a video packet. shared_data->last_read_index = snap->image_index; analysis_image_count++; @@ -3144,8 +3144,14 @@ int Monitor::PrimeCapture() { int Monitor::PreCapture() const { return camera->PreCapture(); } int Monitor::PostCapture() const { return camera->PostCapture(); } int Monitor::Close() { + Debug(1, "Stopping packetqueue"); + // Wake everyone up + packetqueue.stop(); + Debug(1, "Stopped packetqueue"); + // Because the stream indexes may change we have to clear out the packetqueue if (decoder) { + Debug(1, "Decoder stopping"); decoder->Stop(); Debug(1, "Decoder stopped"); } @@ -3187,10 +3193,6 @@ int Monitor::Close() { video_fifo = nullptr; } - Debug(1, "Stopping packetqueue"); - // Wake everyone up - packetqueue.stop(); - Debug(1, "Stopped packetqueue"); if (close_event_thread.joinable()) { Debug(1, "Joining event thread"); From a515c3a8994a3440e064fae25df58f6923cdd428 Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Fri, 17 Jun 2022 17:25:08 -0400 Subject: [PATCH 4/4] clean out the if0'd out reorder code --- src/zm_packetqueue.cpp | 57 ++++++++---------------------------------- 1 file changed, 10 insertions(+), 47 deletions(-) diff --git a/src/zm_packetqueue.cpp b/src/zm_packetqueue.cpp index 68ba3de0f..bccab60b9 100644 --- a/src/zm_packetqueue.cpp +++ b/src/zm_packetqueue.cpp @@ -87,54 +87,17 @@ bool PacketQueue::queuePacket(std::shared_ptr add_packet) { { std::unique_lock lck(mutex); if (deleting or zm_terminate) return false; -#if 0 - bool have_out_of_order = false; - auto rit = pktQueue.rbegin(); - if (add_packet->packet.dts != AV_NOPTS_VALUE) { - // Find the previous packet for the stream, and check dts - while (rit != pktQueue.rend()) { - if ((*rit)->packet.stream_index == add_packet->packet.stream_index) { - if ((*rit)->packet.dts <= add_packet->packet.dts) { - Debug(1, "Found in order packet"); - ZM_DUMP_PACKET((*rit)->packet, "queued_packet"); - ZM_DUMP_PACKET(add_packet->packet, "add_packet"); - // packets are in order, everything is fine - break; - } else { - ZM_DUMP_PACKET((*rit)->packet, "queued_packet"); - ZM_DUMP_PACKET(add_packet->packet, "add_packet"); - have_out_of_order = true; - } - } - rit++; - } // end while - } - if (have_out_of_order) { - //auto it = rit.base(); it++; // insert inserts BEFORE the it, so we need to - pktQueue.insert(rit.base(), add_packet); - if (rit == pktQueue.rend()) { - Warning("Unable to re-order packet"); - } else { - Debug(1, "Found out of order packet"); - dumpQueue(); + pktQueue.push_back(add_packet); + for ( + auto iterators_it = iterators.begin(); + iterators_it != iterators.end(); + ++iterators_it + ) { + packetqueue_iterator *iterator_it = *iterators_it; + if (*iterator_it == pktQueue.end()) { + --(*iterator_it); } - } else { -#endif - pktQueue.push_back(add_packet); - for ( - auto iterators_it = iterators.begin(); - iterators_it != iterators.end(); - ++iterators_it - ) { - packetqueue_iterator *iterator_it = *iterators_it; - if (*iterator_it == pktQueue.end()) { - --(*iterator_it); - } - } // end foreach iterator -#if 0 - } -#endif - + } // end foreach iterator packet_counts[add_packet->packet.stream_index] += 1; Debug(2, "packet counts for %d is %d",