From 3dae3f9173ee13855e5fa91a37f89fc23af48cf2 Mon Sep 17 00:00:00 2001 From: ITotalJustice <47043333+ITotalJustice@users.noreply.github.com> Date: Tue, 16 Sep 2025 04:15:56 +0100 Subject: [PATCH] devoptab/curl: fix rare deadlock casued by sleeping/blocking in curl callback functions. it seems that curl does not like long blocking in the r/w callbacks. blocking for too seems to cause a deadlock as the server stops send/recv anymore data. to fix this, i now use curls pause api. this api is not thread safe, so it's a little more involved than it needs to be. however this fixes the deadlock as curls pause actually reduces the download/upload speed to the minimum. it also reduces exit latency as now exiting is handled in the progress callback as well, which is called far more often than r/w. --- sphaira/include/utils/devoptab_common.hpp | 17 +- sphaira/source/utils/devoptab_common.cpp | 190 ++++++++++++++-------- 2 files changed, 131 insertions(+), 76 deletions(-) diff --git a/sphaira/include/utils/devoptab_common.hpp b/sphaira/include/utils/devoptab_common.hpp index 46184c9..0ca0851 100644 --- a/sphaira/include/utils/devoptab_common.hpp +++ b/sphaira/include/utils/devoptab_common.hpp @@ -93,18 +93,18 @@ void update_devoptab_for_read_only(devoptab_t* devoptab, bool read_only); struct PushPullThreadData { static constexpr size_t MAX_BUFFER_SIZE = 1024 * 64; // 64KB max buffer - PushPullThreadData(CURL* _curl); + explicit PushPullThreadData(CURL* _curl); virtual ~PushPullThreadData(); - Result CreateAndStart(); + Result CreateAndStart(); void Cancel(); bool IsRunning(); - size_t PullData(char* data, size_t total_size); - size_t PushData(const char* data, size_t total_size); + // only set curl=true if called from a curl callback. + size_t PullData(char* data, size_t total_size, bool curl = false); + size_t PushData(const char* data, size_t total_size, bool curl = false); - static size_t push_thread_callback(const char *ptr, size_t size, size_t nmemb, void *userdata); - static size_t pull_thread_callback(char *ptr, size_t size, size_t nmemb, void *userdata); + static size_t progress_callback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow); private: static void thread_func(void* arg); @@ -145,13 +145,12 @@ using MountConfigs = std::vector; struct PullThreadData final : PushPullThreadData { using PushPullThreadData::PushPullThreadData; - ~PullThreadData(); + static size_t pull_thread_callback(char *ptr, size_t size, size_t nmemb, void *userdata); }; - struct PushThreadData final : PushPullThreadData { using PushPullThreadData::PushPullThreadData; - ~PushThreadData(); + static size_t push_thread_callback(const char *ptr, size_t size, size_t nmemb, void *userdata); }; struct MountDevice { diff --git a/sphaira/source/utils/devoptab_common.cpp b/sphaira/source/utils/devoptab_common.cpp index fbc6561..a00b3f4 100644 --- a/sphaira/source/utils/devoptab_common.cpp +++ b/sphaira/source/utils/devoptab_common.cpp @@ -972,10 +972,13 @@ PushPullThreadData::PushPullThreadData(CURL* _curl) : curl{_curl} { } PushPullThreadData::~PushPullThreadData() { + log_write("[PUSH:PULL] Destructor\n"); Cancel(); if (started) { + log_write("[PUSH:PULL] Waiting for thread to exit\n"); threadWaitForExit(&thread); + log_write("[PUSH:PULL] Thread exited\n"); } threadClose(&thread); @@ -1007,7 +1010,7 @@ bool PushPullThreadData::IsRunning() { return !finished && !error; } -size_t PushPullThreadData::PullData(char* data, size_t total_size) { +size_t PushPullThreadData::PullData(char* data, size_t total_size, bool curl) { if (!data || !total_size) { return 0; } @@ -1015,28 +1018,48 @@ size_t PushPullThreadData::PullData(char* data, size_t total_size) { SCOPED_MUTEX(&mutex); ON_SCOPE_EXIT(condvarWakeOne(&can_push)); - size_t bytes_read = 0; - while (bytes_read < total_size && !error) { + if (curl) { + // this should be handled in the progress function. + // however i handle it here as well just in case. if (buffer.empty()) { if (finished) { - break; + log_write("[PUSH:PULL] PullData: finished and no data\n"); + return 0; } - condvarWakeOne(&can_push); - condvarWait(&can_pull, &mutex); - continue; + return CURL_READFUNC_PAUSE; } - const auto rsize = std::min(total_size - bytes_read, buffer.size()); - std::memcpy(data + bytes_read, buffer.data(), rsize); + // read what we can. + const auto rsize = std::min(total_size, buffer.size()); + std::memcpy(data, buffer.data(), rsize); buffer.erase(buffer.begin(), buffer.begin() + rsize); - bytes_read += rsize; - } + return rsize; + } else { + // if we are not in a curl callback, then we can block until we have data. + size_t bytes_read = 0; + while (bytes_read < total_size && !error) { + if (buffer.empty()) { + if (finished) { + break; + } - return bytes_read; + condvarWakeOne(&can_push); + condvarWait(&can_pull, &mutex); + continue; + } + + const auto rsize = std::min(total_size - bytes_read, buffer.size()); + std::memcpy(data + bytes_read, buffer.data(), rsize); + buffer.erase(buffer.begin(), buffer.begin() + rsize); + bytes_read += rsize; + } + + return bytes_read; + } } -size_t PushPullThreadData::PushData(const char* data, size_t total_size) { +size_t PushPullThreadData::PushData(const char* data, size_t total_size, bool curl) { if (!data || !total_size) { return 0; } @@ -1044,53 +1067,120 @@ size_t PushPullThreadData::PushData(const char* data, size_t total_size) { SCOPED_MUTEX(&mutex); ON_SCOPE_EXIT(condvarWakeOne(&can_pull)); - size_t bytes_written = 0; - while (bytes_written < total_size && !error && !finished) { - const size_t space_left = MAX_BUFFER_SIZE - buffer.size(); - if (space_left == 0) { - condvarWakeOne(&can_pull); - condvarWait(&can_push, &mutex); - continue; + if (curl) { + // this should be handled in the progress function. + // however i handle it here as well just in case. + if (buffer.size() + total_size > MAX_BUFFER_SIZE) { + return CURL_WRITEFUNC_PAUSE; } - const auto wsize = std::min(total_size - bytes_written, space_left); - buffer.insert(buffer.end(), data + bytes_written, data + bytes_written + wsize); - bytes_written += wsize; - } + // blocking / pausing is handled in the progress function. + // do NOT block here as curl does not like it and it will deadlock. + // the mutex block above is fine as it only blocks to perform a memcpy. + buffer.insert(buffer.end(), data, data + total_size); + return total_size; + } else { + // if we are not in a curl callback, then we can block until we have space. + size_t bytes_written = 0; + while (bytes_written < total_size && !error && !finished) { + const size_t space_left = MAX_BUFFER_SIZE - buffer.size(); + if (space_left == 0) { + condvarWakeOne(&can_pull); + condvarWait(&can_push, &mutex); + continue; + } - return bytes_written; + const auto wsize = std::min(total_size - bytes_written, space_left); + buffer.insert(buffer.end(), data + bytes_written, data + bytes_written + wsize); + bytes_written += wsize; + } + + return bytes_written; + } } -size_t PushPullThreadData::push_thread_callback(const char *ptr, size_t size, size_t nmemb, void *userdata) { +size_t PushThreadData::push_thread_callback(const char *ptr, size_t size, size_t nmemb, void *userdata) { if (!ptr || !userdata || !size || !nmemb) { return 0; } - auto* data = static_cast(userdata); - return data->PushData(ptr, size * nmemb); + auto* data = static_cast(userdata); + return data->PushData(ptr, size * nmemb, true); } -size_t PushPullThreadData::pull_thread_callback(char *ptr, size_t size, size_t nmemb, void *userdata) { +size_t PullThreadData::pull_thread_callback(char *ptr, size_t size, size_t nmemb, void *userdata) { if (!ptr || !userdata || !size || !nmemb) { return 0; } - auto* data = static_cast(userdata); - return data->PullData(ptr, size * nmemb); + auto* data = static_cast(userdata); + return data->PullData(ptr, size * nmemb, true); +} + +size_t PushPullThreadData::progress_callback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) { + auto *data = static_cast(clientp); + bool should_pause; + + { + SCOPED_MUTEX(&data->mutex); + + // abort early if there was an error. + if (data->error) { + log_write("[PUSH:PULL] progress_callback: aborting transfer, error set\n"); + return 1; + } + + // nothing yet. + if (!dlnow && !ulnow) { + return 0; + } + + // workout if this is a download or upload. + const auto is_download = dlnow > 0; + + if (is_download) { + // no more data wanted, usually this is handled by curl using ranges. + // however, if we did a seek, then we want to cancel early. + if (data->finished) { + log_write("[PUSH:PULL] progress_callback: cancelling download, finished set\n"); + return 1; + } + + // pause if the buffer is full, otherwise continue. + should_pause = data->buffer.size() >= MAX_BUFFER_SIZE; + } else { + // pause if we have no data to send, otherwise continue. + // do not pause if finished as curl may have internal data pending to send. + should_pause = !data->finished && data->buffer.empty(); + } + } + + // curl_easy_pause(CONT) actually calls the read/write callback again immediately. + // so we need to make sure we are not holding the mutex when calling it. + // the curl handle is owned by this thread so no need to lock it. + const auto res = curl_easy_pause(data->curl, should_pause ? CURLPAUSE_ALL : CURLPAUSE_CONT); + if (res != CURLE_OK) { + log_write("[PUSH:PULL] progress_callback: curl_easy_pause(%d) failed: %s\n", should_pause, curl_easy_strerror(res)); + } + + return 0; } void PushPullThreadData::thread_func(void* arg) { log_write("[PUSH:PULL] Read thread started\n"); auto data = static_cast(arg); + + curl_easy_setopt(data->curl, CURLOPT_XFERINFODATA, data); + curl_easy_setopt(data->curl, CURLOPT_XFERINFOFUNCTION, progress_callback); const auto res = curl_easy_perform(data->curl); + log_write("[PUSH:PULL] curl_easy_perform() returned: %s\n", curl_easy_strerror(res)); + // when finished, lock mutex and signal for anything waiting. SCOPED_MUTEX(&data->mutex); condvarWakeOne(&data->can_push); condvarWakeOne(&data->can_pull); - log_write("[PUSH:PULL] curl_easy_perform() finished for read thread: %s\n", curl_easy_strerror(res)); - data->finished = true; data->error = res != CURLE_OK; curl_easy_getinfo(data->curl, CURLINFO_RESPONSE_CODE, &data->code); @@ -1098,40 +1188,6 @@ void PushPullThreadData::thread_func(void* arg) { log_write("[PUSH:PULL] Read thread finished, code: %ld, error: %d\n", data->code, data->error); } -PullThreadData::~PullThreadData() { - if (started) { - SCOPED_MUTEX(&mutex); - - // for now, always wait until the dat is flushed. - // may enable a timeout later on, however i don't want to risk - // data loss for users that have slow hdd / connections. - #if 1 - while (!finished && !error && !buffer.empty()) { - condvarWakeOne(&can_pull); - condvarWait(&can_push, &mutex); - } - #else - u64 timeout = 5e+9; - const auto deadline = armGetSystemTick() + armNsToTicks(timeout); - - while (!finished && !error && !buffer.empty()) { - const s64 remaining = deadline - armGetSystemTick(); - timeout = remaining > 0 ? armTicksToNs(remaining) : 0; - - condvarWakeOne(&can_pull); - if (R_FAILED(condvarWaitTimeout(&can_push, &mutex, timeout))) { - log_write("[PullThreadData] condvarWaitTimeout() timed out flushing data: %zu\n", buffer.size()); - break; - } - } - #endif - } -} - -PushThreadData::~PushThreadData() { - -} - MountCurlDevice::~MountCurlDevice() { log_write("[CURL] Cleaning up mount device\n"); if (curlu) {