use a Buffer to implement the simple ZM header protocol for passing packet data from ZM to RTSP Server

This commit is contained in:
Isaac Connor
2021-03-01 14:00:26 -05:00
parent c716e8c149
commit 53851b67c7
2 changed files with 106 additions and 53 deletions

View File

@@ -34,19 +34,18 @@ ZoneMinderFifoSource::ZoneMinderFifoSource(
memset(&m_mutex, 0, sizeof(m_mutex));
pthread_mutex_init(&m_mutex, nullptr);
pthread_create(&m_thid, nullptr, threadStub, this);
m_buffer_ptr = &m_buffer[0];
}
ZoneMinderFifoSource::~ZoneMinderFifoSource() {
Debug(1, "Deleting Fifo Source");
stop = 1;
envir().taskScheduler().deleteEventTrigger(m_eventTriggerId);
pthread_join(m_thid, nullptr);
while ( m_captureQueue.size() ) {
while (m_captureQueue.size()) {
NAL_Frame * f = m_captureQueue.front();
m_captureQueue.pop_front();
delete f;
}
pthread_mutex_destroy(&m_mutex);
}
@@ -75,13 +74,13 @@ void ZoneMinderFifoSource::doStopGettingFrames() {
// deliver frame to the sink
void ZoneMinderFifoSource::deliverFrame() {
if (!isCurrentlyAwaitingData()) {
Debug(4, "not awaiting data");
Debug(5, "not awaiting data");
return;
}
pthread_mutex_lock(&m_mutex);
if (m_captureQueue.empty()) {
Debug(4, "Queue is empty");
Debug(5, "Queue is empty");
pthread_mutex_unlock(&m_mutex);
return;
}
@@ -101,7 +100,8 @@ void ZoneMinderFifoSource::deliverFrame() {
} else {
fFrameSize = nal_size;
}
Debug(2, "deliverFrame timestamp: %ld.%06ld size: %d queuesize: %d",
Debug(4, "deliverFrame timestamp: %ld.%06ld size: %d queuesize: %d",
frame->m_timestamp.tv_sec, frame->m_timestamp.tv_usec,
fFrameSize,
m_captureQueue.size()
@@ -137,68 +137,122 @@ int ZoneMinderFifoSource::getNextFrame() {
}
}
int bytes_in_buffer = m_buffer_ptr - &m_buffer[0];
int bytes_read = read(m_fd, m_buffer_ptr, BUFFER_SIZE-bytes_in_buffer);
if (bytes_read == 0)
int bytes_read = m_buffer.read_into(m_fd, 4096);
if (bytes_read == 0) {
Debug(3, "No bytes read");
sleep(1);
return -1;
}
if (bytes_read < 0) {
Error("Problem during reading: %s", strerror(errno));
::close(m_fd);
m_fd = -1;
return -1;
}
bytes_in_buffer += bytes_read;
unsigned int bytes_remaining = bytes_in_buffer;
timeval tv;
gettimeofday(&tv, nullptr);
Debug(4, "%s bytes read %d bytes, buffer size %u", m_fifo.c_str(), bytes_read, m_buffer.size());
while (m_buffer.size()) {
std::list< std::pair<unsigned char*, size_t> > framesList = this->splitFrames(m_buffer, bytes_remaining);
Debug(1, "Got %d frames, bytes remaining %d", framesList.size(), bytes_remaining);
unsigned int data_size = 0;
int64_t pts;
unsigned char *header_end = nullptr;
unsigned char *header_start = nullptr;
if ( bytes_remaining > 0 ) {
memmove(&m_buffer[0], &m_buffer[0] + ( bytes_in_buffer - bytes_remaining ), bytes_remaining);
m_buffer_ptr = &m_buffer[0] + bytes_remaining;
} else {
m_buffer_ptr = &m_buffer[0];
}
while (framesList.size()) {
std::pair<unsigned char*, size_t> nal = framesList.front();
framesList.pop_front();
NAL_Frame *frame = new NAL_Frame(nal.first, nal.second, tv);
pthread_mutex_lock(&m_mutex);
if (m_captureQueue.size() > 10) {
NAL_Frame * f = m_captureQueue.front();
while (m_captureQueue.size() and ((f->m_timestamp.tv_sec - tv.tv_sec) > 2)) {
m_captureQueue.pop_front();
delete f;
f = m_captureQueue.front();
if ((header_start = (unsigned char *)memmem(m_buffer.head(), m_buffer.size(), "ZM", 2))) {
// next step, look for \n
header_end = (unsigned char *)memchr(header_start, '\n', m_buffer.tail()-header_start);
if (!header_end) {
// Must not have enough data. So... keep all.
Debug(1, "Didn't find newline");
return -1;
}
}
#if 0
while ( m_captureQueue.size() >= m_queueSize ) {
Debug(2, "Queue full dropping frame %d", m_captureQueue.size());
NAL_Frame * f = m_captureQueue.front();
m_captureQueue.pop_front();
delete f;
}
#endif
m_captureQueue.push_back(frame);
pthread_mutex_unlock(&m_mutex);
// post an event to ask to deliver the frame
envir().taskScheduler().triggerEvent(m_eventTriggerId, this);
} // end while we get frame from data
unsigned int header_size = header_end-header_start;
char *header = new char[header_size+1];
header[header_size] = '\0';
strncpy(header, reinterpret_cast<const char *>(header_start), header_end-header_start);
char *content_length_ptr = strchr(header, ' ');
if (!content_length_ptr) {
Debug(1, "Didn't find space delineating size in %s", header);
m_buffer.consume(header_start-m_buffer.head() + 2);
delete header;
return -1;
}
*content_length_ptr = '\0';
content_length_ptr ++;
char *pts_ptr = strchr(content_length_ptr, ' ');
if (!pts_ptr) {
m_buffer.consume(header_start-m_buffer.head() + 2);
Warning("Didn't find space delineating pts");
delete header;
return -1;
}
*pts_ptr = '\0';
pts_ptr ++;
data_size = atoi(content_length_ptr);
pts = strtoll(pts_ptr, nullptr, 10);
delete header;
} else {
Debug(1, "ZM header not found.");
return -1;
}
Debug(4, "ZM Packet size %u pts %" PRId64, data_size, pts);
if (header_start != m_buffer) {
Debug(4, "ZM Packet didn't start at beginning of buffer %u. %c%c",
header_start-m_buffer.head(), m_buffer[0], m_buffer[1]);
}
unsigned char *packet_start = header_end+1;
unsigned int header_size = packet_start - m_buffer.head(); // includes any bytes before header
int bytes_needed = data_size - (m_buffer.size() - header_size);
if (bytes_needed > 0) {
Debug(4, "Need another %d bytes. Trying to read them", bytes_needed);
int bytes_read = m_buffer.read_into(m_fd, bytes_needed);
if ( bytes_read != bytes_needed )
return -1;
}
// splitFrames modifies so make a copy
unsigned int bytes_remaining = data_size;
std::list< std::pair<unsigned char*, size_t> > framesList = this->splitFrames(packet_start, bytes_remaining);
Debug(3, "Got %d frames, consuming %d bytes", framesList.size(), header_size + data_size);
m_buffer.consume(header_size + data_size);
timeval tv;
tv.tv_sec = pts / 1000000;
tv.tv_usec = pts % 1000000;
while (framesList.size()) {
std::pair<unsigned char*, size_t> nal = framesList.front();
framesList.pop_front();
NAL_Frame *frame = new NAL_Frame(nal.first, nal.second, tv);
Debug(3, "Got frame, size %d, queue_size %d", frame->size(), m_captureQueue.size());
pthread_mutex_lock(&m_mutex);
if (m_captureQueue.size() > 25) { // 1 sec at 25 fps
NAL_Frame * f = m_captureQueue.front();
while (m_captureQueue.size() and ((f->m_timestamp.tv_sec - tv.tv_sec) > 2)) {
m_captureQueue.pop_front();
delete f;
f = m_captureQueue.front();
}
}
m_captureQueue.push_back(frame);
pthread_mutex_unlock(&m_mutex);
// post an event to ask to deliver the frame
envir().taskScheduler().triggerEvent(m_eventTriggerId, this);
} // end while we get frame from data
} // end while m_buffer.size()
return 1;
}
// split packet in frames
std::list< std::pair<unsigned char*,size_t> > ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) {
std::list< std::pair<unsigned char*,size_t> > frameList;
if ( frame != nullptr ) {
if (frame != nullptr) {
frameList.push_back(std::pair<unsigned char*,size_t>(frame, frameSize));
}
// We consume it all

View File

@@ -9,6 +9,7 @@
#ifndef ZM_RTSP_SERVER_FIFO_SOURCE_H
#define ZM_RTSP_SERVER_FIFO_SOURCE_H
#include "zm_buffer.h"
#include "zm_config.h"
#include "zm_define.h"
#include <list>
@@ -73,9 +74,7 @@ class ZoneMinderFifoSource: public FramedSource {
int stop;
int m_fd;
#define BUFFER_SIZE 65536
unsigned char m_buffer[BUFFER_SIZE];
unsigned char *m_buffer_ptr;
Buffer m_buffer;
};
#endif // HAVE_RTSP_SERVER