Merge pull request #3145 from Carbenium/analysis-thread

AnalysisThread: Move it over to std::thread
This commit is contained in:
Isaac Connor
2021-02-09 09:26:47 -05:00
committed by GitHub
4 changed files with 67 additions and 60 deletions

View File

@@ -1,46 +1,55 @@
#include "zm_analysis_thread.h"
#include "zm_signal.h"
#include "zm_utils.h"
AnalysisThread::AnalysisThread(std::shared_ptr<Monitor> monitor) :
monitor(std::move(monitor)), terminate(false) {}
AnalysisThread::~AnalysisThread() {
Debug(2, "THREAD: deleteing analysis thread");
monitor_(std::move(monitor)), terminate_(false) {
thread_ = std::thread(&AnalysisThread::Run, this);
}
int AnalysisThread::run() {
Debug(2, "AnalysisThread::run()");
AnalysisThread::~AnalysisThread() {
terminate_ = true;
if (thread_.joinable())
thread_.join();
}
useconds_t analysis_rate = monitor->GetAnalysisRate();
unsigned int analysis_update_delay = monitor->GetAnalysisUpdateDelay();
time_t last_analysis_update_time, cur_time;
monitor->UpdateAdaptiveSkip();
last_analysis_update_time = time(0);
AnalysisThread::AnalysisThread(AnalysisThread &&rhs) noexcept
: monitor_(std::move(rhs.monitor_)), terminate_(rhs.terminate_.load()), thread_(std::move(rhs.thread_)) {}
while ( !(terminate or zm_terminate) ) {
void AnalysisThread::Run() {
Debug(2, "AnalysisThread::Run()");
Microseconds analysis_rate = Microseconds(monitor_->GetAnalysisRate());
Seconds analysis_update_delay = Seconds(monitor_->GetAnalysisUpdateDelay());
monitor_->UpdateAdaptiveSkip();
TimePoint last_analysis_update_time = std::chrono::steady_clock::now();
TimePoint cur_time;
while (!(terminate_ or zm_terminate)) {
// Some periodic updates are required for variable capturing framerate
if ( analysis_update_delay ) {
cur_time = time(0);
if ( (unsigned int)( cur_time - last_analysis_update_time ) > analysis_update_delay ) {
analysis_rate = monitor->GetAnalysisRate();
monitor->UpdateAdaptiveSkip();
if (analysis_update_delay != Seconds::zero()) {
cur_time = std::chrono::steady_clock::now();
if ((cur_time - last_analysis_update_time) > analysis_update_delay) {
analysis_rate = Microseconds(monitor_->GetAnalysisRate());
monitor_->UpdateAdaptiveSkip();
last_analysis_update_time = cur_time;
}
}
Debug(2, "Analyzing");
if ( !monitor->Analyse() ) {
Debug(2, "uSleeping for %d", (monitor->Active()?ZM_SAMPLE_RATE:ZM_SUSPENDED_RATE));
usleep(monitor->Active() ? ZM_SAMPLE_RATE : ZM_SUSPENDED_RATE);
} else if ( analysis_rate ) {
Debug(2, "uSleeping for %d", analysis_rate);
usleep(analysis_rate);
} else {
Debug(2, "Not Sleeping");
}
if (!monitor_->Analyse()) {
Microseconds sleep_for = monitor_->Active() ? Microseconds(ZM_SAMPLE_RATE) : Microseconds(ZM_SUSPENDED_RATE);
} // end while ! terminate
return 0;
} // end in AnalysisThread::run()
Debug(2, "Sleeping for %" PRId64 "us", int64(sleep_for.count()));
std::this_thread::sleep_for(sleep_for);
} else if (analysis_rate != Microseconds::zero()) {
Debug(2, "Sleeping for %" PRId64 " us", int64(analysis_rate.count()));
std::this_thread::sleep_for(analysis_rate);
} else {
Debug(2, "Not sleeping");
}
}
}

View File

@@ -2,26 +2,22 @@
#define ZM_ANALYSIS_THREAD_H
#include "zm_monitor.h"
#include "zm_thread.h"
#include <atomic>
#include <memory>
#include <thread>
class AnalysisThread : public Thread {
private:
std::shared_ptr<Monitor> monitor;
bool terminate;
class AnalysisThread {
public:
explicit AnalysisThread(std::shared_ptr<Monitor> monitor);
~AnalysisThread();
AnalysisThread(AnalysisThread &&rhs) noexcept;
public:
explicit AnalysisThread(std::shared_ptr<Monitor> monitor);
~AnalysisThread();
int run();
void stop() {
terminate = true;
}
bool stopped() const {
return terminate;
}
private:
void Run();
std::shared_ptr<Monitor> monitor_;
std::atomic<bool> terminate_;
std::thread thread_;
};
#endif

View File

@@ -20,6 +20,7 @@
#ifndef ZM_UTILS_H
#define ZM_UTILS_H
#include <chrono>
#include <ctime>
#include <sys/time.h>
#include <string>
@@ -54,4 +55,14 @@ extern unsigned int neonversion;
char *timeval_to_string( struct timeval tv );
std::string UriDecode( const std::string &encoded );
void touch( const char *pathname );
typedef std::chrono::microseconds Microseconds;
typedef std::chrono::milliseconds Milliseconds;
typedef std::chrono::seconds Seconds;
typedef std::chrono::minutes Minutes;
typedef std::chrono::hours Hours;
typedef std::chrono::steady_clock::time_point TimePoint;
typedef std::chrono::system_clock::time_point SystemTimePoint;
#endif // ZM_UTILS_H

View File

@@ -281,7 +281,9 @@ int main(int argc, char *argv[]) {
Debug(1, "Not starting RTSP server because min_rtsp_port not set");
}
#endif
AnalysisThread **analysis_threads = new AnalysisThread *[monitors.size()];
std::vector<AnalysisThread> analysis_threads = std::vector<AnalysisThread>();
int *capture_delays = new int[monitors.size()];
int *alarm_capture_delays = new int[monitors.size()];
struct timeval * last_capture_times = new struct timeval[monitors.size()];
@@ -295,10 +297,7 @@ int main(int argc, char *argv[]) {
Monitor::Function function = monitors[0]->GetFunction();
if ( function != Monitor::MONITOR ) {
Debug(1, "Starting an analysis thread for monitor (%d)", monitors[i]->Id());
analysis_threads[i] = new AnalysisThread(monitors[i]);
analysis_threads[i]->start();
} else {
analysis_threads[i] = nullptr;
analysis_threads.emplace_back(monitors[i]);
}
#if HAVE_RTSP_SERVER
if ( rtsp_server_threads ) {
@@ -382,10 +381,9 @@ int main(int argc, char *argv[]) {
} // end while ! zm_terminate and connected
// Killoff the analysis threads. Don't need them spinning while we try to reconnect
analysis_threads.clear();
for (size_t i = 0; i < monitors.size(); i++) {
if ( analysis_threads[i] ) {
analysis_threads[i]->stop();
}
#if HAVE_RTSP_SERVER
if ( rtsp_server_threads ) {
rtsp_server_threads[i]->stop();
@@ -396,12 +394,6 @@ int main(int argc, char *argv[]) {
for (size_t i = 0; i < monitors.size(); i++) {
monitors[i]->Close();
// Killoff the analysis threads. Don't need them spinning while we try to reconnect
if ( analysis_threads[i] ) {
analysis_threads[i]->join();
delete analysis_threads[i];
analysis_threads[i] = nullptr;
}
#if HAVE_RTSP_SERVER
if ( rtsp_server_threads ) {
rtsp_server_threads[i]->join();;
@@ -414,7 +406,6 @@ int main(int argc, char *argv[]) {
camera->Close();
}
delete [] analysis_threads;
#if HAVE_RTSP_SERVER
if ( rtsp_server_threads ) {
delete[] rtsp_server_threads;