diff --git a/plugins/decklink/decklink-device-instance.cpp b/plugins/decklink/decklink-device-instance.cpp index 9eb8ec941..8727d12e3 100644 --- a/plugins/decklink/decklink-device-instance.cpp +++ b/plugins/decklink/decklink-device-instance.cpp @@ -613,24 +613,31 @@ bool DeckLinkDeviceInstance::StartOutput(DeckLinkDeviceMode *mode_) } } - frameData.clear(); + frameQueueDecklinkToObs.reset(); + frameQueueObsToDecklink.reset(); + + const int rowSize = decklinkOutput->GetWidth() * 4; + const int frameSize = rowSize * decklinkOutput->GetHeight(); + for (std::vector &blob : frameBlobs) { + blob.assign(frameSize, 0); + frameQueueDecklinkToObs.push(blob.data()); + } + activeBlob = nullptr; + const int64_t minimumPrerollFrames = std::max(device->GetMinimumPrerollFrames(), INT64_C(3)); for (int64_t i = 0; i < minimumPrerollFrames; ++i) { ComPtr decklinkOutputFrame; HRESULT result = output_->CreateVideoFrame( decklinkOutput->GetWidth(), decklinkOutput->GetHeight(), - decklinkOutput->GetWidth() * 4, bmdFormat8BitBGRA, - bmdFrameFlagDefault, &decklinkOutputFrame); + rowSize, bmdFormat8BitBGRA, bmdFrameFlagDefault, + &decklinkOutputFrame); if (result != S_OK) { blog(LOG_ERROR, "failed to create video frame 0x%X", result); return false; } - const long size = decklinkOutputFrame->GetRowBytes() * - decklinkOutputFrame->GetHeight(); - frameData.resize(size); result = output_->ScheduleVideoFrame(decklinkOutputFrame, i * frameDuration, frameDuration, @@ -669,6 +676,8 @@ bool DeckLinkDeviceInstance::StopOutput() output->DisableAudioOutput(); output.Clear(); renderDelegate.Clear(); + frameQueueDecklinkToObs.reset(); + frameQueueObsToDecklink.reset(); return true; } @@ -679,26 +688,36 @@ void DeckLinkDeviceInstance::UpdateVideoFrame(video_data *frame) if (decklinkOutput == nullptr) return; - std::lock_guard lock(frameDataMutex); - const uint8_t *const outData = frame->data[0]; - frameData.assign(outData, - outData + decklinkOutput->GetWidth() * - decklinkOutput->GetHeight() * 4); + uint8_t *const blob = frameQueueDecklinkToObs.pop(); + if (blob) { + memcpy(blob, frame->data[0], + frame->linesize[0] * decklinkOutput->GetHeight()); + frameQueueObsToDecklink.push(blob); + } } void DeckLinkDeviceInstance::ScheduleVideoFrame(IDeckLinkVideoFrame *frame) { void *bytes; if (SUCCEEDED(frame->GetBytes(&bytes))) { - { - std::lock_guard lock(frameDataMutex); - memcpy(bytes, frameData.data(), - frame->GetRowBytes() * frame->GetHeight()); + uint8_t *blob = frameQueueObsToDecklink.pop(); + if (blob) { + if (activeBlob) + frameQueueDecklinkToObs.push(activeBlob); + activeBlob = blob; + } else { + blob = activeBlob; } - output->ScheduleVideoFrame( - frame, (totalFramesScheduled * frameDuration), - frameDuration, frameTimescale); + const int frameSize = frame->GetRowBytes() * frame->GetHeight(); + if (blob) + memcpy(bytes, blob, frameSize); + else + memset(bytes, 0, frameSize); + + output->ScheduleVideoFrame(frame, + totalFramesScheduled * frameDuration, + frameDuration, frameTimescale); ++totalFramesScheduled; } } diff --git a/plugins/decklink/decklink-device-instance.hpp b/plugins/decklink/decklink-device-instance.hpp index 47596fc8d..0eda6e17c 100644 --- a/plugins/decklink/decklink-device-instance.hpp +++ b/plugins/decklink/decklink-device-instance.hpp @@ -8,7 +8,6 @@ #include "decklink-device.hpp" #include "OBSVideoFrame.h" #include -#include #include class AudioRepacker; @@ -38,6 +37,104 @@ public: virtual HRESULT STDMETHODCALLTYPE ScheduledPlaybackHasStopped(); }; +/* +Unbounded SPSC Queue with modifications: +https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue +- Convert to bounded. Fixed node cache is part of the class layout. +- Queue doesn't handle push failure because it should never be full. +- Templated type has been replaced with a hard-coded type. +The license text has been copied below. +Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. +THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED +WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT +SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +The views and conclusions contained in the software and documentation are those +of the authors and should not be interpreted as representing official policies, +either expressed or implied, of Dmitry Vyukov. +*/ +static constexpr size_t FrameQueueFrameCount = 3; +class FrameQueue { +private: + /* Intel may grab two 64-byte cache lines */ + static constexpr size_t FalseSharingSize = 128; + + struct Node { + std::atomic next = nullptr; + uint8_t *frame = nullptr; + }; + + struct alignas(FalseSharingSize) PaddedNode { + Node node; + uint8_t padding[FalseSharingSize - sizeof(struct Node)]{}; + }; + + PaddedNode cache[FrameQueueFrameCount + 1]; + + alignas(FalseSharingSize) Node *front; + alignas(FalseSharingSize) Node *back; + Node *cache_list; + +public: + FrameQueue() { reset(); } + + void reset() + { + for (size_t i = 0; i < FrameQueueFrameCount; ++i) { + cache[i].node.next.store(&cache[i + 1].node, + std::memory_order_relaxed); + } + + Node &last = cache[FrameQueueFrameCount].node; + last.next.store(nullptr, std::memory_order_relaxed); + last.frame = nullptr; + + front = &last; + back = &last; + cache_list = &cache[0].node; + } + + void push(uint8_t *v) + { + Node *const n = cache_list; + cache_list = cache_list->next.load(std::memory_order_relaxed); + + n->next.store(nullptr, std::memory_order_relaxed); + n->frame = v; + + back->next.store(n, std::memory_order_release); + + back = n; + } + + uint8_t *pop() + { + uint8_t *frame = nullptr; + + Node *const n_front = + front->next.load(std::memory_order_consume); + if (n_front != nullptr) { + frame = n_front->frame; + front = n_front; + } + + return frame; + } +}; + class DeckLinkDeviceInstance : public IDeckLinkInputCallback { protected: ComPtr deckLinkConfiguration; @@ -66,8 +163,10 @@ protected: bool allow10Bit; OBSVideoFrame *convertFrame = nullptr; - std::mutex frameDataMutex; - std::vector frameData; + std::vector frameBlobs[FrameQueueFrameCount]; + FrameQueue frameQueueObsToDecklink; + FrameQueue frameQueueDecklinkToObs; + uint8_t *activeBlob = nullptr; BMDTimeValue frameDuration; BMDTimeScale frameTimescale; BMDTimeScale totalFramesScheduled;