From 4421ac1ceb866b33a152cf793fb447b3cda91f3b Mon Sep 17 00:00:00 2001 From: ITotalJustice <47043333+ITotalJustice@users.noreply.github.com> Date: Sat, 19 Jul 2025 18:33:47 +0100 Subject: [PATCH] use uevent in transfer loops to reduce latency between exiting and updating progress time. --- sphaira/include/ui/progress_box.hpp | 6 ++ sphaira/source/threaded_file_transfer.cpp | 97 ++++++++++++++++---- sphaira/source/ui/progress_box.cpp | 5 ++ sphaira/source/yati/yati.cpp | 105 ++++++++++++++++++++-- 4 files changed, 189 insertions(+), 24 deletions(-) diff --git a/sphaira/include/ui/progress_box.hpp b/sphaira/include/ui/progress_box.hpp index cb52f26..a985588 100644 --- a/sphaira/include/ui/progress_box.hpp +++ b/sphaira/include/ui/progress_box.hpp @@ -64,6 +64,11 @@ struct ProgressBox final : Widget { }; } + // auto-clear = false + auto GetCancelEvent() { + return &m_uevent; + } + private: void FreeImage(); @@ -75,6 +80,7 @@ public: }; private: + UEvent m_uevent{}; Mutex m_mutex{}; Thread m_thread{}; ThreadData m_thread_data{}; diff --git a/sphaira/source/threaded_file_transfer.cpp b/sphaira/source/threaded_file_transfer.cpp index 237c3ff..0767c84 100644 --- a/sphaira/source/threaded_file_transfer.cpp +++ b/sphaira/source/threaded_file_transfer.cpp @@ -77,16 +77,8 @@ struct ThreadData { auto GetResults() volatile -> Result; void WakeAllThreads(); - void SetReadResult(Result result) { - read_result = result; - } - - void SetWriteResult(Result result) { - write_result = result; - } - - void SetPullResult(Result result) { - pull_result = result; + auto IsAnyRunning() volatile const -> bool { + return read_running || write_running; } auto GetWriteOffset() volatile const -> s64 { @@ -97,6 +89,33 @@ struct ThreadData { return write_size; } + auto GetDoneEvent() { + return &m_uevent_done; + } + + auto GetProgressEvent() { + return &m_uevent_progres; + } + + void SetReadResult(Result result) { + read_result = result; + if (R_FAILED(result)) { + ueventSignal(GetDoneEvent()); + 
} + } + + void SetWriteResult(Result result) { + write_result = result; + ueventSignal(GetDoneEvent()); + } + + void SetPullResult(Result result) { + pull_result = result; + if (R_FAILED(result)) { + ueventSignal(GetDoneEvent()); + } + } + Result Pull(void* data, s64 size, u64* bytes_read); Result readFuncInternal(); Result writeFuncInternal(); @@ -124,6 +143,9 @@ private: CondVar can_pull{}; CondVar can_pull_write{}; + UEvent m_uevent_done{}; + UEvent m_uevent_progres{}; + RingBuf<2> write_buffers{}; std::vector pull_buffer{}; s64 pull_buffer_offset{}; @@ -138,6 +160,9 @@ private: std::atomic read_result{}; std::atomic write_result{}; std::atomic pull_result{}; + + std::atomic_bool read_running{true}; + std::atomic_bool write_running{true}; }; ThreadData::ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, WriteCallback _wfunc, u64 buffer_size) @@ -153,10 +178,13 @@ ThreadData::ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, Wr condvarInit(std::addressof(can_write)); condvarInit(std::addressof(can_pull)); condvarInit(std::addressof(can_pull_write)); + + ueventCreate(&m_uevent_done, false); + ueventCreate(&m_uevent_progres, true); } auto ThreadData::GetResults() volatile -> Result { - R_UNLESS(!pbox->ShouldExit(), Result_TransferCancelled); + R_TRY(pbox->ShouldExitResult()); R_TRY(read_result.load()); R_TRY(write_result.load()); R_TRY(pull_result.load()); @@ -178,6 +206,9 @@ Result ThreadData::SetWriteBuf(std::vector& buf, s64 size) { mutexLock(std::addressof(mutex)); if (!write_buffers.ringbuf_free()) { + if (!write_running) { + R_SUCCEED(); + } R_TRY(condvarWait(std::addressof(can_read), std::addressof(mutex))); } @@ -190,6 +221,10 @@ Result ThreadData::SetWriteBuf(std::vector& buf, s64 size) { Result ThreadData::GetWriteBuf(std::vector& buf_out, s64& off_out) { mutexLock(std::addressof(mutex)); if (!write_buffers.ringbuf_size()) { + if (!read_running) { + buf_out.resize(0); + R_SUCCEED(); + } 
R_TRY(condvarWait(std::addressof(can_write), std::addressof(mutex))); } @@ -249,6 +284,8 @@ Result ThreadData::Pull(void* data, s64 size, u64* bytes_read) { // read thread reads all data from the source Result ThreadData::readFuncInternal() { + ON_SCOPE_EXIT( read_running = false; ); + // the main buffer which data is read into. std::vector buf; buf.reserve(this->read_buffer_size); @@ -260,8 +297,11 @@ Result ThreadData::readFuncInternal() { u64 bytes_read{}; buf.resize(read_size); R_TRY(this->Read(buf.data(), read_size, std::addressof(bytes_read))); - auto buf_size = bytes_read; + if (!bytes_read) { + break; + } + auto buf_size = bytes_read; R_TRY(this->SetWriteBuf(buf, buf_size)); } @@ -269,8 +309,10 @@ Result ThreadData::readFuncInternal() { R_SUCCEED(); } -// write thread writes data to the nca placeholder. +// write thread writes data to wfunc. Result ThreadData::writeFuncInternal() { + ON_SCOPE_EXIT( write_running = false; ); + std::vector buf; buf.reserve(this->read_buffer_size); @@ -278,6 +320,10 @@ Result ThreadData::writeFuncInternal() { s64 dummy_off; R_TRY(this->GetWriteBuf(buf, dummy_off)); const auto size = buf.size(); + if (!size) { + log_write("exiting write func early because no data was received\n"); + break; + } if (!this->wfunc) { R_TRY(this->SetPullBuf(buf, buf.size())); @@ -286,6 +332,7 @@ Result ThreadData::writeFuncInternal() { } this->write_offset += size; + ueventSignal(GetProgressEvent()); } log_write("finished write thread success!\n"); @@ -339,6 +386,10 @@ Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, Wri u64 bytes_read; const auto rsize = std::min(buf.size(), size - offset); R_TRY(rfunc(buf.data(), offset, rsize, &bytes_read)); + if (!bytes_read) { + break; + } + R_TRY(wfunc(buf.data(), offset, bytes_read)); offset += bytes_read; @@ -382,15 +433,27 @@ Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, Wri R_TRY(start_threads()); log_write("[THREAD] started threads\n"); - while 
(t_data.GetWriteOffset() != t_data.GetWriteSize() && R_SUCCEEDED(t_data.GetResults())) { - pbox->UpdateTransfer(t_data.GetWriteOffset(), t_data.GetWriteSize()); - svcSleepThread(1e+6); + const auto waiter_progress = waiterForUEvent(t_data.GetProgressEvent()); + const auto waiter_cancel = waiterForUEvent(pbox->GetCancelEvent()); + const auto waiter_done = waiterForUEvent(t_data.GetDoneEvent()); + + for (;;) { + s32 idx; + if (R_FAILED(waitMulti(&idx, UINT64_MAX, waiter_progress, waiter_cancel, waiter_done))) { + break; + } + + if (!idx) { + pbox->UpdateTransfer(t_data.GetWriteOffset(), t_data.GetWriteSize()); + } else { + break; + } } } // wait for all threads to close. log_write("waiting for threads to close\n"); - for (;;) { + while (t_data.IsAnyRunning()) { t_data.WakeAllThreads(); pbox->Yield(); diff --git a/sphaira/source/ui/progress_box.cpp b/sphaira/source/ui/progress_box.cpp index d6a74a5..bb5882e 100644 --- a/sphaira/source/ui/progress_box.cpp +++ b/sphaira/source/ui/progress_box.cpp @@ -43,6 +43,9 @@ ProgressBox::ProgressBox(int image, const std::string& action, const std::string m_action = action; m_image = image; + // create cancel event. 
+ ueventCreate(&m_uevent, false); + m_cpuid = cpuid; m_thread_data.pbox = this; m_thread_data.callback = callback; @@ -55,6 +58,7 @@ ProgressBox::ProgressBox(int image, const std::string& action, const std::string } ProgressBox::~ProgressBox() { + ueventSignal(GetCancelEvent()); m_stop_source.request_stop(); if (R_FAILED(threadWaitForExit(&m_thread))) { @@ -250,6 +254,7 @@ auto ProgressBox::SetImageDataConst(std::span data) -> ProgressBox& { void ProgressBox::RequestExit() { m_stop_source.request_stop(); + ueventSignal(GetCancelEvent()); } auto ProgressBox::ShouldExit() -> bool { diff --git a/sphaira/source/yati/yati.cpp b/sphaira/source/yati/yati.cpp index f2b0b20..54648a8 100644 --- a/sphaira/source/yati/yati.cpp +++ b/sphaira/source/yati/yati.cpp @@ -141,6 +141,9 @@ struct ThreadData { condvarInit(std::addressof(can_decompress_write)); condvarInit(std::addressof(can_write)); + ueventCreate(&m_uevent_done, false); + ueventCreate(&m_uevent_progres, true); + sha256ContextCreate(&sha256); // this will be updated with the actual size from nca header. 
write_size = nca->size; @@ -158,6 +161,45 @@ struct ThreadData { auto GetResults() volatile -> Result; void WakeAllThreads(); + auto IsAnyRunning() volatile const -> bool { + return read_running || decompress_running || write_running; + } + + auto GetWriteOffset() volatile const -> s64 { + return write_offset; + } + + auto GetWriteSize() volatile const -> s64 { + return write_size; + } + + auto GetDoneEvent() { + return &m_uevent_done; + } + + auto GetProgressEvent() { + return &m_uevent_progres; + } + + void SetReadResult(Result result) { + read_result = result; + if (R_FAILED(result)) { + ueventSignal(GetDoneEvent()); + } + } + + void SetDecompressResult(Result result) { + decompress_result = result; + if (R_FAILED(result)) { + ueventSignal(GetDoneEvent()); + } + } + + void SetWriteResult(Result result) { + write_result = result; + ueventSignal(GetDoneEvent()); + } + Result Read(void* buf, s64 size, u64* bytes_read); Result SetDecompressBuf(std::vector& buf, s64 off, s64 size) { @@ -165,6 +207,9 @@ struct ThreadData { mutexLock(std::addressof(read_mutex)); if (!read_buffers.ringbuf_free()) { + if (!write_running) { + R_SUCCEED(); + } R_TRY(condvarWait(std::addressof(can_read), std::addressof(read_mutex))); } @@ -177,6 +222,10 @@ struct ThreadData { Result GetDecompressBuf(std::vector& buf_out, s64& off_out) { mutexLock(std::addressof(read_mutex)); if (!read_buffers.ringbuf_size()) { + if (!read_running) { + buf_out.resize(0); + R_SUCCEED(); + } R_TRY(condvarWait(std::addressof(can_decompress), std::addressof(read_mutex))); } @@ -194,6 +243,9 @@ struct ThreadData { mutexLock(std::addressof(write_mutex)); if (!write_buffers.ringbuf_free()) { + if (!decompress_running) { + R_SUCCEED(); + } R_TRY(condvarWait(std::addressof(can_decompress_write), std::addressof(write_mutex))); } @@ -206,6 +258,10 @@ struct ThreadData { Result GetWriteBuf(std::vector& buf_out, s64& off_out) { mutexLock(std::addressof(write_mutex)); if (!write_buffers.ringbuf_size()) { + if
(!decompress_running) { + buf_out.resize(0); + R_SUCCEED(); + } R_TRY(condvarWait(std::addressof(can_write), std::addressof(write_mutex))); } @@ -229,6 +285,9 @@ struct ThreadData { CondVar can_decompress_write{}; CondVar can_write{}; + UEvent m_uevent_done{}; + UEvent m_uevent_progres{}; + RingBuf<4> read_buffers{}; RingBuf<4> write_buffers{}; @@ -250,6 +309,10 @@ struct ThreadData { std::atomic read_result{}; std::atomic decompress_result{}; std::atomic write_result{}; + + std::atomic_bool read_running{true}; + std::atomic_bool decompress_running{true}; + std::atomic_bool write_running{true}; }; struct Yati { @@ -371,6 +434,8 @@ Result HasRequiredTicket(const nca::Header& header, std::span tik // read thread reads all data from the source, it also handles // parsing ncz headers, sections and reading ncz blocks Result Yati::readFuncInternal(ThreadData* t) { + ON_SCOPE_EXIT( t->read_running = false; ); + // the main buffer which data is read into. std::vector buf; // workaround ncz block reading ahead. if block isn't found, we usually @@ -401,6 +466,9 @@ Result Yati::readFuncInternal(ThreadData* t) { buf.resize(buf_offset + read_size); R_TRY(t->Read(buf.data() + buf_offset, read_size, std::addressof(bytes_read))); auto buf_size = buf_offset + bytes_read; + if (!bytes_read) { + break; + } // read enough bytes for ncz, check magic if (t->read_offset == NCZ_SECTION_OFFSET) { @@ -454,6 +522,8 @@ Result Yati::readFuncInternal(ThreadData* t) { // decompress thread handles decrypting / modifying the nca header, decompressing ncz // and calculating the running sha256. Result Yati::decompressFuncInternal(ThreadData* t) { + ON_SCOPE_EXIT( t->decompress_running = false; ); + // only used for ncz files. 
auto dctx = ZSTD_createDCtx(); ON_SCOPE_EXIT(ZSTD_freeDCtx(dctx)); @@ -534,6 +604,9 @@ Result Yati::decompressFuncInternal(ThreadData* t) { while (t->decompress_offset < t->write_size && R_SUCCEEDED(t->GetResults())) { s64 decompress_buf_off{}; R_TRY(t->GetDecompressBuf(buf, decompress_buf_off)); + if (buf.empty()) { + break; + } // do we have an nsz? if so, setup buffers. if (!is_ncz && !t->ncz_sections.empty()) { @@ -714,6 +787,8 @@ Result Yati::decompressFuncInternal(ThreadData* t) { // write thread writes data to the nca placeholder. Result Yati::writeFuncInternal(ThreadData* t) { + ON_SCOPE_EXIT( t->write_running = false; ); + std::vector buf; buf.reserve(t->max_buffer_size); const auto is_file_based_emummc = App::IsFileBaseEmummc(); @@ -721,6 +796,9 @@ Result Yati::writeFuncInternal(ThreadData* t) { while (t->write_offset < t->write_size && R_SUCCEEDED(t->GetResults())) { s64 dummy_off; R_TRY(t->GetWriteBuf(buf, dummy_off)); + if (buf.empty()) { + break; + } s64 off{}; while (off < buf.size() && t->write_offset < t->write_size && R_SUCCEEDED(t->GetResults())) { @@ -729,6 +807,7 @@ Result Yati::writeFuncInternal(ThreadData* t) { off += wsize; t->write_offset += wsize; + ueventSignal(t->GetProgressEvent()); // todo: check how much time elapsed and sleep the diff // rather than always sleeping a fixed amount. 
@@ -745,20 +824,20 @@ Result Yati::writeFuncInternal(ThreadData* t) { void readFunc(void* d) { auto t = static_cast(d); - t->read_result = t->yati->readFuncInternal(t); + t->SetReadResult(t->yati->readFuncInternal(t)); log_write("read thread returned now\n"); } void decompressFunc(void* d) { log_write("hello decomp thread func\n"); auto t = static_cast(d); - t->decompress_result = t->yati->decompressFuncInternal(t); + t->SetDecompressResult(t->yati->decompressFuncInternal(t)); log_write("decompress thread returned now\n"); } void writeFunc(void* d) { auto t = static_cast(d); - t->write_result = t->yati->writeFuncInternal(t); + t->SetWriteResult(t->yati->writeFuncInternal(t)); log_write("write thread returned now\n"); } @@ -900,14 +979,26 @@ Result Yati::InstallNcaInternal(std::span tickets, NcaCollection& R_TRY(threadStart(std::addressof(t_write))); ON_SCOPE_EXIT(threadWaitForExit(std::addressof(t_write))); - while (t_data.write_offset != t_data.write_size && R_SUCCEEDED(t_data.GetResults())) { - pbox->UpdateTransfer(t_data.write_offset, t_data.write_size); - svcSleepThread(1e+6); + const auto waiter_progress = waiterForUEvent(t_data.GetProgressEvent()); + const auto waiter_cancel = waiterForUEvent(pbox->GetCancelEvent()); + const auto waiter_done = waiterForUEvent(t_data.GetDoneEvent()); + + for (;;) { + s32 idx; + if (R_FAILED(waitMulti(&idx, UINT64_MAX, waiter_progress, waiter_cancel, waiter_done))) { + break; + } + + if (!idx) { + pbox->UpdateTransfer(t_data.GetWriteOffset(), t_data.GetWriteSize()); + } else { + break; + } } // wait for all threads to close. log_write("waiting for threads to close\n"); - for (;;) { + while (t_data.IsAnyRunning()) { t_data.WakeAllThreads(); pbox->Yield();