From f52ead7cdae24d58e02766013c4fd93d0ec6ef26 Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Thu, 1 Apr 2021 11:52:25 -0400 Subject: [PATCH] introduce packetqueue::unlock to unlock packets and notify anyone waiting. Also check for iterators pointing to packets when not keeping keyframes. Loop with waiting when we can't lock a packet --- src/zm_packetqueue.cpp | 60 +++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/src/zm_packetqueue.cpp b/src/zm_packetqueue.cpp index 234fb2b1c..c6bc273e1 100644 --- a/src/zm_packetqueue.cpp +++ b/src/zm_packetqueue.cpp @@ -189,7 +189,6 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) { // If ananlysis_it isn't at the end, we need to keep that many additional packets int tail_count = 0; if (pktQueue.back() != add_packet) { - Debug(1, "Ours is not the back"); packetqueue_iterator it = pktQueue.end(); --it; while (*it != add_packet) { @@ -208,6 +207,11 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) { if (!lp->trylock()) break; delete lp; + if (is_there_an_iterator_pointing_to_packet(zm_packet)) { + Warning("Found iterator at beginning of queue. Some thread isn't keeping up"); + break; + } + pktQueue.pop_front(); packet_counts[zm_packet->packet.stream_index] -= 1; Debug(1, "Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%d", @@ -530,31 +534,45 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) { std::unique_lock lck(mutex); Debug(4, "Have Lock in get_packet"); - while (*it == pktQueue.end()) { + ZMLockedPacket *lp = nullptr; + while (!lp) { + while (*it == pktQueue.end()) { + if (deleting or zm_terminate) + return nullptr; + Debug(2, "waiting. Queue size %d it == end? %d", pktQueue.size(), (*it == pktQueue.end())); + condition.wait(lck); + } if (deleting or zm_terminate) return nullptr; - Debug(2, "waiting. Queue size %d it == end? %d", pktQueue.size(), (*it == pktQueue.end())); + + ZMPacket *p = *(*it); + if (!p) { + Error("Null p?!"); + return nullptr; + } + Debug(4, "get_packet using it %p locking index %d, packet %p", + *it, p->image_index, p); + // Packets are only deleted by packetqueue, so lock must be held. + // We shouldn't have to trylock. Someone else might hold the lock but not for long + + lp = new ZMLockedPacket(p); + if (lp->trylock()) { + Debug(2, "Locked packet %d, unlocking packetqueue mutex", p->image_index); + + return lp; + } + delete lp; + lp = nullptr; condition.wait(lck); - } - if (deleting or zm_terminate) - return nullptr; - - Debug(4, "get_packet using it %p queue end? %d, packet %p", - *it, (*it == pktQueue.end()), *(*it)); - ZMPacket *p = *(*it); - if (!p) { - Error("Null p?!"); - return nullptr; - } - // Packets are only deleted by packetqueue, so lock must be held. - // We shouldn't have to trylock. Someone else might hold the lock but not for long - - ZMLockedPacket *lp = new ZMLockedPacket(p); - lp->lock(); - Debug(2, "Locked packet %d, unlocking packetqueue mutex", p->image_index); - return lp; + } // end while !lp + return nullptr; } // end ZMLockedPacket *PacketQueue::get_packet(it) +void PacketQueue::unlock(ZMLockedPacket *lp) { + delete lp; + condition.notify_all(); +} + bool PacketQueue::increment_it(packetqueue_iterator *it) { Debug(2, "Incrementing %p, queue size %d, end? %d", it, pktQueue.size(), ((*it) == pktQueue.end())); if ( (*it) == pktQueue.end() ) {