diff --git a/src/zm_analysis_thread.cpp b/src/zm_analysis_thread.cpp index a7002fc23..bc8f3c610 100644 --- a/src/zm_analysis_thread.cpp +++ b/src/zm_analysis_thread.cpp @@ -1,46 +1,55 @@ #include "zm_analysis_thread.h" #include "zm_signal.h" +#include "zm_utils.h" AnalysisThread::AnalysisThread(std::shared_ptr monitor) : - monitor(std::move(monitor)), terminate(false) {} - -AnalysisThread::~AnalysisThread() { - Debug(2, "THREAD: deleteing analysis thread"); + monitor_(std::move(monitor)), terminate_(false) { + thread_ = std::thread(&AnalysisThread::Run, this); } -int AnalysisThread::run() { - Debug(2, "AnalysisThread::run()"); +AnalysisThread::~AnalysisThread() { + terminate_ = true; + if (thread_.joinable()) + thread_.join(); +} - useconds_t analysis_rate = monitor->GetAnalysisRate(); - unsigned int analysis_update_delay = monitor->GetAnalysisUpdateDelay(); - time_t last_analysis_update_time, cur_time; - monitor->UpdateAdaptiveSkip(); - last_analysis_update_time = time(0); +AnalysisThread::AnalysisThread(AnalysisThread &&rhs) noexcept + : monitor_(std::move(rhs.monitor_)), terminate_(rhs.terminate_.load()), thread_(std::move(rhs.thread_)) {} - while ( !(terminate or zm_terminate) ) { +void AnalysisThread::Run() { + Debug(2, "AnalysisThread::Run()"); + Microseconds analysis_rate = Microseconds(monitor_->GetAnalysisRate()); + Seconds analysis_update_delay = Seconds(monitor_->GetAnalysisUpdateDelay()); + + monitor_->UpdateAdaptiveSkip(); + + TimePoint last_analysis_update_time = std::chrono::steady_clock::now(); + TimePoint cur_time; + + while (!(terminate_ or zm_terminate)) { // Some periodic updates are required for variable capturing framerate - if ( analysis_update_delay ) { - cur_time = time(0); - if ( (unsigned int)( cur_time - last_analysis_update_time ) > analysis_update_delay ) { - analysis_rate = monitor->GetAnalysisRate(); - monitor->UpdateAdaptiveSkip(); + if (analysis_update_delay != Seconds::zero()) { + cur_time = std::chrono::steady_clock::now(); + if ((cur_time - last_analysis_update_time) > analysis_update_delay) { + analysis_rate = Microseconds(monitor_->GetAnalysisRate()); + monitor_->UpdateAdaptiveSkip(); last_analysis_update_time = cur_time; } } Debug(2, "Analyzing"); - if ( !monitor->Analyse() ) { -Debug(2, "uSleeping for %d", (monitor->Active()?ZM_SAMPLE_RATE:ZM_SUSPENDED_RATE)); - usleep(monitor->Active() ? ZM_SAMPLE_RATE : ZM_SUSPENDED_RATE); - } else if ( analysis_rate ) { -Debug(2, "uSleeping for %d", analysis_rate); - usleep(analysis_rate); - } else { -Debug(2, "Not Sleeping"); - } + if (!monitor_->Analyse()) { + Microseconds sleep_for = monitor_->Active() ? Microseconds(ZM_SAMPLE_RATE) : Microseconds(ZM_SUSPENDED_RATE); - } // end while ! terminate - return 0; -} // end in AnalysisThread::run() + Debug(2, "Sleeping for %" PRId64 "us", int64(sleep_for.count())); + std::this_thread::sleep_for(sleep_for); + } else if (analysis_rate != Microseconds::zero()) { + Debug(2, "Sleeping for %" PRId64 " us", int64(analysis_rate.count())); + std::this_thread::sleep_for(analysis_rate); + } else { + Debug(2, "Not sleeping"); + } + } +} diff --git a/src/zm_analysis_thread.h b/src/zm_analysis_thread.h index 70d0cd182..7e82614ec 100644 --- a/src/zm_analysis_thread.h +++ b/src/zm_analysis_thread.h @@ -2,26 +2,22 @@ #define ZM_ANALYSIS_THREAD_H #include "zm_monitor.h" -#include "zm_thread.h" +#include #include +#include -class AnalysisThread : public Thread { - private: - std::shared_ptr monitor; - bool terminate; +class AnalysisThread { + public: + explicit AnalysisThread(std::shared_ptr monitor); + ~AnalysisThread(); + AnalysisThread(AnalysisThread &&rhs) noexcept; - public: - explicit AnalysisThread(std::shared_ptr monitor); - ~AnalysisThread(); - int run(); - - void stop() { - terminate = true; - } - bool stopped() const { - return terminate; - } + private: + void Run(); + std::shared_ptr monitor_; + std::atomic terminate_; + std::thread thread_; }; #endif diff --git a/src/zm_utils.h b/src/zm_utils.h index da9e1f56e..2e178cfad 100644 --- a/src/zm_utils.h +++ b/src/zm_utils.h @@ -20,6 +20,7 @@ #ifndef ZM_UTILS_H #define ZM_UTILS_H +#include #include #include #include @@ -54,4 +55,14 @@ extern unsigned int neonversion; char *timeval_to_string( struct timeval tv ); std::string UriDecode( const std::string &encoded ); void touch( const char *pathname ); + +typedef std::chrono::microseconds Microseconds; +typedef std::chrono::milliseconds Milliseconds; +typedef std::chrono::seconds Seconds; +typedef std::chrono::minutes Minutes; +typedef std::chrono::hours Hours; + +typedef std::chrono::steady_clock::time_point TimePoint; +typedef std::chrono::system_clock::time_point SystemTimePoint; + #endif // ZM_UTILS_H diff --git a/src/zmc.cpp b/src/zmc.cpp index c5f47c37d..624cdf5e5 100644 --- a/src/zmc.cpp +++ b/src/zmc.cpp @@ -281,7 +281,9 @@ int main(int argc, char *argv[]) { Debug(1, "Not starting RTSP server because min_rtsp_port not set"); } #endif - AnalysisThread **analysis_threads = new AnalysisThread *[monitors.size()]; + + std::vector analysis_threads = std::vector(); + int *capture_delays = new int[monitors.size()]; int *alarm_capture_delays = new int[monitors.size()]; struct timeval * last_capture_times = new struct timeval[monitors.size()]; @@ -295,10 +297,7 @@ int main(int argc, char *argv[]) { Monitor::Function function = monitors[0]->GetFunction(); if ( function != Monitor::MONITOR ) { Debug(1, "Starting an analysis thread for monitor (%d)", monitors[i]->Id()); - analysis_threads[i] = new AnalysisThread(monitors[i]); - analysis_threads[i]->start(); - } else { - analysis_threads[i] = nullptr; + analysis_threads.emplace_back(monitors[i]); } #if HAVE_RTSP_SERVER if ( rtsp_server_threads ) { @@ -382,10 +381,9 @@ int main(int argc, char *argv[]) { } // end while ! zm_terminate and connected // Killoff the analysis threads. Don't need them spinning while we try to reconnect + analysis_threads.clear(); + for (size_t i = 0; i < monitors.size(); i++) { - if ( analysis_threads[i] ) { - analysis_threads[i]->stop(); - } #if HAVE_RTSP_SERVER if ( rtsp_server_threads ) { rtsp_server_threads[i]->stop(); @@ -396,12 +394,6 @@ int main(int argc, char *argv[]) { for (size_t i = 0; i < monitors.size(); i++) { monitors[i]->Close(); - // Killoff the analysis threads. Don't need them spinning while we try to reconnect - if ( analysis_threads[i] ) { - analysis_threads[i]->join(); - delete analysis_threads[i]; - analysis_threads[i] = nullptr; - } #if HAVE_RTSP_SERVER if ( rtsp_server_threads ) { rtsp_server_threads[i]->join();; @@ -414,7 +406,6 @@ int main(int argc, char *argv[]) { camera->Close(); } - delete [] analysis_threads; #if HAVE_RTSP_SERVER if ( rtsp_server_threads ) { delete[] rtsp_server_threads;