From b22dbd478870f56d80b22f3960b7c052c66f8f55 Mon Sep 17 00:00:00 2001 From: Peter Keresztes Schmidt Date: Tue, 9 Feb 2021 00:35:16 +0100 Subject: [PATCH 1/4] AnalysisThread: Move it over to std::thread --- src/zm_analysis_thread.cpp | 58 ++++++++++++++++++++------------------ src/zm_analysis_thread.h | 28 ++++++++---------- src/zmc.cpp | 21 ++++---------- 3 files changed, 49 insertions(+), 58 deletions(-) diff --git a/src/zm_analysis_thread.cpp b/src/zm_analysis_thread.cpp index a7002fc23..a79cbe7a3 100644 --- a/src/zm_analysis_thread.cpp +++ b/src/zm_analysis_thread.cpp @@ -3,44 +3,48 @@ #include "zm_signal.h" AnalysisThread::AnalysisThread(std::shared_ptr monitor) : - monitor(std::move(monitor)), terminate(false) {} - -AnalysisThread::~AnalysisThread() { - Debug(2, "THREAD: deleteing analysis thread"); + monitor_(std::move(monitor)), terminate_(false) { + thread_ = std::thread(&AnalysisThread::Run, this); } -int AnalysisThread::run() { - Debug(2, "AnalysisThread::run()"); +AnalysisThread::~AnalysisThread() { + terminate_ = true; + if (thread_.joinable()) + thread_.join(); +} - useconds_t analysis_rate = monitor->GetAnalysisRate(); - unsigned int analysis_update_delay = monitor->GetAnalysisUpdateDelay(); +AnalysisThread::AnalysisThread(AnalysisThread &&rhs) noexcept + : monitor_(std::move(rhs.monitor_)), terminate_(rhs.terminate_.load()), thread_(std::move(rhs.thread_)) {} + +void AnalysisThread::Run() { + Debug(2, "AnalysisThread::Run()"); + + useconds_t analysis_rate = monitor_->GetAnalysisRate(); + unsigned int analysis_update_delay = monitor_->GetAnalysisUpdateDelay(); time_t last_analysis_update_time, cur_time; - monitor->UpdateAdaptiveSkip(); - last_analysis_update_time = time(0); - - while ( !(terminate or zm_terminate) ) { + monitor_->UpdateAdaptiveSkip(); + last_analysis_update_time = time(nullptr); + while (!(terminate_ or zm_terminate)) { // Some periodic updates are required for variable capturing framerate - if ( analysis_update_delay ) { - cur_time = time(0); - if ( (unsigned int)( cur_time - last_analysis_update_time ) > analysis_update_delay ) { - analysis_rate = monitor->GetAnalysisRate(); - monitor->UpdateAdaptiveSkip(); + if (analysis_update_delay) { + cur_time = time(nullptr); + if ((unsigned int) (cur_time - last_analysis_update_time) > analysis_update_delay) { + analysis_rate = monitor_->GetAnalysisRate(); + monitor_->UpdateAdaptiveSkip(); last_analysis_update_time = cur_time; } } Debug(2, "Analyzing"); - if ( !monitor->Analyse() ) { -Debug(2, "uSleeping for %d", (monitor->Active()?ZM_SAMPLE_RATE:ZM_SUSPENDED_RATE)); - usleep(monitor->Active() ? ZM_SAMPLE_RATE : ZM_SUSPENDED_RATE); - } else if ( analysis_rate ) { -Debug(2, "uSleeping for %d", analysis_rate); + if (!monitor_->Analyse()) { + Debug(2, "uSleeping for %d", (monitor_->Active() ? ZM_SAMPLE_RATE : ZM_SUSPENDED_RATE)); + usleep(monitor_->Active() ? ZM_SAMPLE_RATE : ZM_SUSPENDED_RATE); + } else if (analysis_rate) { + Debug(2, "uSleeping for %d", analysis_rate); usleep(analysis_rate); } else { -Debug(2, "Not Sleeping"); + Debug(2, "Not Sleeping"); } - - } // end while ! terminate - return 0; -} // end in AnalysisThread::run() + } +} diff --git a/src/zm_analysis_thread.h b/src/zm_analysis_thread.h index 70d0cd182..7e82614ec 100644 --- a/src/zm_analysis_thread.h +++ b/src/zm_analysis_thread.h @@ -2,26 +2,22 @@ #define ZM_ANALYSIS_THREAD_H #include "zm_monitor.h" -#include "zm_thread.h" +#include #include +#include -class AnalysisThread : public Thread { - private: - std::shared_ptr monitor; - bool terminate; +class AnalysisThread { + public: + explicit AnalysisThread(std::shared_ptr monitor); + ~AnalysisThread(); + AnalysisThread(AnalysisThread &&rhs) noexcept; - public: - explicit AnalysisThread(std::shared_ptr monitor); - ~AnalysisThread(); - int run(); - - void stop() { - terminate = true; - } - bool stopped() const { - return terminate; - } + private: + void Run(); + std::shared_ptr monitor_; + std::atomic terminate_; + std::thread thread_; }; #endif diff --git a/src/zmc.cpp b/src/zmc.cpp index c5f47c37d..624cdf5e5 100644 --- a/src/zmc.cpp +++ b/src/zmc.cpp @@ -281,7 +281,9 @@ int main(int argc, char *argv[]) { Debug(1, "Not starting RTSP server because min_rtsp_port not set"); } #endif - AnalysisThread **analysis_threads = new AnalysisThread *[monitors.size()]; + + std::vector analysis_threads = std::vector(); + int *capture_delays = new int[monitors.size()]; int *alarm_capture_delays = new int[monitors.size()]; struct timeval * last_capture_times = new struct timeval[monitors.size()]; @@ -295,10 +297,7 @@ int main(int argc, char *argv[]) { Monitor::Function function = monitors[0]->GetFunction(); if ( function != Monitor::MONITOR ) { Debug(1, "Starting an analysis thread for monitor (%d)", monitors[i]->Id()); - analysis_threads[i] = new AnalysisThread(monitors[i]); - analysis_threads[i]->start(); - } else { - analysis_threads[i] = nullptr; + analysis_threads.emplace_back(monitors[i]); } #if HAVE_RTSP_SERVER if ( rtsp_server_threads ) { @@ -382,10 +381,9 @@ int main(int argc, char *argv[]) { } // end while ! zm_terminate and connected // Killoff the analysis threads. Don't need them spinning while we try to reconnect + analysis_threads.clear(); + for (size_t i = 0; i < monitors.size(); i++) { - if ( analysis_threads[i] ) { - analysis_threads[i]->stop(); - } #if HAVE_RTSP_SERVER if ( rtsp_server_threads ) { rtsp_server_threads[i]->stop(); @@ -396,12 +394,6 @@ int main(int argc, char *argv[]) { for (size_t i = 0; i < monitors.size(); i++) { monitors[i]->Close(); - // Killoff the analysis threads. Don't need them spinning while we try to reconnect - if ( analysis_threads[i] ) { - analysis_threads[i]->join(); - delete analysis_threads[i]; - analysis_threads[i] = nullptr; - } #if HAVE_RTSP_SERVER if ( rtsp_server_threads ) { rtsp_server_threads[i]->join();; @@ -414,7 +406,6 @@ int main(int argc, char *argv[]) { camera->Close(); } - delete [] analysis_threads; #if HAVE_RTSP_SERVER if ( rtsp_server_threads ) { delete[] rtsp_server_threads; From 4cd70fec1a2f5e615156a3d0fa6c0309d9940320 Mon Sep 17 00:00:00 2001 From: Peter Keresztes Schmidt Date: Tue, 9 Feb 2021 00:57:47 +0100 Subject: [PATCH 2/4] Utils: Add some typedefs for std::chrono durations --- src/zm_utils.h | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/zm_utils.h b/src/zm_utils.h index da9e1f56e..2e178cfad 100644 --- a/src/zm_utils.h +++ b/src/zm_utils.h @@ -20,6 +20,7 @@ #ifndef ZM_UTILS_H #define ZM_UTILS_H +#include #include #include #include @@ -54,4 +55,14 @@ extern unsigned int neonversion; char *timeval_to_string( struct timeval tv ); std::string UriDecode( const std::string &encoded ); void touch( const char *pathname ); + +typedef std::chrono::microseconds Microseconds; +typedef std::chrono::milliseconds Milliseconds; +typedef std::chrono::seconds Seconds; +typedef std::chrono::minutes Minutes; +typedef std::chrono::hours Hours; + +typedef std::chrono::steady_clock::time_point TimePoint; +typedef std::chrono::system_clock::time_point SystemTimePoint; + #endif // ZM_UTILS_H From 3ba8ff604f778b7672cbf08cdc767615d3c387ea Mon Sep 17 00:00:00 2001 From: Peter Keresztes Schmidt Date: Tue, 9 Feb 2021 01:04:19 +0100 Subject: [PATCH 3/4] AnalysisThread: Use std::this_thread::sleep_for instead of usleep --- src/zm_analysis_thread.cpp | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/zm_analysis_thread.cpp b/src/zm_analysis_thread.cpp index a79cbe7a3..66a781c79 100644 --- a/src/zm_analysis_thread.cpp +++ b/src/zm_analysis_thread.cpp @@ -1,6 +1,7 @@ #include "zm_analysis_thread.h" #include "zm_signal.h" +#include "zm_utils.h" AnalysisThread::AnalysisThread(std::shared_ptr monitor) : monitor_(std::move(monitor)), terminate_(false) { @@ -19,7 +20,7 @@ AnalysisThread::AnalysisThread(AnalysisThread &&rhs) noexcept void AnalysisThread::Run() { Debug(2, "AnalysisThread::Run()"); - useconds_t analysis_rate = monitor_->GetAnalysisRate(); + Microseconds analysis_rate = Microseconds(monitor_->GetAnalysisRate()); unsigned int analysis_update_delay = monitor_->GetAnalysisUpdateDelay(); time_t last_analysis_update_time, cur_time; monitor_->UpdateAdaptiveSkip(); @@ -30,7 +31,7 @@ void AnalysisThread::Run() { if (analysis_update_delay) { cur_time = time(nullptr); if ((unsigned int) (cur_time - last_analysis_update_time) > analysis_update_delay) { - analysis_rate = monitor_->GetAnalysisRate(); + analysis_rate = Microseconds(monitor_->GetAnalysisRate()); monitor_->UpdateAdaptiveSkip(); last_analysis_update_time = cur_time; } @@ -38,13 +39,15 @@ void AnalysisThread::Run() { Debug(2, "Analyzing"); if (!monitor_->Analyse()) { - Debug(2, "uSleeping for %d", (monitor_->Active() ? ZM_SAMPLE_RATE : ZM_SUSPENDED_RATE)); - usleep(monitor_->Active() ? ZM_SAMPLE_RATE : ZM_SUSPENDED_RATE); - } else if (analysis_rate) { - Debug(2, "uSleeping for %d", analysis_rate); - usleep(analysis_rate); + Microseconds sleep_for = monitor_->Active() ? Microseconds(ZM_SAMPLE_RATE) : Microseconds(ZM_SUSPENDED_RATE); + + Debug(2, "Sleeping for %" PRId64 "us", int64(sleep_for.count())); + std::this_thread::sleep_for(sleep_for); + } else if (analysis_rate != Microseconds::zero()) { + Debug(2, "Sleeping for %" PRId64 " us", int64(analysis_rate.count())); + std::this_thread::sleep_for(analysis_rate); } else { - Debug(2, "Not Sleeping"); + Debug(2, "Not sleeping"); } } } From 888ddc5d63c57380274d761bed8687620a021dc4 Mon Sep 17 00:00:00 2001 From: Peter Keresztes Schmidt Date: Tue, 9 Feb 2021 01:18:01 +0100 Subject: [PATCH 4/4] AnalysisThread: Sprinkle some more std::chrono --- src/zm_analysis_thread.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/zm_analysis_thread.cpp b/src/zm_analysis_thread.cpp index 66a781c79..bc8f3c610 100644 --- a/src/zm_analysis_thread.cpp +++ b/src/zm_analysis_thread.cpp @@ -21,16 +21,18 @@ void AnalysisThread::Run() { Debug(2, "AnalysisThread::Run()"); Microseconds analysis_rate = Microseconds(monitor_->GetAnalysisRate()); - unsigned int analysis_update_delay = monitor_->GetAnalysisUpdateDelay(); - time_t last_analysis_update_time, cur_time; + Seconds analysis_update_delay = Seconds(monitor_->GetAnalysisUpdateDelay()); + monitor_->UpdateAdaptiveSkip(); - last_analysis_update_time = time(nullptr); + + TimePoint last_analysis_update_time = std::chrono::steady_clock::now(); + TimePoint cur_time; while (!(terminate_ or zm_terminate)) { // Some periodic updates are required for variable capturing framerate - if (analysis_update_delay) { - cur_time = time(nullptr); - if ((unsigned int) (cur_time - last_analysis_update_time) > analysis_update_delay) { + if (analysis_update_delay != Seconds::zero()) { + cur_time = std::chrono::steady_clock::now(); + if ((cur_time - last_analysis_update_time) > analysis_update_delay) { analysis_rate = Microseconds(monitor_->GetAnalysisRate()); monitor_->UpdateAdaptiveSkip(); last_analysis_update_time = cur_time;