devoptab/curl: fix rare deadlock casued by sleeping/blocking in curl callback functions.

it seems that curl does not like long blocking in the r/w callbacks.
blocking for too seems to cause a deadlock as the server stops send/recv anymore data.

to fix this, i now use curls pause api.
this api is not thread safe, so it's a little more involved than it needs to be.

however this fixes the deadlock as curls pause actually reduces the download/upload speed
to the minimum. it also reduces exit latency as now exiting is handled in the progress callback
as well, which is called far more often than r/w.
This commit is contained in:
ITotalJustice
2025-09-16 04:15:56 +01:00
parent 63c420d5d8
commit 3dae3f9173
2 changed files with 131 additions and 76 deletions

View File

@@ -93,18 +93,18 @@ void update_devoptab_for_read_only(devoptab_t* devoptab, bool read_only);
struct PushPullThreadData { struct PushPullThreadData {
static constexpr size_t MAX_BUFFER_SIZE = 1024 * 64; // 64KB max buffer static constexpr size_t MAX_BUFFER_SIZE = 1024 * 64; // 64KB max buffer
PushPullThreadData(CURL* _curl); explicit PushPullThreadData(CURL* _curl);
virtual ~PushPullThreadData(); virtual ~PushPullThreadData();
Result CreateAndStart();
Result CreateAndStart();
void Cancel(); void Cancel();
bool IsRunning(); bool IsRunning();
size_t PullData(char* data, size_t total_size); // only set curl=true if called from a curl callback.
size_t PushData(const char* data, size_t total_size); size_t PullData(char* data, size_t total_size, bool curl = false);
size_t PushData(const char* data, size_t total_size, bool curl = false);
static size_t push_thread_callback(const char *ptr, size_t size, size_t nmemb, void *userdata); static size_t progress_callback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
static size_t pull_thread_callback(char *ptr, size_t size, size_t nmemb, void *userdata);
private: private:
static void thread_func(void* arg); static void thread_func(void* arg);
@@ -145,13 +145,12 @@ using MountConfigs = std::vector<MountConfig>;
struct PullThreadData final : PushPullThreadData { struct PullThreadData final : PushPullThreadData {
using PushPullThreadData::PushPullThreadData; using PushPullThreadData::PushPullThreadData;
~PullThreadData(); static size_t pull_thread_callback(char *ptr, size_t size, size_t nmemb, void *userdata);
}; };
struct PushThreadData final : PushPullThreadData { struct PushThreadData final : PushPullThreadData {
using PushPullThreadData::PushPullThreadData; using PushPullThreadData::PushPullThreadData;
~PushThreadData(); static size_t push_thread_callback(const char *ptr, size_t size, size_t nmemb, void *userdata);
}; };
struct MountDevice { struct MountDevice {

View File

@@ -972,10 +972,13 @@ PushPullThreadData::PushPullThreadData(CURL* _curl) : curl{_curl} {
} }
PushPullThreadData::~PushPullThreadData() { PushPullThreadData::~PushPullThreadData() {
log_write("[PUSH:PULL] Destructor\n");
Cancel(); Cancel();
if (started) { if (started) {
log_write("[PUSH:PULL] Waiting for thread to exit\n");
threadWaitForExit(&thread); threadWaitForExit(&thread);
log_write("[PUSH:PULL] Thread exited\n");
} }
threadClose(&thread); threadClose(&thread);
@@ -1007,7 +1010,7 @@ bool PushPullThreadData::IsRunning() {
return !finished && !error; return !finished && !error;
} }
size_t PushPullThreadData::PullData(char* data, size_t total_size) { size_t PushPullThreadData::PullData(char* data, size_t total_size, bool curl) {
if (!data || !total_size) { if (!data || !total_size) {
return 0; return 0;
} }
@@ -1015,28 +1018,48 @@ size_t PushPullThreadData::PullData(char* data, size_t total_size) {
SCOPED_MUTEX(&mutex); SCOPED_MUTEX(&mutex);
ON_SCOPE_EXIT(condvarWakeOne(&can_push)); ON_SCOPE_EXIT(condvarWakeOne(&can_push));
size_t bytes_read = 0; if (curl) {
while (bytes_read < total_size && !error) { // this should be handled in the progress function.
// however i handle it here as well just in case.
if (buffer.empty()) { if (buffer.empty()) {
if (finished) { if (finished) {
break; log_write("[PUSH:PULL] PullData: finished and no data\n");
return 0;
} }
condvarWakeOne(&can_push); return CURL_READFUNC_PAUSE;
condvarWait(&can_pull, &mutex);
continue;
} }
const auto rsize = std::min(total_size - bytes_read, buffer.size()); // read what we can.
std::memcpy(data + bytes_read, buffer.data(), rsize); const auto rsize = std::min(total_size, buffer.size());
std::memcpy(data, buffer.data(), rsize);
buffer.erase(buffer.begin(), buffer.begin() + rsize); buffer.erase(buffer.begin(), buffer.begin() + rsize);
bytes_read += rsize; return rsize;
} } else {
// if we are not in a curl callback, then we can block until we have data.
size_t bytes_read = 0;
while (bytes_read < total_size && !error) {
if (buffer.empty()) {
if (finished) {
break;
}
return bytes_read; condvarWakeOne(&can_push);
condvarWait(&can_pull, &mutex);
continue;
}
const auto rsize = std::min(total_size - bytes_read, buffer.size());
std::memcpy(data + bytes_read, buffer.data(), rsize);
buffer.erase(buffer.begin(), buffer.begin() + rsize);
bytes_read += rsize;
}
return bytes_read;
}
} }
size_t PushPullThreadData::PushData(const char* data, size_t total_size) { size_t PushPullThreadData::PushData(const char* data, size_t total_size, bool curl) {
if (!data || !total_size) { if (!data || !total_size) {
return 0; return 0;
} }
@@ -1044,53 +1067,120 @@ size_t PushPullThreadData::PushData(const char* data, size_t total_size) {
SCOPED_MUTEX(&mutex); SCOPED_MUTEX(&mutex);
ON_SCOPE_EXIT(condvarWakeOne(&can_pull)); ON_SCOPE_EXIT(condvarWakeOne(&can_pull));
size_t bytes_written = 0; if (curl) {
while (bytes_written < total_size && !error && !finished) { // this should be handled in the progress function.
const size_t space_left = MAX_BUFFER_SIZE - buffer.size(); // however i handle it here as well just in case.
if (space_left == 0) { if (buffer.size() + total_size > MAX_BUFFER_SIZE) {
condvarWakeOne(&can_pull); return CURL_WRITEFUNC_PAUSE;
condvarWait(&can_push, &mutex);
continue;
} }
const auto wsize = std::min(total_size - bytes_written, space_left); // blocking / pausing is handled in the progress function.
buffer.insert(buffer.end(), data + bytes_written, data + bytes_written + wsize); // do NOT block here as curl does not like it and it will deadlock.
bytes_written += wsize; // the mutex block above is fine as it only blocks to perform a memcpy.
} buffer.insert(buffer.end(), data, data + total_size);
return total_size;
} else {
// if we are not in a curl callback, then we can block until we have space.
size_t bytes_written = 0;
while (bytes_written < total_size && !error && !finished) {
const size_t space_left = MAX_BUFFER_SIZE - buffer.size();
if (space_left == 0) {
condvarWakeOne(&can_pull);
condvarWait(&can_push, &mutex);
continue;
}
return bytes_written; const auto wsize = std::min(total_size - bytes_written, space_left);
buffer.insert(buffer.end(), data + bytes_written, data + bytes_written + wsize);
bytes_written += wsize;
}
return bytes_written;
}
} }
size_t PushPullThreadData::push_thread_callback(const char *ptr, size_t size, size_t nmemb, void *userdata) { size_t PushThreadData::push_thread_callback(const char *ptr, size_t size, size_t nmemb, void *userdata) {
if (!ptr || !userdata || !size || !nmemb) { if (!ptr || !userdata || !size || !nmemb) {
return 0; return 0;
} }
auto* data = static_cast<PushPullThreadData*>(userdata); auto* data = static_cast<PushThreadData*>(userdata);
return data->PushData(ptr, size * nmemb); return data->PushData(ptr, size * nmemb, true);
} }
size_t PushPullThreadData::pull_thread_callback(char *ptr, size_t size, size_t nmemb, void *userdata) { size_t PullThreadData::pull_thread_callback(char *ptr, size_t size, size_t nmemb, void *userdata) {
if (!ptr || !userdata || !size || !nmemb) { if (!ptr || !userdata || !size || !nmemb) {
return 0; return 0;
} }
auto* data = static_cast<PushPullThreadData*>(userdata); auto* data = static_cast<PullThreadData*>(userdata);
return data->PullData(ptr, size * nmemb); return data->PullData(ptr, size * nmemb, true);
}
size_t PushPullThreadData::progress_callback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
auto *data = static_cast<PushPullThreadData*>(clientp);
bool should_pause;
{
SCOPED_MUTEX(&data->mutex);
// abort early if there was an error.
if (data->error) {
log_write("[PUSH:PULL] progress_callback: aborting transfer, error set\n");
return 1;
}
// nothing yet.
if (!dlnow && !ulnow) {
return 0;
}
// workout if this is a download or upload.
const auto is_download = dlnow > 0;
if (is_download) {
// no more data wanted, usually this is handled by curl using ranges.
// however, if we did a seek, then we want to cancel early.
if (data->finished) {
log_write("[PUSH:PULL] progress_callback: cancelling download, finished set\n");
return 1;
}
// pause if the buffer is full, otherwise continue.
should_pause = data->buffer.size() >= MAX_BUFFER_SIZE;
} else {
// pause if we have no data to send, otherwise continue.
// do not pause if finished as curl may have internal data pending to send.
should_pause = !data->finished && data->buffer.empty();
}
}
// curl_easy_pause(CONT) actually calls the read/write callback again immediately.
// so we need to make sure we are not holding the mutex when calling it.
// the curl handle is owned by this thread so no need to lock it.
const auto res = curl_easy_pause(data->curl, should_pause ? CURLPAUSE_ALL : CURLPAUSE_CONT);
if (res != CURLE_OK) {
log_write("[PUSH:PULL] progress_callback: curl_easy_pause(%d) failed: %s\n", should_pause, curl_easy_strerror(res));
}
return 0;
} }
void PushPullThreadData::thread_func(void* arg) { void PushPullThreadData::thread_func(void* arg) {
log_write("[PUSH:PULL] Read thread started\n"); log_write("[PUSH:PULL] Read thread started\n");
auto data = static_cast<PushPullThreadData*>(arg); auto data = static_cast<PushPullThreadData*>(arg);
curl_easy_setopt(data->curl, CURLOPT_XFERINFODATA, data);
curl_easy_setopt(data->curl, CURLOPT_XFERINFOFUNCTION, progress_callback);
const auto res = curl_easy_perform(data->curl); const auto res = curl_easy_perform(data->curl);
log_write("[PUSH:PULL] curl_easy_perform() returned: %s\n", curl_easy_strerror(res));
// when finished, lock mutex and signal for anything waiting. // when finished, lock mutex and signal for anything waiting.
SCOPED_MUTEX(&data->mutex); SCOPED_MUTEX(&data->mutex);
condvarWakeOne(&data->can_push); condvarWakeOne(&data->can_push);
condvarWakeOne(&data->can_pull); condvarWakeOne(&data->can_pull);
log_write("[PUSH:PULL] curl_easy_perform() finished for read thread: %s\n", curl_easy_strerror(res));
data->finished = true; data->finished = true;
data->error = res != CURLE_OK; data->error = res != CURLE_OK;
curl_easy_getinfo(data->curl, CURLINFO_RESPONSE_CODE, &data->code); curl_easy_getinfo(data->curl, CURLINFO_RESPONSE_CODE, &data->code);
@@ -1098,40 +1188,6 @@ void PushPullThreadData::thread_func(void* arg) {
log_write("[PUSH:PULL] Read thread finished, code: %ld, error: %d\n", data->code, data->error); log_write("[PUSH:PULL] Read thread finished, code: %ld, error: %d\n", data->code, data->error);
} }
PullThreadData::~PullThreadData() {
if (started) {
SCOPED_MUTEX(&mutex);
// for now, always wait until the dat is flushed.
// may enable a timeout later on, however i don't want to risk
// data loss for users that have slow hdd / connections.
#if 1
while (!finished && !error && !buffer.empty()) {
condvarWakeOne(&can_pull);
condvarWait(&can_push, &mutex);
}
#else
u64 timeout = 5e+9;
const auto deadline = armGetSystemTick() + armNsToTicks(timeout);
while (!finished && !error && !buffer.empty()) {
const s64 remaining = deadline - armGetSystemTick();
timeout = remaining > 0 ? armTicksToNs(remaining) : 0;
condvarWakeOne(&can_pull);
if (R_FAILED(condvarWaitTimeout(&can_push, &mutex, timeout))) {
log_write("[PullThreadData] condvarWaitTimeout() timed out flushing data: %zu\n", buffer.size());
break;
}
}
#endif
}
}
PushThreadData::~PushThreadData() {
}
MountCurlDevice::~MountCurlDevice() { MountCurlDevice::~MountCurlDevice() {
log_write("[CURL] Cleaning up mount device\n"); log_write("[CURL] Cleaning up mount device\n");
if (curlu) { if (curlu) {