introduce packetqueue::unlock to unlock packets and notify anyone waiting. Also check for iterators pointing to packets when not keeping keyframes. Loop with waiting when we can't lock a packet

This commit is contained in:
Isaac Connor
2021-04-01 11:52:25 -04:00
parent 1c9bc5c27f
commit f52ead7cda

View File

@@ -189,7 +189,6 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
// If ananlysis_it isn't at the end, we need to keep that many additional packets
int tail_count = 0;
if (pktQueue.back() != add_packet) {
Debug(1, "Ours is not the back");
packetqueue_iterator it = pktQueue.end();
--it;
while (*it != add_packet) {
@@ -208,6 +207,11 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
if (!lp->trylock()) break;
delete lp;
if (is_there_an_iterator_pointing_to_packet(zm_packet)) {
Warning("Found iterator at beginning of queue. Some thread isn't keeping up");
break;
}
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
Debug(1, "Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%d",
@@ -530,31 +534,45 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
std::unique_lock<std::mutex> lck(mutex);
Debug(4, "Have Lock in get_packet");
while (*it == pktQueue.end()) {
ZMLockedPacket *lp = nullptr;
while (!lp) {
while (*it == pktQueue.end()) {
if (deleting or zm_terminate)
return nullptr;
Debug(2, "waiting. Queue size %d it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
condition.wait(lck);
}
if (deleting or zm_terminate)
return nullptr;
Debug(2, "waiting. Queue size %d it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
ZMPacket *p = *(*it);
if (!p) {
Error("Null p?!");
return nullptr;
}
Debug(4, "get_packet using it %p locking index %d, packet %p",
*it, p->image_index, p);
// Packets are only deleted by packetqueue, so lock must be held.
// We shouldn't have to trylock. Someone else might hold the lock but not for long
lp = new ZMLockedPacket(p);
if (lp->trylock()) {
Debug(2, "Locked packet %d, unlocking packetqueue mutex", p->image_index);
return lp;
}
delete lp;
lp = nullptr;
condition.wait(lck);
}
if (deleting or zm_terminate)
return nullptr;
Debug(4, "get_packet using it %p queue end? %d, packet %p",
*it, (*it == pktQueue.end()), *(*it));
ZMPacket *p = *(*it);
if (!p) {
Error("Null p?!");
return nullptr;
}
// Packets are only deleted by packetqueue, so lock must be held.
// We shouldn't have to trylock. Someone else might hold the lock but not for long
ZMLockedPacket *lp = new ZMLockedPacket(p);
lp->lock();
Debug(2, "Locked packet %d, unlocking packetqueue mutex", p->image_index);
return lp;
} // end while !lp
return nullptr;
} // end ZMLockedPacket *PacketQueue::get_packet(it)
void PacketQueue::unlock(ZMLockedPacket *lp) {
delete lp;
condition.notify_all();
}
bool PacketQueue::increment_it(packetqueue_iterator *it) {
Debug(2, "Incrementing %p, queue size %d, end? %d", it, pktQueue.size(), ((*it) == pktQueue.end()));
if ( (*it) == pktQueue.end() ) {