1
0
mirror of https://github.com/doitsujin/dxvk.git synced 2025-02-24 04:54:14 +01:00

[dxvk] Refactor CS chunk queues

Introduces two queues and allows us to dispatch chunks to the ordered
queue without disrupting the sequence number.
This commit is contained in:
Philip Rebohle 2025-01-20 18:54:43 +01:00
parent 95e2635397
commit b686d95e71
4 changed files with 132 additions and 45 deletions

View File

@ -924,7 +924,7 @@ namespace dxvk {
bool Synchronize) { bool Synchronize) {
// Do not update the sequence number when emitting a chunk // Do not update the sequence number when emitting a chunk
// from an external source since that would break tracking // from an external source since that would break tracking
m_csThread.injectChunk(std::move(Chunk), Synchronize); m_csThread.injectChunk(DxvkCsQueue::HighPriority, std::move(Chunk), Synchronize);
} }

View File

@ -5649,7 +5649,7 @@ namespace dxvk {
void D3D9DeviceEx::InjectCsChunk( void D3D9DeviceEx::InjectCsChunk(
DxvkCsChunkRef&& Chunk, DxvkCsChunkRef&& Chunk,
bool Synchronize) { bool Synchronize) {
m_csThread.injectChunk(std::move(Chunk), Synchronize); m_csThread.injectChunk(DxvkCsQueue::HighPriority, std::move(Chunk), Synchronize);
} }

View File

@ -118,8 +118,12 @@ namespace dxvk {
uint64_t seq; uint64_t seq;
{ std::unique_lock<dxvk::mutex> lock(m_mutex); { std::unique_lock<dxvk::mutex> lock(m_mutex);
seq = ++m_chunksDispatched; seq = ++m_queueOrdered.seqDispatch;
m_chunksQueued.push_back(std::move(chunk));
auto& entry = m_queueOrdered.queue.emplace_back();
entry.chunk = std::move(chunk);
entry.seq = seq;
m_condOnAdd.notify_one(); m_condOnAdd.notify_one();
} }
@ -127,42 +131,53 @@ namespace dxvk {
} }
void DxvkCsThread::injectChunk(DxvkCsChunkRef&& chunk, bool synchronize) { void DxvkCsThread::injectChunk(DxvkCsQueue queue, DxvkCsChunkRef&& chunk, bool synchronize) {
uint64_t timeline; uint64_t timeline = 0u;
{ std::unique_lock<dxvk::mutex> lock(m_mutex); { std::unique_lock<dxvk::mutex> lock(m_mutex);
auto& q = getQueue(queue);
timeline = ++m_chunksInjectedCount; if (synchronize)
m_chunksInjected.push_back(std::move(chunk)); timeline = ++q.seqDispatch;
auto& entry = q.queue.emplace_back();
entry.chunk = std::move(chunk);
entry.seq = timeline;
m_condOnAdd.notify_one(); m_condOnAdd.notify_one();
if (queue == DxvkCsQueue::HighPriority) {
// Worker will check this flag after executing any
// chunk without causing additional lock contention
m_hasHighPrio.store(true, std::memory_order_release);
}
} }
if (synchronize) { if (synchronize) {
std::unique_lock<dxvk::mutex> lock(m_counterMutex); std::unique_lock<dxvk::mutex> lock(m_counterMutex);
m_condOnSync.wait(lock, [this, timeline] { m_condOnSync.wait(lock, [this, queue, timeline] {
return m_chunksInjectedComplete.load() >= timeline; return getCounter(queue).load(std::memory_order_acquire) >= timeline;
}); });
} }
} }
void DxvkCsThread::synchronize(uint64_t seq) { void DxvkCsThread::synchronize(uint64_t seq) {
// Avoid locking if we know the sync is a no-op, may // Avoid locking if we know the sync is a no-op, may
// reduce overhead if this is being called frequently // reduce overhead if this is being called frequently
if (seq > m_chunksExecuted.load(std::memory_order_acquire)) { if (seq > m_seqOrdered.load(std::memory_order_acquire)) {
// We don't need to lock the queue here, if synchronization // We don't need to lock the queue here, if synchronization
// happens while another thread is submitting then there is // happens while another thread is submitting then there is
// an inherent race anyway // an inherent race anyway
if (seq == SynchronizeAll) if (seq == SynchronizeAll)
seq = m_chunksDispatched.load(); seq = m_queueOrdered.seqDispatch;
auto t0 = dxvk::high_resolution_clock::now(); auto t0 = dxvk::high_resolution_clock::now();
{ std::unique_lock<dxvk::mutex> lock(m_counterMutex); { std::unique_lock<dxvk::mutex> lock(m_counterMutex);
m_condOnSync.wait(lock, [this, seq] { m_condOnSync.wait(lock, [this, seq] {
return m_chunksExecuted.load() >= seq; return m_seqOrdered.load(std::memory_order_acquire) >= seq;
}); });
} }
@ -178,45 +193,69 @@ namespace dxvk {
void DxvkCsThread::threadFunc() { void DxvkCsThread::threadFunc() {
env::setThreadName("dxvk-cs"); env::setThreadName("dxvk-cs");
// Local chunk queue, we use two queues and swap between // Local chunk queues, we use two queues and swap between
// them in order to potentially reduce lock contention. // them in order to potentially reduce lock contention.
std::vector<DxvkCsChunkRef> chunks; std::vector<DxvkCsQueuedChunk> ordered;
std::vector<DxvkCsQueuedChunk> highPrio;
try { try {
while (!m_stopped.load()) { while (!m_stopped.load()) {
bool injected = false;
{ std::unique_lock<dxvk::mutex> lock(m_mutex); { std::unique_lock<dxvk::mutex> lock(m_mutex);
m_condOnAdd.wait(lock, [this] { m_condOnAdd.wait(lock, [this] {
return (!m_chunksQueued.empty()) return (!m_queueOrdered.queue.empty())
|| (!m_chunksInjected.empty()) || (!m_queueHighPrio.queue.empty())
|| (m_stopped.load()); || (m_stopped.load());
}); });
injected = !m_chunksInjected.empty(); std::swap(ordered, m_queueOrdered.queue);
std::swap(chunks, injected ? m_chunksInjected : m_chunksQueued); std::swap(highPrio, m_queueHighPrio.queue);
m_hasHighPrio.store(false, std::memory_order_release);
} }
for (auto& chunk : chunks) { size_t orderedIndex = 0u;
size_t highPrioIndex = 0u;
while (highPrioIndex < highPrio.size() || orderedIndex < ordered.size()) {
// Re-fill local high-priority queue if the app has queued anything up
// in the meantime, we want to reduce possible synchronization delays.
if (highPrioIndex >= highPrio.size() && m_hasHighPrio.load(std::memory_order_acquire)) {
highPrio.clear();
highPrioIndex = 0u;
std::unique_lock<dxvk::mutex> lock(m_mutex);
std::swap(highPrio, m_queueHighPrio.queue);
m_hasHighPrio.store(false, std::memory_order_release);
}
// Drain high-priority queue first
bool isHighPrio = highPrioIndex < highPrio.size();
auto& entry = isHighPrio ? highPrio[highPrioIndex++] : ordered[orderedIndex++];
m_context->addStatCtr(DxvkStatCounter::CsChunkCount, 1); m_context->addStatCtr(DxvkStatCounter::CsChunkCount, 1);
chunk->executeAll(m_context.ptr()); entry.chunk->executeAll(m_context.ptr());
if (entry.seq) {
// Use a separate mutex for the chunk counter, this will only
// ever be contested if synchronization is actually necessary.
std::lock_guard lock(m_counterMutex);
auto& counter = isHighPrio ? m_seqHighPrio : m_seqOrdered;
counter.store(entry.seq, std::memory_order_release);
// Use a separate mutex for the chunk counter, this
// will only ever be contested if synchronization is
// actually necessary.
{ std::unique_lock<dxvk::mutex> lock(m_counterMutex);
(injected ? m_chunksInjectedComplete : m_chunksExecuted) += 1u;
m_condOnSync.notify_one(); m_condOnSync.notify_one();
} }
// Explicitly free chunk here to release // Immediately free the chunk to release
// references to any resources held by it // references to any resources held by it
chunk = DxvkCsChunkRef(); entry.chunk = DxvkCsChunkRef();
} }
chunks.clear(); ordered.clear();
highPrio.clear();
} }
} catch (const DxvkError& e) { } catch (const DxvkError& e) {
Logger::err("Exception on CS thread!"); Logger::err("Exception on CS thread!");

View File

@ -377,6 +377,36 @@ namespace dxvk {
}; };
/**
 * \brief Queue type
 *
 * Selects which chunk queue of the CS
 * thread a chunk gets added to.
 */
enum class DxvkCsQueue : uint32_t {
  /// Normal queue with ordering guarantees
  Ordered      = 0u,
  /// High-priority queue
  HighPriority = 1u,
};
/**
 * \brief Queued chunk entry
 *
 * Pairs a chunk with the sequence number that was assigned
 * to it when it was added to a queue. The worker only
 * publishes non-zero sequence numbers to the sync counters.
 */
struct DxvkCsQueuedChunk {
  /// Chunk to execute
  DxvkCsChunkRef chunk;
  /// Sequence number, or zero if none was assigned. Explicitly
  /// initialized so that a default-initialized entry never
  /// carries an indeterminate value into the worker's check.
  uint64_t       seq = 0u;
};
/**
 * \brief Chunk queue
 *
 * Bundles the list of queued chunk entries with
 * the sequence counter used for synchronization.
 */
struct DxvkCsChunkQueue {
  /// Entries that have been queued up
  std::vector<DxvkCsQueuedChunk> queue;
  /// Most recently assigned sequence number
  uint64_t seqDispatch { 0u };
};
/** /**
* \brief Command stream thread * \brief Command stream thread
* *
@ -412,10 +442,14 @@ namespace dxvk {
* commands. The context can still be safely accessed, but chunks * commands. The context can still be safely accessed, but chunks
* will not be executed in any particular order. These chunks also * will not be executed in any particular order. These chunks also
* do not contribute to the main timeline. * do not contribute to the main timeline.
* \param [in] queue Which queue to add the chunk to
* \param [in] chunk The chunk to dispatch * \param [in] chunk The chunk to dispatch
* \param [in] synchronize Whether to wait for execution to complete * \param [in] synchronize Whether to wait for execution to complete
*/ */
void injectChunk(DxvkCsChunkRef&& chunk, bool synchronize); void injectChunk(
DxvkCsQueue queue,
DxvkCsChunkRef&& chunk,
bool synchronize);
/** /**
* \brief Synchronizes with the thread * \brief Synchronizes with the thread
@ -435,29 +469,43 @@ namespace dxvk {
* \returns Sequence number of last executed chunk * \returns Sequence number of last executed chunk
*/ */
uint64_t lastSequenceNumber() const { uint64_t lastSequenceNumber() const {
return m_chunksExecuted.load(); return m_seqOrdered.load(std::memory_order_acquire);
} }
private: private:
Rc<DxvkDevice> m_device; Rc<DxvkDevice> m_device;
Rc<DxvkContext> m_context; Rc<DxvkContext> m_context;
alignas(CACHE_LINE_SIZE)
dxvk::mutex m_counterMutex; dxvk::mutex m_counterMutex;
std::atomic<uint64_t> m_chunksDispatched = { 0ull };
std::atomic<uint64_t> m_chunksExecuted = { 0ull };
std::atomic<uint64_t> m_chunksInjectedCount = { 0ull }; std::atomic<uint64_t> m_seqHighPrio = { 0u };
std::atomic<uint64_t> m_chunksInjectedComplete = { 0ull }; std::atomic<uint64_t> m_seqOrdered = { 0u };
std::atomic<bool> m_stopped = { false }; std::atomic<bool> m_stopped = { false };
std::atomic<bool> m_hasHighPrio = { false };
alignas(CACHE_LINE_SIZE)
dxvk::mutex m_mutex; dxvk::mutex m_mutex;
dxvk::condition_variable m_condOnAdd; dxvk::condition_variable m_condOnAdd;
dxvk::condition_variable m_condOnSync; dxvk::condition_variable m_condOnSync;
std::vector<DxvkCsChunkRef> m_chunksQueued;
std::vector<DxvkCsChunkRef> m_chunksInjected; DxvkCsChunkQueue m_queueOrdered;
DxvkCsChunkQueue m_queueHighPrio;
dxvk::thread m_thread; dxvk::thread m_thread;
/**
 * \brief Looks up the chunk queue for a queue type
 *
 * Any value other than \c Ordered selects
 * the high-priority queue.
 * \param [in] which Queue type
 * \returns Reference to the matching queue object
 */
auto& getQueue(DxvkCsQueue which) {
  if (which != DxvkCsQueue::Ordered)
    return m_queueHighPrio;

  return m_queueOrdered;
}
/**
 * \brief Looks up the sync counter for a queue type
 *
 * Any value other than \c Ordered selects the
 * counter of the high-priority queue.
 * \param [in] which Queue type
 * \returns Reference to the matching atomic counter
 */
auto& getCounter(DxvkCsQueue which) {
  if (which != DxvkCsQueue::Ordered)
    return m_seqHighPrio;

  return m_seqOrdered;
}
void threadFunc(); void threadFunc();
}; };