remove download non-thread_queue code, fix thread queue exit bug

due to the previous commit, requesting the stop token to exit during a download
would cause the thread queue itself to exit.
This commit is contained in:
ITotalJustice
2025-01-14 15:45:52 +00:00
parent 64a40ae672
commit 977331c3b2

View File

@@ -25,7 +25,6 @@ namespace {
log_write("curl_share_setopt(%s, %s) msg: %s\n", #opt, #v, curl_share_strerror(r)); \ log_write("curl_share_setopt(%s, %s) msg: %s\n", #opt, #v, curl_share_strerror(r)); \
} \ } \
#define USE_THREAD_QUEUE 1
constexpr auto API_AGENT = "ITotalJustice"; constexpr auto API_AGENT = "ITotalJustice";
constexpr u64 CHUNK_SIZE = 1024*1024; constexpr u64 CHUNK_SIZE = 1024*1024;
constexpr auto MAX_THREADS = 4; constexpr auto MAX_THREADS = 4;
@@ -302,18 +301,19 @@ struct ThreadQueue {
} }
auto Add(const Api& api) -> bool { auto Add(const Api& api) -> bool {
if (api.GetUrl().empty() || api.GetPath().empty() || !api.GetOnComplete()) {
return false;
}
mutexLock(&m_mutex); mutexLock(&m_mutex);
ON_SCOPE_EXIT(mutexUnlock(&m_mutex)); ON_SCOPE_EXIT(mutexUnlock(&m_mutex));
ThreadQueueEntry entry{};
entry.api = api;
switch (api.GetPriority()) { switch (api.GetPriority()) {
case Priority::Normal: case Priority::Normal:
m_entries.emplace_back(entry); m_entries.emplace_back(api);
break; break;
case Priority::High: case Priority::High:
m_entries.emplace_front(entry); m_entries.emplace_front(api);
break; break;
} }
@@ -443,6 +443,11 @@ auto header_callback(char* b, size_t size, size_t nitems, void* userdata) -> siz
} }
auto DownloadInternal(CURL* curl, const Api& e) -> ApiResult { auto DownloadInternal(CURL* curl, const Api& e) -> ApiResult {
// check if stop has been requested before starting download
if (e.GetToken().stop_requested()) {
return {};
}
fs::FsPath tmp_buf; fs::FsPath tmp_buf;
const bool has_file = !e.GetPath().empty() && e.GetPath() != ""; const bool has_file = !e.GetPath().empty() && e.GetPath() != "";
const bool has_post = !e.GetFields().empty() && e.GetFields() != ""; const bool has_post = !e.GetFields().empty() && e.GetFields() != "";
@@ -596,23 +601,18 @@ void ThreadEntry::ThreadFunc(void* p) {
auto rc = waitSingle(waiterForUEvent(&data->m_uevent), UINT64_MAX); auto rc = waitSingle(waiterForUEvent(&data->m_uevent), UINT64_MAX);
// log_write("woke up\n"); // log_write("woke up\n");
if (!g_running) { if (!g_running) {
return; break;
} }
if (R_FAILED(rc)) { if (R_FAILED(rc)) {
continue; continue;
} }
#if 1
const auto result = DownloadInternal(data->m_curl, data->m_api); const auto result = DownloadInternal(data->m_curl, data->m_api);
if (g_running && data->m_api.GetOnComplete() && !data->m_api.GetToken().stop_requested()) { if (g_running && data->m_api.GetOnComplete() && !data->m_api.GetToken().stop_requested()) {
const DownloadEventData event_data{data->m_api.GetOnComplete(), result, data->m_api.GetToken()}; const DownloadEventData event_data{data->m_api.GetOnComplete(), result, data->m_api.GetToken()};
evman::push(std::move(event_data), false); evman::push(std::move(event_data), false);
} else {
break;
} }
#endif
// mutexLock(&data->m_mutex);
// ON_SCOPE_EXIT(mutexUnlock(&data->m_mutex));
data->m_in_progress = false; data->m_in_progress = false;
// notify the queue that there's a space free // notify the queue that there's a space free
@@ -750,39 +750,11 @@ auto ToFile(const Api& e) -> ApiResult {
} }
auto ToMemoryAsync(const Api& api) -> bool { auto ToMemoryAsync(const Api& api) -> bool {
#if USE_THREAD_QUEUE
return g_thread_queue.Add(api); return g_thread_queue.Add(api);
#else
// mutexLock(&g_thread_queue.m_mutex);
// ON_SCOPE_EXIT(mutexUnlock(&g_thread_queue.m_mutex));
for (auto& entry : g_threads) {
if (!entry.InProgress()) {
return entry.Setup(callback, url);
}
}
log_write("failed to start download, no avaliable threads\n");
return false;
#endif
} }
auto ToFileAsync(const Api& e) -> bool { auto ToFileAsync(const Api& e) -> bool {
#if USE_THREAD_QUEUE
return g_thread_queue.Add(e); return g_thread_queue.Add(e);
#else
// mutexLock(&g_thread_queue.m_mutex);
// ON_SCOPE_EXIT(mutexUnlock(&g_thread_queue.m_mutex));
for (auto& entry : g_threads) {
if (!entry.InProgress()) {
return entry.Setup(callback, url, out);
}
}
log_write("failed to start download, no avaliable threads\n");
return false;
#endif
} }
} // namespace sphaira::curl } // namespace sphaira::curl