From 53851b67c7c6f1a487d67cfba1304bb65c19f017 Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Mon, 1 Mar 2021 14:00:26 -0500 Subject: [PATCH] use a Buffer to implement the simple ZM header protocol for passing packet data from ZM to RTSP Server --- src/zm_rtsp_server_fifo_source.cpp | 154 +++++++++++++++++++---------- src/zm_rtsp_server_fifo_source.h | 5 +- 2 files changed, 106 insertions(+), 53 deletions(-) diff --git a/src/zm_rtsp_server_fifo_source.cpp b/src/zm_rtsp_server_fifo_source.cpp index 7b25058ab..d7ad62ee4 100644 --- a/src/zm_rtsp_server_fifo_source.cpp +++ b/src/zm_rtsp_server_fifo_source.cpp @@ -34,19 +34,18 @@ ZoneMinderFifoSource::ZoneMinderFifoSource( memset(&m_mutex, 0, sizeof(m_mutex)); pthread_mutex_init(&m_mutex, nullptr); pthread_create(&m_thid, nullptr, threadStub, this); - m_buffer_ptr = &m_buffer[0]; } ZoneMinderFifoSource::~ZoneMinderFifoSource() { + Debug(1, "Deleting Fifo Source"); stop = 1; envir().taskScheduler().deleteEventTrigger(m_eventTriggerId); pthread_join(m_thid, nullptr); - while ( m_captureQueue.size() ) { + while (m_captureQueue.size()) { NAL_Frame * f = m_captureQueue.front(); m_captureQueue.pop_front(); delete f; } - pthread_mutex_destroy(&m_mutex); } @@ -75,13 +74,13 @@ void ZoneMinderFifoSource::doStopGettingFrames() { // deliver frame to the sink void ZoneMinderFifoSource::deliverFrame() { if (!isCurrentlyAwaitingData()) { - Debug(4, "not awaiting data"); + Debug(5, "not awaiting data"); return; } pthread_mutex_lock(&m_mutex); if (m_captureQueue.empty()) { - Debug(4, "Queue is empty"); + Debug(5, "Queue is empty"); pthread_mutex_unlock(&m_mutex); return; } @@ -101,7 +100,8 @@ void ZoneMinderFifoSource::deliverFrame() { } else { fFrameSize = nal_size; } - Debug(2, "deliverFrame timestamp: %ld.%06ld size: %d queuesize: %d", + + Debug(4, "deliverFrame timestamp: %ld.%06ld size: %d queuesize: %d", frame->m_timestamp.tv_sec, frame->m_timestamp.tv_usec, fFrameSize, m_captureQueue.size() @@ -137,68 +137,122 @@ int ZoneMinderFifoSource::getNextFrame() { } } - int bytes_in_buffer = m_buffer_ptr - &m_buffer[0]; - int bytes_read = read(m_fd, m_buffer_ptr, BUFFER_SIZE-bytes_in_buffer); - if (bytes_read == 0) + int bytes_read = m_buffer.read_into(m_fd, 4096); + if (bytes_read == 0) { + Debug(3, "No bytes read"); + sleep(1); return -1; + } if (bytes_read < 0) { Error("Problem during reading: %s", strerror(errno)); ::close(m_fd); m_fd = -1; return -1; } - bytes_in_buffer += bytes_read; - unsigned int bytes_remaining = bytes_in_buffer; - timeval tv; - gettimeofday(&tv, nullptr); + Debug(4, "%s bytes read %d bytes, buffer size %u", m_fifo.c_str(), bytes_read, m_buffer.size()); + while (m_buffer.size()) { - std::list< std::pair > framesList = this->splitFrames(m_buffer, bytes_remaining); - Debug(1, "Got %d frames, bytes remaining %d", framesList.size(), bytes_remaining); + unsigned int data_size = 0; + int64_t pts; + unsigned char *header_end = nullptr; + unsigned char *header_start = nullptr; - if ( bytes_remaining > 0 ) { - memmove(&m_buffer[0], &m_buffer[0] + ( bytes_in_buffer - bytes_remaining ), bytes_remaining); - m_buffer_ptr = &m_buffer[0] + bytes_remaining; - } else { - m_buffer_ptr = &m_buffer[0]; - } - - while (framesList.size()) { - std::pair nal = framesList.front(); - framesList.pop_front(); - - NAL_Frame *frame = new NAL_Frame(nal.first, nal.second, tv); - - pthread_mutex_lock(&m_mutex); - if (m_captureQueue.size() > 10) { - NAL_Frame * f = m_captureQueue.front(); - while (m_captureQueue.size() and ((f->m_timestamp.tv_sec - tv.tv_sec) > 2)) { - m_captureQueue.pop_front(); - delete f; - f = m_captureQueue.front(); + if ((header_start = (unsigned char *)memmem(m_buffer.head(), m_buffer.size(), "ZM", 2))) { + // next step, look for \n + header_end = (unsigned char *)memchr(header_start, '\n', m_buffer.tail()-header_start); + if (!header_end) { + // Must not have enough data. So... keep all. + Debug(1, "Didn't find newline"); + return -1; } - } -#if 0 - while ( m_captureQueue.size() >= m_queueSize ) { - Debug(2, "Queue full dropping frame %d", m_captureQueue.size()); - NAL_Frame * f = m_captureQueue.front(); - m_captureQueue.pop_front(); - delete f; - } -#endif - m_captureQueue.push_back(frame); - pthread_mutex_unlock(&m_mutex); - // post an event to ask to deliver the frame - envir().taskScheduler().triggerEvent(m_eventTriggerId, this); - } // end while we get frame from data + unsigned int header_size = header_end-header_start; + char *header = new char[header_size+1]; + header[header_size] = '\0'; + strncpy(header, reinterpret_cast(header_start), header_end-header_start); + + char *content_length_ptr = strchr(header, ' '); + if (!content_length_ptr) { + Debug(1, "Didn't find space delineating size in %s", header); + m_buffer.consume(header_start-m_buffer.head() + 2); + delete header; + return -1; + } + *content_length_ptr = '\0'; + content_length_ptr ++; + char *pts_ptr = strchr(content_length_ptr, ' '); + if (!pts_ptr) { + m_buffer.consume(header_start-m_buffer.head() + 2); + Warning("Didn't find space delineating pts"); + delete header; + return -1; + } + *pts_ptr = '\0'; + pts_ptr ++; + data_size = atoi(content_length_ptr); + pts = strtoll(pts_ptr, nullptr, 10); + delete header; + } else { + Debug(1, "ZM header not found."); + return -1; + } + Debug(4, "ZM Packet size %u pts %" PRId64, data_size, pts); + if (header_start != m_buffer) { + Debug(4, "ZM Packet didn't start at beginning of buffer %u. %c%c", + header_start-m_buffer.head(), m_buffer[0], m_buffer[1]); + } + unsigned char *packet_start = header_end+1; + unsigned int header_size = packet_start - m_buffer.head(); // includes any bytes before header + + int bytes_needed = data_size - (m_buffer.size() - header_size); + if (bytes_needed > 0) { + Debug(4, "Need another %d bytes. Trying to read them", bytes_needed); + int bytes_read = m_buffer.read_into(m_fd, bytes_needed); + if ( bytes_read != bytes_needed ) + return -1; + } + + // splitFrames modifies so make a copy + unsigned int bytes_remaining = data_size; + std::list< std::pair > framesList = this->splitFrames(packet_start, bytes_remaining); + Debug(3, "Got %d frames, consuming %d bytes", framesList.size(), header_size + data_size); + m_buffer.consume(header_size + data_size); + + timeval tv; + tv.tv_sec = pts / 1000000; + tv.tv_usec = pts % 1000000; + + while (framesList.size()) { + std::pair nal = framesList.front(); + framesList.pop_front(); + + NAL_Frame *frame = new NAL_Frame(nal.first, nal.second, tv); + Debug(3, "Got frame, size %d, queue_size %d", frame->size(), m_captureQueue.size()); + + pthread_mutex_lock(&m_mutex); + if (m_captureQueue.size() > 25) { // 1 sec at 25 fps + NAL_Frame * f = m_captureQueue.front(); + while (m_captureQueue.size() and ((f->m_timestamp.tv_sec - tv.tv_sec) > 2)) { + m_captureQueue.pop_front(); + delete f; + f = m_captureQueue.front(); + } + } + m_captureQueue.push_back(frame); + pthread_mutex_unlock(&m_mutex); + + // post an event to ask to deliver the frame + envir().taskScheduler().triggerEvent(m_eventTriggerId, this); + } // end while we get frame from data + } // end while m_buffer.size() return 1; } // split packet in frames std::list< std::pair > ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) { std::list< std::pair > frameList; - if ( frame != nullptr ) { + if (frame != nullptr) { frameList.push_back(std::pair(frame, frameSize)); } // We consume it all diff --git a/src/zm_rtsp_server_fifo_source.h b/src/zm_rtsp_server_fifo_source.h index 142946de2..990d32984 100644 --- a/src/zm_rtsp_server_fifo_source.h +++ b/src/zm_rtsp_server_fifo_source.h @@ -9,6 +9,7 @@ #ifndef ZM_RTSP_SERVER_FIFO_SOURCE_H #define ZM_RTSP_SERVER_FIFO_SOURCE_H +#include "zm_buffer.h" #include "zm_config.h" #include "zm_define.h" #include @@ -73,9 +74,7 @@ class ZoneMinderFifoSource: public FramedSource { int stop; int m_fd; - #define BUFFER_SIZE 65536 - unsigned char m_buffer[BUFFER_SIZE]; - unsigned char *m_buffer_ptr; + Buffer m_buffer; }; #endif // HAVE_RTSP_SERVER