From 5ca4828255a3550ef4fb8d5404fd9b54f10182b9 Mon Sep 17 00:00:00 2001
From: rezky_nightky
Date: Wed, 21 Jan 2026 21:22:43 +0700
Subject: [PATCH] feat: stability improvements, see details below
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Key stability improvements (deterministic + bounded):

1) Bounded memory usage in long-running stats

   Fixed unbounded growth in NetworkState latency tracking:
   - Replaced std::vector m_latency + push_back() with a fixed-size ring
     buffer (kLatencyWindow = 1024) and explicit counters.
   - Median latency computation now operates on at most 1024 samples,
     preventing memory growth and avoiding performance cliffs from
     ever-growing copies and sorts.

2) Prevent crash/UAF on shutdown + more predictable teardown

   Controller shutdown ordering (Controller::stop()):
   - m_miner is now stopped before m_network is destroyed, reducing the
     chance of worker threads submitting results into a network listener
     that is already destroyed.

   Thread teardown hardening (backend/common/Thread.h):
   - The destructor now checks std::thread::joinable() before join(),
     avoiding std::terminate() if a thread object exists but was never
     started due to early exit/error paths.

3) Fixed real leaks (including executable memory)

   Executable memory leak (crypto/cn/CnCtx.cpp):
   - CnCtx::create() allocates executable memory for generated_code via
     VirtualMemory::allocateExecutableMemory(0x4000, ...). Previously
     CnCtx::release() only _mm_free()'d the struct, leaking the executable
     mapping. Now CnCtx::release() frees generated_code before freeing the
     ctx.

   GPU verification leak (net/JobResults.cpp):
   - In getResults() (GPU result verification), a cryptonight_ctx was
     created via CnCtx::create() but never released. Added
     CnCtx::release(ctx, 1).

4) JobResults: bounded queues + backpressure + safe shutdown semantics

   The old JobResults could:
   - enqueue unlimited std::list items (m_results, m_bundles) → unbounded RAM,
   - call uv_queue_work per async batch → unbounded libuv threadpool backlog,
   - delete the handler directly while worker threads might still submit →
     potential crash/UAF.

   Changes made:
   - Hard queue limits: kMaxQueuedResults = 4096, kMaxQueuedBundles = 256.
     Excess is dropped (bounded behavior under load).
   - Async coalescing: only one pending async notification at a time
     (m_pendingAsync), reducing eventfd/uv wake storms.
   - Bounded libuv work scheduling: only one uv_queue_work is scheduled at a
     time (m_workScheduled), preventing CPU starvation and an unpredictable
     backlog.
   - Safe shutdown: JobResults::stop() now detaches the global handler first,
     then calls handler->stop(). Shutdown detaches m_listener, clears the
     queues, and defers deletion until in-flight work is done.
   - Defensive bound on GPU result count: count is clamped to 0xFF inside
     JobResults as well, not just in the caller, to guard against corrupted
     kernels/drivers.

5) Idempotent cleanup

   VirtualMemory::destroy() now sets pool = nullptr after delete, preventing
   an accidental double delete on repeated teardown paths.

Standalone sketches of the patterns behind items 1-5 are included as review
notes below the "---" separator; they are illustrative only and not part of
the commit.

Verification performed:
- codespell . --config ./.codespellrc: clean
- CMake configure + build completed successfully (Release build)

Signed-off-by: rezky_nightky
---
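Note 1 - bounded latency window (item 1). The snippet below is an
illustrative, self-contained C++ sketch of the ring-buffer-plus-median
pattern; the class and names (LatencyWindow, kWindow) are invented for this
note and are not part of the patch.

#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <iostream>

// Stand-in for the bounded latency tracking described in item 1.
class LatencyWindow {
public:
    static constexpr size_t kWindow = 1024;   // hard upper bound on stored samples

    void add(uint64_t elapsedMs)
    {
        // Clamp to 16 bits, overwrite the oldest slot, track how many slots are valid.
        m_samples[m_pos] = elapsedMs > 0xFFFF ? 0xFFFF : static_cast<uint16_t>(elapsedMs);
        m_pos = (m_pos + 1) % kWindow;
        if (m_count < kWindow) {
            ++m_count;
        }
    }

    uint16_t median() const
    {
        if (m_count == 0) {
            return 0;
        }

        // Copy only the valid samples; the copy and the nth_element cost are
        // bounded by kWindow and no longer grow with uptime.
        std::array<uint16_t, kWindow> tmp{};
        const size_t start = (m_pos + kWindow - m_count) % kWindow;
        for (size_t i = 0; i < m_count; ++i) {
            tmp[i] = m_samples[(start + i) % kWindow];
        }

        std::nth_element(tmp.begin(), tmp.begin() + m_count / 2, tmp.begin() + m_count);
        return tmp[m_count / 2];
    }

private:
    std::array<uint16_t, kWindow> m_samples{};
    size_t m_count = 0;   // number of valid samples (saturates at kWindow)
    size_t m_pos   = 0;   // next slot to overwrite
};

int main()
{
    LatencyWindow w;
    for (uint64_t i = 1; i <= 5000; ++i) {   // push more samples than the window holds
        w.add(i);
    }
    std::cout << "median of the last 1024 samples: " << w.median() << "\n";   // 4489
}

Memory stays at sizeof(LatencyWindow) regardless of how long the miner runs,
which is the point of the change.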
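Note 2 - bounded queue with coalesced wake-ups (item 4). An illustrative,
self-contained sketch of the producer-side pattern: drop work beyond a hard
limit and signal the consumer at most once per batch. The names
(BoundedQueue, notifyConsumer) are invented, the wake-up is modelled as a
counter instead of a libuv async handle, and the demo is single-threaded.

#include <atomic>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <utility>
#include <vector>

template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(size_t limit) : m_limit(limit) {}

    // Producer side: enforce the hard limit and send at most one wake-up
    // while a previous one is still pending.
    bool push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_items.size() >= m_limit) {
                return false;                      // bounded: excess is dropped
            }
            m_items.push_back(std::move(value));
        }

        if (!m_pendingWake.exchange(true)) {       // coalesce: only the first pusher signals
            notifyConsumer();
        }
        return true;
    }

    // Consumer side: clear the pending flag first, then drain the batch.
    std::vector<T> drain()
    {
        m_pendingWake.store(false);
        std::lock_guard<std::mutex> lock(m_mutex);
        std::vector<T> out;
        out.swap(m_items);
        return out;
    }

    size_t wakeups() const { return m_wakeups; }

private:
    void notifyConsumer() { ++m_wakeups; }         // stands in for m_async->send()

    const size_t m_limit;
    std::mutex m_mutex;
    std::vector<T> m_items;
    std::atomic<bool> m_pendingWake{ false };
    size_t m_wakeups = 0;
};

int main()
{
    BoundedQueue<int> queue(4096);

    for (int i = 0; i < 10000; ++i) {
        queue.push(i);                             // 4096 accepted, the rest dropped
    }

    const auto batch = queue.drain();
    std::cout << "queued: " << batch.size()        // 4096
              << ", wake-ups sent: " << queue.wakeups() << "\n";   // 1
}

The same shape appears twice in the patch (results and bundles); the extra
m_workScheduled / m_pendingWork bookkeeping additionally caps the number of
uv_queue_work requests in flight at one.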
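Note 3 - tolerant teardown (items 2 and 5). A compact sketch, with invented
names (WorkerThread, Pool), of the two guards: join a thread only if it is
joinable, and null out an owning pointer after delete so that a repeated
destroy() is a no-op.

#include <chrono>
#include <iostream>
#include <thread>

// Item 2: calling join() on a std::thread that was never started throws
// std::system_error, and from a (noexcept) destructor that ends in
// std::terminate(). The joinable() check makes the destructor safe on
// early-exit paths where start() never ran.
class WorkerThread {
public:
    void start()
    {
        m_thread = std::thread([] {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        });
    }

    ~WorkerThread()
    {
        if (m_thread.joinable()) {   // guard: safe whether or not start() was called
            m_thread.join();
        }
    }

private:
    std::thread m_thread;
};

// Item 5: idempotent cleanup of a raw owning pointer.
struct Pool { /* ... */ };

static Pool *pool = nullptr;

void destroy()
{
    delete pool;
    pool = nullptr;   // a second destroy() now deletes nullptr, which is well-defined
}

int main()
{
    {
        WorkerThread neverStarted;   // destructor runs without start(): no terminate()
        WorkerThread started;
        started.start();
    }                                // both destructors are safe here

    pool = new Pool();
    destroy();
    destroy();                       // repeated teardown path: harmless

    std::cout << "teardown completed twice without crashing\n";
}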
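Note 4 - paired allocate/release (item 3). The patch fixes the two concrete
call sites directly (CnCtx::release() now frees generated_code, and
getResults() now calls CnCtx::release(ctx, 1)). The sketch below shows the
general RAII shape that keeps this class of leak from coming back in new
code; Ctx, createCtx and releaseCtx are invented stand-ins, not xmrig APIs.

#include <cstdio>
#include <cstdlib>
#include <memory>

struct Ctx {
    void *generated_code = nullptr;   // stands in for the executable mapping
};

Ctx *createCtx()
{
    auto *ctx = new Ctx();
    ctx->generated_code = std::malloc(0x4000);
    return ctx;
}

void releaseCtx(Ctx *ctx)
{
    if (ctx == nullptr) {
        return;
    }
    std::free(ctx->generated_code);   // free the secondary allocation first...
    ctx->generated_code = nullptr;
    delete ctx;                       // ...then the object itself
    std::puts("ctx released");
}

// RAII wrapper: releaseCtx() runs on every exit path, including early returns.
using CtxHandle = std::unique_ptr<Ctx, decltype(&releaseCtx)>;

CtxHandle makeCtx()
{
    return CtxHandle(createCtx(), &releaseCtx);
}

int main()
{
    CtxHandle ctx = makeCtx();
    // ... hash verification work would happen here ...
}   // "ctx released" is printed exactly once, mirroring the added CnCtx::release(ctx, 1)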
 src/backend/common/Thread.h           |   2 +-
 src/base/net/stratum/NetworkState.cpp |  28 +++--
 src/base/net/stratum/NetworkState.h   |   7 +-
 src/core/Controller.cpp               |   8 +-
 src/crypto/cn/CnCtx.cpp               |   9 ++
 src/crypto/common/VirtualMemory.cpp   |   1 +
 src/net/JobResults.cpp                | 175 +++++++++++++++++++++++---
 7 files changed, 199 insertions(+), 31 deletions(-)

diff --git a/src/backend/common/Thread.h b/src/backend/common/Thread.h
index fb6c618b0..95664466a 100644
--- a/src/backend/common/Thread.h
+++ b/src/backend/common/Thread.h
@@ -65,7 +65,7 @@ public:
         }
     }
 #   else
-    inline ~Thread() { m_thread.join(); delete m_worker; }
+    inline ~Thread() { if (m_thread.joinable()) { m_thread.join(); } delete m_worker; }
 
     inline void start(void *(*callback)(void *)) { m_thread = std::thread(callback, this); }
 #   endif
diff --git a/src/base/net/stratum/NetworkState.cpp b/src/base/net/stratum/NetworkState.cpp
index 40516a7f3..4e402eb1c 100644
--- a/src/base/net/stratum/NetworkState.cpp
+++ b/src/base/net/stratum/NetworkState.cpp
@@ -210,7 +210,7 @@ void xmrig::NetworkState::printResults() const
     printHashes(m_accepted, m_hashes);
     printDiff(m_diff);
 
-    if (m_active && !m_latency.empty()) {
+    if (m_active && m_latencyCount > 0) {
         printAvgTime(avgTime());
     }
 
@@ -298,13 +298,19 @@ void xmrig::NetworkState::onResultAccepted(IStrategy *strategy, IClient *client,
 
 uint32_t xmrig::NetworkState::latency() const
 {
-    const size_t calls = m_latency.size();
+    const size_t calls = m_latencyCount;
     if (calls == 0) {
         return 0;
     }
 
-    auto v = m_latency;
-    std::nth_element(v.begin(), v.begin() + calls / 2, v.end());
+    std::array<uint16_t, kLatencyWindow> v;
+    const size_t start = (m_latencyPos + kLatencyWindow - calls) % kLatencyWindow;
+
+    for (size_t i = 0; i < calls; ++i) {
+        v[i] = m_latency[(start + i) % kLatencyWindow];
+    }
+
+    std::nth_element(v.begin(), v.begin() + calls / 2, v.begin() + calls);
 
     return v[calls / 2];
 }
@@ -312,11 +318,11 @@ uint32_t xmrig::NetworkState::latency() const
 
 uint64_t xmrig::NetworkState::avgTime() const
 {
-    if (m_latency.empty()) {
+    if (m_latencyCount == 0) {
         return 0;
     }
 
-    return connectionTime() / m_latency.size();
+    return connectionTime() / m_latencyCount;
 }
 
 
@@ -342,7 +348,12 @@ void xmrig::NetworkState::add(const SubmitResult &result, const char *error)
         std::sort(m_topDiff.rbegin(), m_topDiff.rend());
     }
 
-    m_latency.push_back(result.elapsed > 0xFFFF ? 0xFFFF : static_cast<uint16_t>(result.elapsed));
+    m_latency[m_latencyPos] = result.elapsed > 0xFFFF ? 0xFFFF : static_cast<uint16_t>(result.elapsed);
+    m_latencyPos = (m_latencyPos + 1) % kLatencyWindow;
+
+    if (m_latencyCount < kLatencyWindow) {
+        m_latencyCount++;
+    }
 }
 
 
@@ -355,5 +366,6 @@ void xmrig::NetworkState::stop()
     m_fingerprint = nullptr;
     m_failures++;
 
-    m_latency.clear();
+    m_latencyCount = 0;
+    m_latencyPos = 0;
 }
diff --git a/src/base/net/stratum/NetworkState.h b/src/base/net/stratum/NetworkState.h
index d47a3511f..8507ab51d 100644
--- a/src/base/net/stratum/NetworkState.h
+++ b/src/base/net/stratum/NetworkState.h
@@ -27,7 +27,6 @@
 #include <array>
 #include <string>
-#include <vector>
 
 
 namespace xmrig {
@@ -60,6 +59,8 @@ protected:
     void onResultAccepted(IStrategy *strategy, IClient *client, const SubmitResult &result, const char *error) override;
 
 private:
+    constexpr static size_t kLatencyWindow = 1024;
+
     uint32_t latency() const;
     uint64_t avgTime() const;
     uint64_t connectionTime() const;
@@ -70,7 +71,9 @@ private:
     bool m_active = false;
     char m_pool[256]{};
     std::array<uint64_t, 10> m_topDiff { { } };
-    std::vector<uint16_t> m_latency;
+    std::array<uint16_t, kLatencyWindow> m_latency { { } };
+    size_t m_latencyCount = 0;
+    size_t m_latencyPos = 0;
     String m_fingerprint;
     String m_ip;
     String m_tls;
diff --git a/src/core/Controller.cpp b/src/core/Controller.cpp
index 626175aeb..763885fa7 100644
--- a/src/core/Controller.cpp
+++ b/src/core/Controller.cpp
@@ -76,10 +76,12 @@ void xmrig::Controller::stop()
 {
     Base::stop();
 
-    m_network.reset();
+    if (m_miner) {
+        m_miner->stop();
+        m_miner.reset();
+    }
 
-    m_miner->stop();
-    m_miner.reset();
+    m_network.reset();
 }
 
 
diff --git a/src/crypto/cn/CnCtx.cpp b/src/crypto/cn/CnCtx.cpp
index c0dc8b344..ceace2bd9 100644
--- a/src/crypto/cn/CnCtx.cpp
+++ b/src/crypto/cn/CnCtx.cpp
@@ -49,6 +49,15 @@ void xmrig::CnCtx::release(cryptonight_ctx **ctx, size_t count)
     }
 
     for (size_t i = 0; i < count; ++i) {
+        if (ctx[i] && ctx[i]->generated_code) {
+#           ifdef XMRIG_OS_WIN
+            VirtualMemory::freeLargePagesMemory(reinterpret_cast<void *>(ctx[i]->generated_code), 0);
+#           else
+            VirtualMemory::freeLargePagesMemory(reinterpret_cast<void *>(ctx[i]->generated_code), 0x4000);
+#           endif
+            ctx[i]->generated_code = nullptr;
+        }
+
         _mm_free(ctx[i]);
     }
 }
diff --git a/src/crypto/common/VirtualMemory.cpp b/src/crypto/common/VirtualMemory.cpp
index d7d3a545e..437b63cff 100644
--- a/src/crypto/common/VirtualMemory.cpp
+++ b/src/crypto/common/VirtualMemory.cpp
@@ -124,6 +124,7 @@ uint32_t xmrig::VirtualMemory::bindToNUMANode(int64_t)
 
 void xmrig::VirtualMemory::destroy()
 {
     delete pool;
+    pool = nullptr;
 }
diff --git a/src/net/JobResults.cpp b/src/net/JobResults.cpp
index 57e4a8db0..d1a78a4e4 100644
--- a/src/net/JobResults.cpp
+++ b/src/net/JobResults.cpp
@@ -56,6 +56,7 @@
 
 #include <cassert>
+#include <atomic>
 #include <list>
 #include <mutex>
 #include <uv.h>
 
@@ -66,6 +67,9 @@ namespace xmrig {
 
 #if defined(XMRIG_FEATURE_OPENCL) || defined(XMRIG_FEATURE_CUDA)
 
+class JobResultsPrivate;
+
+
 class JobBundle
 {
 public:
@@ -86,14 +90,14 @@ public:
 class JobBaton : public Baton<uv_work_t>
 {
 public:
-    inline JobBaton(std::list<JobBundle> &&bundles, IJobResultListener *listener, bool hwAES) :
+    inline JobBaton(std::list<JobBundle> &&bundles, JobResultsPrivate *owner, bool hwAES) :
         hwAES(hwAES),
-        listener(listener),
+        owner(owner),
         bundles(std::move(bundles))
     {}
 
     const bool hwAES;
-    IJobResultListener *listener;
+    JobResultsPrivate *owner;
     std::list<JobBundle> bundles;
     std::vector<JobResult> results;
     uint32_t errors = 0;
@@ -188,6 +192,8 @@ static void getResults(JobBundle &bundle, std::vector<JobResult> &results, uint3
             checkHash(bundle, results, nonce, hash, errors);
         }
+
+        CnCtx::release(ctx, 1);
     }
 
     delete memory;
 }
@@ -200,6 +206,11 @@ class JobResultsPrivate : public IAsyncListener
 public:
     XMRIG_DISABLE_COPY_MOVE_DEFAULT(JobResultsPrivate)
 
+    constexpr static size_t kMaxQueuedResults = 4096;
+#   if defined(XMRIG_FEATURE_OPENCL) || defined(XMRIG_FEATURE_CUDA)
+    constexpr static size_t kMaxQueuedBundles = 256;
+#   endif
+
     inline JobResultsPrivate(IJobResultListener *listener, bool hwAES) :
         m_hwAES(hwAES),
         m_listener(listener)
@@ -214,9 +225,20 @@ public:
 
     inline void submit(const JobResult &result)
     {
         std::lock_guard<std::mutex> lock(m_mutex);
+
+        if (m_stopping) {
+            return;
+        }
+
+        if (m_results.size() >= kMaxQueuedResults) {
+            return;
+        }
+
         m_results.push_back(result);
 
-        m_async->send();
+        if (m_async && !m_pendingAsync.exchange(true)) {
+            m_async->send();
+        }
     }
 
@@ -224,13 +246,55 @@ public:
     inline void submit(const Job &job, uint32_t *results, size_t count, uint32_t device_index)
     {
         std::lock_guard<std::mutex> lock(m_mutex);
+
+        if (count > 0xFF) {
+            count = 0xFF;
+        }
+
+        if (m_stopping) {
+            return;
+        }
+
+        if (m_bundles.size() >= kMaxQueuedBundles) {
+            return;
+        }
+
         m_bundles.emplace_back(job, results, count, device_index);
 
-        m_async->send();
+        if (m_async && !m_pendingAsync.exchange(true)) {
+            m_async->send();
+        }
     }
 #   endif
+
+
+    inline void stop()
+    {
+        bool deleteNow = false;
+
+        {
+            std::lock_guard<std::mutex> lock(m_mutex);
+            m_stopping = true;
+            m_listener = nullptr;
+            m_results.clear();
+
+#           if defined(XMRIG_FEATURE_OPENCL) || defined(XMRIG_FEATURE_CUDA)
+            m_bundles.clear();
+            m_workScheduled = false;
+            m_deleteWhenDone = true;
+            deleteNow = (m_pendingWork == 0);
+#           else
+            deleteNow = true;
+#           endif
+        }
+
+        if (deleteNow) {
+            m_async.reset();
+            delete this;
+        }
+    }
+
 
 protected:
     inline void onAsync() override { submit(); }
 
@@ -239,23 +303,33 @@ private:
 #   if defined(XMRIG_FEATURE_OPENCL) || defined(XMRIG_FEATURE_CUDA)
     inline void submit()
     {
+        m_pendingAsync.store(false);
+
         std::list<JobBundle> bundles;
         std::list<JobResult> results;
 
         m_mutex.lock();
-        m_bundles.swap(bundles);
         m_results.swap(results);
+
+        const bool canScheduleWork = !m_workScheduled && !m_stopping && !m_bundles.empty();
+        if (canScheduleWork) {
+            m_bundles.swap(bundles);
+            m_workScheduled = true;
+            m_pendingWork++;
+        }
         m_mutex.unlock();
 
         for (const auto &result : results) {
-            m_listener->onJobResult(result);
+            if (m_listener) {
+                m_listener->onJobResult(result);
+            }
         }
 
         if (bundles.empty()) {
             return;
         }
 
-        auto baton = new JobBaton(std::move(bundles), m_listener, m_hwAES);
+        auto baton = new JobBaton(std::move(bundles), this, m_hwAES);
 
         uv_queue_work(uv_default_loop(), &baton->req,
             [](uv_work_t *req) {
@@ -268,8 +342,67 @@ private:
             [](uv_work_t *req, int) {
                 auto baton = static_cast<JobBaton *>(req->data);
 
-                for (const auto &result : baton->results) {
-                    baton->listener->onJobResult(result);
+                if (baton->owner) {
+                    baton->owner->onBatonDone(std::move(baton->results));
+                }
+
+                delete baton;
+            }
+        );
+    }
+
+
+    inline void onBatonDone(std::vector<JobResult> &&results)
+    {
+        for (const auto &result : results) {
+            if (m_listener) {
+                m_listener->onJobResult(result);
+            }
+        }
+
+        std::list<JobBundle> bundles;
+
+        m_mutex.lock();
+
+        m_pendingWork--;
+
+        const bool canScheduleWork = !m_stopping && !m_bundles.empty();
+        if (canScheduleWork) {
+            m_bundles.swap(bundles);
+            m_pendingWork++;
+        }
+        else {
+            m_workScheduled = false;
+        }
+
+        const bool canDelete = m_deleteWhenDone && m_pendingWork == 0;
+        m_mutex.unlock();
+
+        if (canDelete) {
+            m_async.reset();
+            delete this;
+            return;
+        }
+
+        if (bundles.empty()) {
+            return;
+        }
+
+        auto baton = new JobBaton(std::move(bundles), this, m_hwAES);
+
+        uv_queue_work(uv_default_loop(), &baton->req,
+            [](uv_work_t *req) {
+                auto baton = static_cast<JobBaton *>(req->data);
+
+                for (JobBundle &bundle : baton->bundles) {
+                    getResults(bundle, baton->results, baton->errors, baton->hwAES);
+                }
+            },
+            [](uv_work_t *req, int) {
+                auto baton = static_cast<JobBaton *>(req->data);
+
+                if (baton->owner) {
+                    baton->owner->onBatonDone(std::move(baton->results));
                 }
 
                 delete baton;
@@ -279,6 +412,8 @@ private:
 #   else
     inline void submit()
     {
+        m_pendingAsync.store(false);
+
         std::list<JobResult> results;
 
         m_mutex.lock();
@@ -286,7 +421,9 @@ private:
         m_mutex.unlock();
 
         for (const auto &result : results) {
-            m_listener->onJobResult(result);
+            if (m_listener) {
+                m_listener->onJobResult(result);
+            }
         }
     }
 #   endif
@@ -296,9 +433,14 @@ private:
     std::list<JobResult> m_results;
     std::mutex m_mutex;
     std::shared_ptr<Async> m_async;
+    std::atomic<bool> m_pendingAsync{ false };
+    bool m_stopping = false;
 
 #   if defined(XMRIG_FEATURE_OPENCL) || defined(XMRIG_FEATURE_CUDA)
     std::list<JobBundle> m_bundles;
+    bool m_workScheduled = false;
+    uint32_t m_pendingWork = 0;
+    bool m_deleteWhenDone = false;
 #   endif
 };
@@ -325,11 +467,12 @@ void xmrig::JobResults::setListener(IJobResultListener *listener, bool hwAES)
 
 void xmrig::JobResults::stop()
 {
-    assert(handler != nullptr);
-
-    delete handler;
-
+    auto h = handler;
     handler = nullptr;
+
+    if (h) {
+        h->stop();
+    }
 }
 
 
@@ -347,8 +490,6 @@ void xmrig::JobResults::submit(const Job& job, uint32_t nonce, const uint8_t* re
 
 void xmrig::JobResults::submit(const JobResult &result)
 {
-    assert(handler != nullptr);
-
     if (handler) {
         handler->submit(result);
     }