From aa030afcb6eb1bbea87ac9ded5bc53614546d64d Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Mon, 15 Aug 2022 17:58:56 -0400 Subject: [PATCH] Make checkCommandQueue it's own thread, meaning we use blocking I/O and will still respond when it takes forever to send a jpeg. Make warning about broken stream into a Debug. The signal doesn't set zm_terminate fast enough. --- src/zm_eventstream.cpp | 28 ++++++++++---------- src/zm_monitorstream.cpp | 36 +++++++++++++------------- src/zm_stream.cpp | 55 +++++++++++++++++++++------------------- src/zm_stream.h | 4 ++- 4 files changed, 63 insertions(+), 60 deletions(-) diff --git a/src/zm_eventstream.cpp b/src/zm_eventstream.cpp index 9dafae3cd..09dd2554c 100644 --- a/src/zm_eventstream.cpp +++ b/src/zm_eventstream.cpp @@ -842,25 +842,17 @@ void EventStream::runStream() { SystemTimePoint::duration last_frame_offset = Seconds(0); SystemTimePoint::duration time_to_event = Seconds(0); + std::thread command_processor; + if (connkey) { + command_processor = std::thread(&EventStream::checkCommandQueue, this); + } + while ( !zm_terminate ) { now = std::chrono::steady_clock::now(); Microseconds delta = Microseconds(0); send_frame = false; - if ( connkey ) { - // commands may set send_frame to true - while ( checkCommandQueue() && !zm_terminate ) { - // The idea is to loop here processing all commands before proceeding. - } - - // Update modified time of the socket .lock file so that we can tell which ones are stale. - if (now - last_comm_update > Hours(1)) { - touch(sock_path_lock); - last_comm_update = now; - } - } - // Get current frame data FrameData *frame_data = &event_data->frames[curr_frame_id-1]; @@ -1075,7 +1067,15 @@ void EventStream::runStream() { delete vid_stream; } - closeComms(); + if (connkey) { + if (command_processor.joinable()) { + Debug(1, "command_processor is joinable"); + command_processor.join(); + } else { + Debug(1, "command_processor is not joinable"); + } + command_processor.join(); + } } // end void EventStream::runStream() bool EventStream::send_file(const std::string &filepath) { diff --git a/src/zm_monitorstream.cpp b/src/zm_monitorstream.cpp index 9b6b4a0dd..0007fe292 100644 --- a/src/zm_monitorstream.cpp +++ b/src/zm_monitorstream.cpp @@ -429,7 +429,8 @@ bool MonitorStream::sendFrame(Image *image, SystemTimePoint timestamp) { ) { if (!zm_terminate) { // If the pipe was closed, we will get signalled SIGPIPE to exit, which will set zm_terminate - Warning("Unable to send stream frame: %s", strerror(errno)); + // ICON: zm_terminate might not get set yet. Make it a debug + Debug(1, "Unable to send stream frame: %s", strerror(errno)); } return false; } @@ -532,6 +533,12 @@ void MonitorStream::runStream() { } else { Debug(2, "Not using playback_buffer"); } // end if connkey && playback_buffer + + std::thread command_processor; + if (connkey) { + command_processor = std::thread(&MonitorStream::checkCommandQueue, this); + } + while (!zm_terminate) { if (feof(stdout)) { @@ -546,23 +553,6 @@ void MonitorStream::runStream() { monitor->setLastViewed(); bool was_paused = paused; - bool got_command = false; // commands like zoom should output a frame even if paused - if (connkey) { - while (checkCommandQueue() && !zm_terminate) { - Debug(2, "checking command Queue for connkey: %d", connkey); - // Loop in here until all commands are processed. - Debug(2, "Have checking command Queue for connkey: %d", connkey); - got_command = true; - } - if (zm_terminate) break; - // Update modified time of the socket .lock file so that we can tell which ones are stale. - if (now - last_comm_update > Hours(1)) { - touch(sock_path_lock); - last_comm_update = now; - } - } else { - Debug(1, "No connkey"); - } // end if connkey if (!checkInitialised()) { if (!loadMonitor(monitor_id)) { if (!sendTextFrame("Not connected")) { @@ -868,7 +858,15 @@ void MonitorStream::runStream() { if (zm_terminate) Debug(1, "zm_terminate"); - closeComms(); + if (connkey) { + if (command_processor.joinable()) { + Debug(1, "command_processor is joinable"); + command_processor.join(); + } else { + Debug(1, "command_processor is not joinable"); + } + command_processor.join(); + } } // end MonitorStream::runStream void MonitorStream::SingleImage(int scale) { diff --git a/src/zm_stream.cpp b/src/zm_stream.cpp index 7f6337a18..16f30795a 100644 --- a/src/zm_stream.cpp +++ b/src/zm_stream.cpp @@ -21,6 +21,8 @@ #include "zm_box.h" #include "zm_monitor.h" +#include "zm_signal.h" + #include #include #include @@ -108,34 +110,35 @@ void StreamBase::updateFrameRate(double fps) { } } // void StreamBase::updateFrameRate(double fps) -bool StreamBase::checkCommandQueue() { - if ( sd >= 0 ) { - CmdMsg msg; - memset(&msg, 0, sizeof(msg)); - int nbytes = recvfrom(sd, &msg, sizeof(msg), MSG_DONTWAIT, 0, 0); - if ( nbytes < 0 ) { - if ( errno != EAGAIN ) { - Error("recvfrom(), errno = %d, error = %s", errno, strerror(errno)); - return false; +void StreamBase::checkCommandQueue() { + while (!zm_terminate) { + // Update modified time of the socket .lock file so that we can tell which ones are stale. + if (now - last_comm_update > Hours(1)) { + touch(sock_path_lock); + last_comm_update = now; + } + + if (sd >= 0) { + CmdMsg msg; + memset(&msg, 0, sizeof(msg)); + int nbytes = recvfrom(sd, &msg, sizeof(msg), 0, /*MSG_DONTWAIT*/ 0, 0); + if (nbytes < 0) { + if (errno != EAGAIN) { + Error("recvfrom(), errno = %d, error = %s", errno, strerror(errno)); + } + } else { + Debug(2, "Message length is (%d)", nbytes); + processCommand(&msg); + got_command = true; } + } else if (connkey) { + Warning("No sd in checkCommandQueue, comms not open for connkey %06d?", connkey); + } else { + // Perfectly valid if only getting a snapshot + Debug(1, "No sd in checkCommandQueue, comms not open."); } - //else if ( (nbytes != sizeof(msg)) ) - //{ - //Error( "Partial message received, expected %d bytes, got %d", sizeof(msg), nbytes ); - //} - else { - Debug(2, "Message length is (%d)", nbytes); - processCommand(&msg); - return true; - } - } else if ( connkey ) { - Warning("No sd in checkCommandQueue, comms not open for connkey %06d?", connkey); - } else { - // Perfectly valid if only getting a snapshot - Debug(1, "No sd in checkCommandQueue, comms not open."); - } - return false; -} // end bool StreamBase::checkCommandQueue() + } // end while !zm_terminate +} // end void StreamBase::checkCommandQueue() Image *StreamBase::prepareImage(Image *image) { /* zooming should happen before scaling to preserve quality diff --git a/src/zm_stream.h b/src/zm_stream.h index fd5a27a3e..d46e89893 100644 --- a/src/zm_stream.h +++ b/src/zm_stream.h @@ -146,6 +146,7 @@ protected: VideoStream *vid_stream; CmdMsg msg; + bool got_command = false; // commands like zoom should output a frame even if paused unsigned char *temp_img_buffer; // Used when encoding or sending file data size_t temp_img_buffer_size; @@ -155,7 +156,7 @@ protected: bool checkInitialised(); void updateFrameRate(double fps); Image *prepareImage(Image *image); - bool checkCommandQueue(); + void checkCommandQueue(); virtual void processCommand(const CmdMsg *msg)=0; public: @@ -189,6 +190,7 @@ public: frame_count(0), last_frame_count(0), frame_mod(1), + got_command(false), temp_img_buffer(nullptr), temp_img_buffer_size(0) {