From b686d95e71549e1fe99e524f977e7093d10c74e4 Mon Sep 17 00:00:00 2001 From: Philip Rebohle Date: Mon, 20 Jan 2025 18:54:43 +0100 Subject: [PATCH] [dxvk] Refactor CS chunk queues Introduces two queues and allows us to dispatch chunks to the ordered queue without disrupting the sequence number. --- src/d3d11/d3d11_context_imm.cpp | 2 +- src/d3d9/d3d9_device.cpp | 2 +- src/dxvk/dxvk_cs.cpp | 101 ++++++++++++++++++++++---------- src/dxvk/dxvk_cs.h | 72 +++++++++++++++++++---- 4 files changed, 132 insertions(+), 45 deletions(-) diff --git a/src/d3d11/d3d11_context_imm.cpp b/src/d3d11/d3d11_context_imm.cpp index 122a4763e..09d6add3b 100644 --- a/src/d3d11/d3d11_context_imm.cpp +++ b/src/d3d11/d3d11_context_imm.cpp @@ -924,7 +924,7 @@ namespace dxvk { bool Synchronize) { // Do not update the sequence number when emitting a chunk // from an external source since that would break tracking - m_csThread.injectChunk(std::move(Chunk), Synchronize); + m_csThread.injectChunk(DxvkCsQueue::HighPriority, std::move(Chunk), Synchronize); } diff --git a/src/d3d9/d3d9_device.cpp b/src/d3d9/d3d9_device.cpp index 46f9061bf..502625556 100644 --- a/src/d3d9/d3d9_device.cpp +++ b/src/d3d9/d3d9_device.cpp @@ -5649,7 +5649,7 @@ namespace dxvk { void D3D9DeviceEx::InjectCsChunk( DxvkCsChunkRef&& Chunk, bool Synchronize) { - m_csThread.injectChunk(std::move(Chunk), Synchronize); + m_csThread.injectChunk(DxvkCsQueue::HighPriority, std::move(Chunk), Synchronize); } diff --git a/src/dxvk/dxvk_cs.cpp b/src/dxvk/dxvk_cs.cpp index 37ed9f145..72f49ab45 100644 --- a/src/dxvk/dxvk_cs.cpp +++ b/src/dxvk/dxvk_cs.cpp @@ -118,8 +118,12 @@ namespace dxvk { uint64_t seq; { std::unique_lock lock(m_mutex); - seq = ++m_chunksDispatched; - m_chunksQueued.push_back(std::move(chunk)); + seq = ++m_queueOrdered.seqDispatch; + + auto& entry = m_queueOrdered.queue.emplace_back(); + entry.chunk = std::move(chunk); + entry.seq = seq; + m_condOnAdd.notify_one(); } @@ -127,42 +131,53 @@ namespace dxvk { } - void DxvkCsThread::injectChunk(DxvkCsChunkRef&& chunk, bool synchronize) { - uint64_t timeline; + void DxvkCsThread::injectChunk(DxvkCsQueue queue, DxvkCsChunkRef&& chunk, bool synchronize) { + uint64_t timeline = 0u; { std::unique_lock lock(m_mutex); + auto& q = getQueue(queue); - timeline = ++m_chunksInjectedCount; - m_chunksInjected.push_back(std::move(chunk)); + if (synchronize) + timeline = ++q.seqDispatch; + + auto& entry = q.queue.emplace_back(); + entry.chunk = std::move(chunk); + entry.seq = timeline; m_condOnAdd.notify_one(); + + if (queue == DxvkCsQueue::HighPriority) { + // Worker will check this flag after executing any + // chunk without causing additional lock contention + m_hasHighPrio.store(true, std::memory_order_release); + } } if (synchronize) { std::unique_lock lock(m_counterMutex); - m_condOnSync.wait(lock, [this, timeline] { - return m_chunksInjectedComplete.load() >= timeline; + m_condOnSync.wait(lock, [this, queue, timeline] { + return getCounter(queue).load(std::memory_order_acquire) >= timeline; }); } } - - + + void DxvkCsThread::synchronize(uint64_t seq) { // Avoid locking if we know the sync is a no-op, may // reduce overhead if this is being called frequently - if (seq > m_chunksExecuted.load(std::memory_order_acquire)) { + if (seq > m_seqOrdered.load(std::memory_order_acquire)) { // We don't need to lock the queue here, if synchronization // happens while another thread is submitting then there is // an inherent race anyway if (seq == SynchronizeAll) - seq = m_chunksDispatched.load(); + seq = m_queueOrdered.seqDispatch; auto t0 = dxvk::high_resolution_clock::now(); { std::unique_lock lock(m_counterMutex); m_condOnSync.wait(lock, [this, seq] { - return m_chunksExecuted.load() >= seq; + return m_seqOrdered.load(std::memory_order_acquire) >= seq; }); } @@ -178,45 +193,69 @@ namespace dxvk { void DxvkCsThread::threadFunc() { env::setThreadName("dxvk-cs"); - // Local chunk queue, we use two queues and swap between + // Local chunk queues, we use two queues and swap between // them in order to potentially reduce lock contention. - std::vector chunks; + std::vector ordered; + std::vector highPrio; try { while (!m_stopped.load()) { - bool injected = false; - { std::unique_lock lock(m_mutex); m_condOnAdd.wait(lock, [this] { - return (!m_chunksQueued.empty()) - || (!m_chunksInjected.empty()) + return (!m_queueOrdered.queue.empty()) + || (!m_queueHighPrio.queue.empty()) || (m_stopped.load()); }); - injected = !m_chunksInjected.empty(); - std::swap(chunks, injected ? m_chunksInjected : m_chunksQueued); + std::swap(ordered, m_queueOrdered.queue); + std::swap(highPrio, m_queueHighPrio.queue); + + m_hasHighPrio.store(false, std::memory_order_release); } - for (auto& chunk : chunks) { + size_t orderedIndex = 0u; + size_t highPrioIndex = 0u; + + while (highPrioIndex < highPrio.size() || orderedIndex < ordered.size()) { + // Re-fill local high-priority queue if the app has queued anything up + // in the meantime, we want to reduce possible synchronization delays. + if (highPrioIndex >= highPrio.size() && m_hasHighPrio.load(std::memory_order_acquire)) { + highPrio.clear(); + highPrioIndex = 0u; + + std::unique_lock lock(m_mutex); + std::swap(highPrio, m_queueHighPrio.queue); + + m_hasHighPrio.store(false, std::memory_order_release); + } + + // Drain high-priority queue first + bool isHighPrio = highPrioIndex < highPrio.size(); + auto& entry = isHighPrio ? highPrio[highPrioIndex++] : ordered[orderedIndex++]; + m_context->addStatCtr(DxvkStatCounter::CsChunkCount, 1); - chunk->executeAll(m_context.ptr()); + entry.chunk->executeAll(m_context.ptr()); + + if (entry.seq) { + // Use a separate mutex for the chunk counter, this will only + // ever be contested if synchronization is actually necessary. + std::lock_guard lock(m_counterMutex); + + auto& counter = isHighPrio ? m_seqHighPrio : m_seqOrdered; + counter.store(entry.seq, std::memory_order_release); - // Use a separate mutex for the chunk counter, this - // will only ever be contested if synchronization is - // actually necessary. - { std::unique_lock lock(m_counterMutex); - (injected ? m_chunksInjectedComplete : m_chunksExecuted) += 1u; m_condOnSync.notify_one(); } - // Explicitly free chunk here to release + // Immediately free the chunk to release // references to any resources held by it - chunk = DxvkCsChunkRef(); + entry.chunk = DxvkCsChunkRef(); } - chunks.clear(); + ordered.clear(); + highPrio.clear(); } } catch (const DxvkError& e) { Logger::err("Exception on CS thread!"); diff --git a/src/dxvk/dxvk_cs.h b/src/dxvk/dxvk_cs.h index 463cd7424..ca7d85ba4 100644 --- a/src/dxvk/dxvk_cs.h +++ b/src/dxvk/dxvk_cs.h @@ -377,6 +377,36 @@ namespace dxvk { }; + /** + * \brief Queue type + */ + enum class DxvkCsQueue : uint32_t { + Ordered = 0, /// Normal queue with ordering guarantees + HighPriority = 1, /// High-priority queue + }; + + + /** + * \brief Queued chunk entry + */ + struct DxvkCsQueuedChunk { + DxvkCsChunkRef chunk; + uint64_t seq; + }; + + + /** + * \brief Chunk queue + * + * Stores queued chunks as well as the sequence + * counters for synchronization. + */ + struct DxvkCsChunkQueue { + std::vector queue; + uint64_t seqDispatch = 0u; + }; + + /** * \brief Command stream thread * @@ -412,10 +442,14 @@ namespace dxvk { * commands. The context can still be safely accessed, but chunks * will not be executed in any particular oder. These chunks also * do not contribute to the main timeline. + * \param [in] queue Which queue to add the chunk to * \param [in] chunk The chunk to dispatch * \param [in] synchronize Whether to wait for execution to complete */ - void injectChunk(DxvkCsChunkRef&& chunk, bool synchronize); + void injectChunk( + DxvkCsQueue queue, + DxvkCsChunkRef&& chunk, + bool synchronize); /** * \brief Synchronizes with the thread @@ -435,29 +469,43 @@ namespace dxvk { * \returns Sequence number of last executed chunk */ uint64_t lastSequenceNumber() const { - return m_chunksExecuted.load(); + return m_seqOrdered.load(std::memory_order_acquire); } private: - + Rc m_device; Rc m_context; + alignas(CACHE_LINE_SIZE) dxvk::mutex m_counterMutex; - std::atomic m_chunksDispatched = { 0ull }; - std::atomic m_chunksExecuted = { 0ull }; - std::atomic m_chunksInjectedCount = { 0ull }; - std::atomic m_chunksInjectedComplete = { 0ull }; - - std::atomic m_stopped = { false }; + std::atomic m_seqHighPrio = { 0u }; + std::atomic m_seqOrdered = { 0u }; + + std::atomic m_stopped = { false }; + std::atomic m_hasHighPrio = { false }; + + alignas(CACHE_LINE_SIZE) dxvk::mutex m_mutex; dxvk::condition_variable m_condOnAdd; dxvk::condition_variable m_condOnSync; - std::vector m_chunksQueued; - std::vector m_chunksInjected; + + DxvkCsChunkQueue m_queueOrdered; + DxvkCsChunkQueue m_queueHighPrio; + dxvk::thread m_thread; - + + auto& getQueue(DxvkCsQueue which) { + return which == DxvkCsQueue::Ordered + ? m_queueOrdered : m_queueHighPrio; + } + + auto& getCounter(DxvkCsQueue which) { + return which == DxvkCsQueue::Ordered + ? m_seqOrdered : m_seqHighPrio; + } + void threadFunc(); };