Make checkCommandQueue it's own thread, meaning we use blocking I/O and will still respond when it takes forever to send a jpeg. Make warning about broken stream into a Debug. The signal doesn't set zm_terminate fast enough.

This commit is contained in:
Isaac Connor
2022-08-15 17:58:56 -04:00
parent 071147fba7
commit aa030afcb6
4 changed files with 63 additions and 60 deletions

View File

@@ -842,25 +842,17 @@ void EventStream::runStream() {
SystemTimePoint::duration last_frame_offset = Seconds(0);
SystemTimePoint::duration time_to_event = Seconds(0);
std::thread command_processor;
if (connkey) {
command_processor = std::thread(&EventStream::checkCommandQueue, this);
}
while ( !zm_terminate ) {
now = std::chrono::steady_clock::now();
Microseconds delta = Microseconds(0);
send_frame = false;
if ( connkey ) {
// commands may set send_frame to true
while ( checkCommandQueue() && !zm_terminate ) {
// The idea is to loop here processing all commands before proceeding.
}
// Update modified time of the socket .lock file so that we can tell which ones are stale.
if (now - last_comm_update > Hours(1)) {
touch(sock_path_lock);
last_comm_update = now;
}
}
// Get current frame data
FrameData *frame_data = &event_data->frames[curr_frame_id-1];
@@ -1075,7 +1067,15 @@ void EventStream::runStream() {
delete vid_stream;
}
closeComms();
if (connkey) {
if (command_processor.joinable()) {
Debug(1, "command_processor is joinable");
command_processor.join();
} else {
Debug(1, "command_processor is not joinable");
}
command_processor.join();
}
} // end void EventStream::runStream()
bool EventStream::send_file(const std::string &filepath) {

View File

@@ -429,7 +429,8 @@ bool MonitorStream::sendFrame(Image *image, SystemTimePoint timestamp) {
) {
if (!zm_terminate) {
// If the pipe was closed, we will get signalled SIGPIPE to exit, which will set zm_terminate
Warning("Unable to send stream frame: %s", strerror(errno));
// ICON: zm_terminate might not get set yet. Make it a debug
Debug(1, "Unable to send stream frame: %s", strerror(errno));
}
return false;
}
@@ -532,6 +533,12 @@ void MonitorStream::runStream() {
} else {
Debug(2, "Not using playback_buffer");
} // end if connkey && playback_buffer
std::thread command_processor;
if (connkey) {
command_processor = std::thread(&MonitorStream::checkCommandQueue, this);
}
while (!zm_terminate) {
if (feof(stdout)) {
@@ -546,23 +553,6 @@ void MonitorStream::runStream() {
monitor->setLastViewed();
bool was_paused = paused;
bool got_command = false; // commands like zoom should output a frame even if paused
if (connkey) {
while (checkCommandQueue() && !zm_terminate) {
Debug(2, "checking command Queue for connkey: %d", connkey);
// Loop in here until all commands are processed.
Debug(2, "Have checking command Queue for connkey: %d", connkey);
got_command = true;
}
if (zm_terminate) break;
// Update modified time of the socket .lock file so that we can tell which ones are stale.
if (now - last_comm_update > Hours(1)) {
touch(sock_path_lock);
last_comm_update = now;
}
} else {
Debug(1, "No connkey");
} // end if connkey
if (!checkInitialised()) {
if (!loadMonitor(monitor_id)) {
if (!sendTextFrame("Not connected")) {
@@ -868,7 +858,15 @@ void MonitorStream::runStream() {
if (zm_terminate)
Debug(1, "zm_terminate");
closeComms();
if (connkey) {
if (command_processor.joinable()) {
Debug(1, "command_processor is joinable");
command_processor.join();
} else {
Debug(1, "command_processor is not joinable");
}
command_processor.join();
}
} // end MonitorStream::runStream
void MonitorStream::SingleImage(int scale) {

View File

@@ -21,6 +21,8 @@
#include "zm_box.h"
#include "zm_monitor.h"
#include "zm_signal.h"
#include <cmath>
#include <sys/file.h>
#include <sys/socket.h>
@@ -108,34 +110,35 @@ void StreamBase::updateFrameRate(double fps) {
}
} // void StreamBase::updateFrameRate(double fps)
bool StreamBase::checkCommandQueue() {
if ( sd >= 0 ) {
CmdMsg msg;
memset(&msg, 0, sizeof(msg));
int nbytes = recvfrom(sd, &msg, sizeof(msg), MSG_DONTWAIT, 0, 0);
if ( nbytes < 0 ) {
if ( errno != EAGAIN ) {
Error("recvfrom(), errno = %d, error = %s", errno, strerror(errno));
return false;
void StreamBase::checkCommandQueue() {
while (!zm_terminate) {
// Update modified time of the socket .lock file so that we can tell which ones are stale.
if (now - last_comm_update > Hours(1)) {
touch(sock_path_lock);
last_comm_update = now;
}
if (sd >= 0) {
CmdMsg msg;
memset(&msg, 0, sizeof(msg));
int nbytes = recvfrom(sd, &msg, sizeof(msg), 0, /*MSG_DONTWAIT*/ 0, 0);
if (nbytes < 0) {
if (errno != EAGAIN) {
Error("recvfrom(), errno = %d, error = %s", errno, strerror(errno));
}
} else {
Debug(2, "Message length is (%d)", nbytes);
processCommand(&msg);
got_command = true;
}
} else if (connkey) {
Warning("No sd in checkCommandQueue, comms not open for connkey %06d?", connkey);
} else {
// Perfectly valid if only getting a snapshot
Debug(1, "No sd in checkCommandQueue, comms not open.");
}
//else if ( (nbytes != sizeof(msg)) )
//{
//Error( "Partial message received, expected %d bytes, got %d", sizeof(msg), nbytes );
//}
else {
Debug(2, "Message length is (%d)", nbytes);
processCommand(&msg);
return true;
}
} else if ( connkey ) {
Warning("No sd in checkCommandQueue, comms not open for connkey %06d?", connkey);
} else {
// Perfectly valid if only getting a snapshot
Debug(1, "No sd in checkCommandQueue, comms not open.");
}
return false;
} // end bool StreamBase::checkCommandQueue()
} // end while !zm_terminate
} // end void StreamBase::checkCommandQueue()
Image *StreamBase::prepareImage(Image *image) {
/* zooming should happen before scaling to preserve quality

View File

@@ -146,6 +146,7 @@ protected:
VideoStream *vid_stream;
CmdMsg msg;
bool got_command = false; // commands like zoom should output a frame even if paused
unsigned char *temp_img_buffer; // Used when encoding or sending file data
size_t temp_img_buffer_size;
@@ -155,7 +156,7 @@ protected:
bool checkInitialised();
void updateFrameRate(double fps);
Image *prepareImage(Image *image);
bool checkCommandQueue();
void checkCommandQueue();
virtual void processCommand(const CmdMsg *msg)=0;
public:
@@ -189,6 +190,7 @@ public:
frame_count(0),
last_frame_count(0),
frame_mod(1),
got_command(false),
temp_img_buffer(nullptr),
temp_img_buffer_size(0)
{