Use uevent in transfer loops to reduce latency when exiting and when updating the progress display.

This commit is contained in:
ITotalJustice
2025-07-19 18:33:47 +01:00
parent 159abfa246
commit 4421ac1ceb
4 changed files with 189 additions and 24 deletions

View File

@@ -64,6 +64,11 @@ struct ProgressBox final : Widget {
};
}
// auto-clear = false
auto GetCancelEvent() {
return &m_uevent;
}
private:
void FreeImage();
@@ -75,6 +80,7 @@ public:
};
private:
UEvent m_uevent{};
Mutex m_mutex{};
Thread m_thread{};
ThreadData m_thread_data{};

View File

@@ -77,16 +77,8 @@ struct ThreadData {
auto GetResults() volatile -> Result;
void WakeAllThreads();
void SetReadResult(Result result) {
read_result = result;
}
void SetWriteResult(Result result) {
write_result = result;
}
void SetPullResult(Result result) {
pull_result = result;
auto IsAnyRunning() volatile const -> bool {
return read_running || write_running;
}
auto GetWriteOffset() volatile const -> s64 {
@@ -97,6 +89,33 @@ struct ThreadData {
return write_size;
}
auto GetDoneEvent() {
return &m_uevent_done;
}
auto GetProgressEvent() {
return &m_uevent_progres;
}
void SetReadResult(Result result) {
read_result = result;
if (R_FAILED(result)) {
ueventSignal(GetDoneEvent());
}
}
void SetWriteResult(Result result) {
write_result = result;
ueventSignal(GetDoneEvent());
}
void SetPullResult(Result result) {
pull_result = result;
if (R_FAILED(result)) {
ueventSignal(GetDoneEvent());
}
}
Result Pull(void* data, s64 size, u64* bytes_read);
Result readFuncInternal();
Result writeFuncInternal();
@@ -124,6 +143,9 @@ private:
CondVar can_pull{};
CondVar can_pull_write{};
UEvent m_uevent_done{};
UEvent m_uevent_progres{};
RingBuf<2> write_buffers{};
std::vector<u8> pull_buffer{};
s64 pull_buffer_offset{};
@@ -138,6 +160,9 @@ private:
std::atomic<Result> read_result{};
std::atomic<Result> write_result{};
std::atomic<Result> pull_result{};
std::atomic_bool read_running{true};
std::atomic_bool write_running{true};
};
ThreadData::ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, WriteCallback _wfunc, u64 buffer_size)
@@ -153,10 +178,13 @@ ThreadData::ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, Wr
condvarInit(std::addressof(can_write));
condvarInit(std::addressof(can_pull));
condvarInit(std::addressof(can_pull_write));
ueventCreate(&m_uevent_done, false);
ueventCreate(&m_uevent_progres, true);
}
auto ThreadData::GetResults() volatile -> Result {
R_UNLESS(!pbox->ShouldExit(), Result_TransferCancelled);
R_TRY(pbox->ShouldExitResult());
R_TRY(read_result.load());
R_TRY(write_result.load());
R_TRY(pull_result.load());
@@ -178,6 +206,9 @@ Result ThreadData::SetWriteBuf(std::vector<u8>& buf, s64 size) {
mutexLock(std::addressof(mutex));
if (!write_buffers.ringbuf_free()) {
if (!write_running) {
R_SUCCEED();
}
R_TRY(condvarWait(std::addressof(can_read), std::addressof(mutex)));
}
@@ -190,6 +221,10 @@ Result ThreadData::SetWriteBuf(std::vector<u8>& buf, s64 size) {
Result ThreadData::GetWriteBuf(std::vector<u8>& buf_out, s64& off_out) {
mutexLock(std::addressof(mutex));
if (!write_buffers.ringbuf_size()) {
if (!read_running) {
buf_out.resize(0);
R_SUCCEED();
}
R_TRY(condvarWait(std::addressof(can_write), std::addressof(mutex)));
}
@@ -249,6 +284,8 @@ Result ThreadData::Pull(void* data, s64 size, u64* bytes_read) {
// read thread reads all data from the source
Result ThreadData::readFuncInternal() {
ON_SCOPE_EXIT( read_running = false; );
// the main buffer which data is read into.
std::vector<u8> buf;
buf.reserve(this->read_buffer_size);
@@ -260,8 +297,11 @@ Result ThreadData::readFuncInternal() {
u64 bytes_read{};
buf.resize(read_size);
R_TRY(this->Read(buf.data(), read_size, std::addressof(bytes_read)));
auto buf_size = bytes_read;
if (!bytes_read) {
break;
}
auto buf_size = bytes_read;
R_TRY(this->SetWriteBuf(buf, buf_size));
}
@@ -269,8 +309,10 @@ Result ThreadData::readFuncInternal() {
R_SUCCEED();
}
// write thread writes data to the nca placeholder.
// write thread writes data to wfunc.
Result ThreadData::writeFuncInternal() {
ON_SCOPE_EXIT( write_running = false; );
std::vector<u8> buf;
buf.reserve(this->read_buffer_size);
@@ -278,6 +320,10 @@ Result ThreadData::writeFuncInternal() {
s64 dummy_off;
R_TRY(this->GetWriteBuf(buf, dummy_off));
const auto size = buf.size();
if (!size) {
log_write("exiting write func early because no data was received\n");
break;
}
if (!this->wfunc) {
R_TRY(this->SetPullBuf(buf, buf.size()));
@@ -286,6 +332,7 @@ Result ThreadData::writeFuncInternal() {
}
this->write_offset += size;
ueventSignal(GetProgressEvent());
}
log_write("finished write thread success!\n");
@@ -339,6 +386,10 @@ Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, Wri
u64 bytes_read;
const auto rsize = std::min<s64>(buf.size(), size - offset);
R_TRY(rfunc(buf.data(), offset, rsize, &bytes_read));
if (!bytes_read) {
break;
}
R_TRY(wfunc(buf.data(), offset, bytes_read));
offset += bytes_read;
@@ -382,15 +433,27 @@ Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, Wri
R_TRY(start_threads());
log_write("[THREAD] started threads\n");
while (t_data.GetWriteOffset() != t_data.GetWriteSize() && R_SUCCEEDED(t_data.GetResults())) {
pbox->UpdateTransfer(t_data.GetWriteOffset(), t_data.GetWriteSize());
svcSleepThread(1e+6);
const auto waiter_progress = waiterForUEvent(t_data.GetProgressEvent());
const auto waiter_cancel = waiterForUEvent(pbox->GetCancelEvent());
const auto waiter_done = waiterForUEvent(t_data.GetDoneEvent());
for (;;) {
s32 idx;
if (R_FAILED(waitMulti(&idx, UINT64_MAX, waiter_progress, waiter_cancel, waiter_done))) {
break;
}
if (!idx) {
pbox->UpdateTransfer(t_data.GetWriteOffset(), t_data.GetWriteSize());
} else {
break;
}
}
}
// wait for all threads to close.
log_write("waiting for threads to close\n");
for (;;) {
while (t_data.IsAnyRunning()) {
t_data.WakeAllThreads();
pbox->Yield();

View File

@@ -43,6 +43,9 @@ ProgressBox::ProgressBox(int image, const std::string& action, const std::string
m_action = action;
m_image = image;
// create cancel event.
ueventCreate(&m_uevent, false);
m_cpuid = cpuid;
m_thread_data.pbox = this;
m_thread_data.callback = callback;
@@ -55,6 +58,7 @@ ProgressBox::ProgressBox(int image, const std::string& action, const std::string
}
ProgressBox::~ProgressBox() {
ueventSignal(GetCancelEvent());
m_stop_source.request_stop();
if (R_FAILED(threadWaitForExit(&m_thread))) {
@@ -250,6 +254,7 @@ auto ProgressBox::SetImageDataConst(std::span<const u8> data) -> ProgressBox& {
void ProgressBox::RequestExit() {
m_stop_source.request_stop();
ueventSignal(GetCancelEvent());
}
auto ProgressBox::ShouldExit() -> bool {

View File

@@ -141,6 +141,9 @@ struct ThreadData {
condvarInit(std::addressof(can_decompress_write));
condvarInit(std::addressof(can_write));
ueventCreate(&m_uevent_done, false);
ueventCreate(&m_uevent_progres, true);
sha256ContextCreate(&sha256);
// this will be updated with the actual size from nca header.
write_size = nca->size;
@@ -158,6 +161,45 @@ struct ThreadData {
auto GetResults() volatile -> Result;
void WakeAllThreads();
auto IsAnyRunning() volatile const -> bool {
return read_running || decompress_result || write_running;
}
auto GetWriteOffset() volatile const -> s64 {
return write_offset;
}
auto GetWriteSize() volatile const -> s64 {
return write_size;
}
auto GetDoneEvent() {
return &m_uevent_done;
}
auto GetProgressEvent() {
return &m_uevent_progres;
}
void SetReadResult(Result result) {
read_result = result;
if (R_FAILED(result)) {
ueventSignal(GetDoneEvent());
}
}
void SetDecompressResult(Result result) {
decompress_result = result;
if (R_FAILED(result)) {
ueventSignal(GetDoneEvent());
}
}
void SetWriteResult(Result result) {
write_result = result;
ueventSignal(GetDoneEvent());
}
Result Read(void* buf, s64 size, u64* bytes_read);
Result SetDecompressBuf(std::vector<u8>& buf, s64 off, s64 size) {
@@ -165,6 +207,9 @@ struct ThreadData {
mutexLock(std::addressof(read_mutex));
if (!read_buffers.ringbuf_free()) {
if (!write_running) {
R_SUCCEED();
}
R_TRY(condvarWait(std::addressof(can_read), std::addressof(read_mutex)));
}
@@ -177,6 +222,10 @@ struct ThreadData {
Result GetDecompressBuf(std::vector<u8>& buf_out, s64& off_out) {
mutexLock(std::addressof(read_mutex));
if (!read_buffers.ringbuf_size()) {
if (!read_running) {
buf_out.resize(0);
R_SUCCEED();
}
R_TRY(condvarWait(std::addressof(can_decompress), std::addressof(read_mutex)));
}
@@ -194,6 +243,9 @@ struct ThreadData {
mutexLock(std::addressof(write_mutex));
if (!write_buffers.ringbuf_free()) {
if (!decompress_running) {
R_SUCCEED();
}
R_TRY(condvarWait(std::addressof(can_decompress_write), std::addressof(write_mutex)));
}
@@ -206,6 +258,10 @@ struct ThreadData {
Result GetWriteBuf(std::vector<u8>& buf_out, s64& off_out) {
mutexLock(std::addressof(write_mutex));
if (!write_buffers.ringbuf_size()) {
if (!decompress_running) {
buf_out.resize(0);
R_SUCCEED();
}
R_TRY(condvarWait(std::addressof(can_write), std::addressof(write_mutex)));
}
@@ -229,6 +285,9 @@ struct ThreadData {
CondVar can_decompress_write{};
CondVar can_write{};
UEvent m_uevent_done{};
UEvent m_uevent_progres{};
RingBuf<4> read_buffers{};
RingBuf<4> write_buffers{};
@@ -250,6 +309,10 @@ struct ThreadData {
std::atomic<Result> read_result{};
std::atomic<Result> decompress_result{};
std::atomic<Result> write_result{};
std::atomic_bool read_running{true};
std::atomic_bool decompress_running{true};
std::atomic_bool write_running{true};
};
struct Yati {
@@ -371,6 +434,8 @@ Result HasRequiredTicket(const nca::Header& header, std::span<TikCollection> tik
// read thread reads all data from the source, it also handles
// parsing ncz headers, sections and reading ncz blocks
Result Yati::readFuncInternal(ThreadData* t) {
ON_SCOPE_EXIT( t->read_running = false; );
// the main buffer which data is read into.
std::vector<u8> buf;
// workaround ncz block reading ahead. if block isn't found, we usually
@@ -401,6 +466,9 @@ Result Yati::readFuncInternal(ThreadData* t) {
buf.resize(buf_offset + read_size);
R_TRY(t->Read(buf.data() + buf_offset, read_size, std::addressof(bytes_read)));
auto buf_size = buf_offset + bytes_read;
if (!bytes_read) {
break;
}
// read enough bytes for ncz, check magic
if (t->read_offset == NCZ_SECTION_OFFSET) {
@@ -454,6 +522,8 @@ Result Yati::readFuncInternal(ThreadData* t) {
// decompress thread handles decrypting / modifying the nca header, decompressing ncz
// and calculating the running sha256.
Result Yati::decompressFuncInternal(ThreadData* t) {
ON_SCOPE_EXIT( t->decompress_running = false; );
// only used for ncz files.
auto dctx = ZSTD_createDCtx();
ON_SCOPE_EXIT(ZSTD_freeDCtx(dctx));
@@ -534,6 +604,9 @@ Result Yati::decompressFuncInternal(ThreadData* t) {
while (t->decompress_offset < t->write_size && R_SUCCEEDED(t->GetResults())) {
s64 decompress_buf_off{};
R_TRY(t->GetDecompressBuf(buf, decompress_buf_off));
if (buf.empty()) {
break;
}
// do we have an nsz? if so, setup buffers.
if (!is_ncz && !t->ncz_sections.empty()) {
@@ -714,6 +787,8 @@ Result Yati::decompressFuncInternal(ThreadData* t) {
// write thread writes data to the nca placeholder.
Result Yati::writeFuncInternal(ThreadData* t) {
ON_SCOPE_EXIT( t->write_running = false; );
std::vector<u8> buf;
buf.reserve(t->max_buffer_size);
const auto is_file_based_emummc = App::IsFileBaseEmummc();
@@ -721,6 +796,9 @@ Result Yati::writeFuncInternal(ThreadData* t) {
while (t->write_offset < t->write_size && R_SUCCEEDED(t->GetResults())) {
s64 dummy_off;
R_TRY(t->GetWriteBuf(buf, dummy_off));
if (buf.empty()) {
break;
}
s64 off{};
while (off < buf.size() && t->write_offset < t->write_size && R_SUCCEEDED(t->GetResults())) {
@@ -729,6 +807,7 @@ Result Yati::writeFuncInternal(ThreadData* t) {
off += wsize;
t->write_offset += wsize;
ueventSignal(t->GetProgressEvent());
// todo: check how much time elapsed and sleep the diff
// rather than always sleeping a fixed amount.
@@ -745,20 +824,20 @@ Result Yati::writeFuncInternal(ThreadData* t) {
void readFunc(void* d) {
auto t = static_cast<ThreadData*>(d);
t->read_result = t->yati->readFuncInternal(t);
t->SetReadResult(t->yati->readFuncInternal(t));
log_write("read thread returned now\n");
}
void decompressFunc(void* d) {
log_write("hello decomp thread func\n");
auto t = static_cast<ThreadData*>(d);
t->decompress_result = t->yati->decompressFuncInternal(t);
t->SetDecompressResult(t->yati->decompressFuncInternal(t));
log_write("decompress thread returned now\n");
}
void writeFunc(void* d) {
auto t = static_cast<ThreadData*>(d);
t->write_result = t->yati->writeFuncInternal(t);
t->SetWriteResult(t->yati->writeFuncInternal(t));
log_write("write thread returned now\n");
}
@@ -900,14 +979,26 @@ Result Yati::InstallNcaInternal(std::span<TikCollection> tickets, NcaCollection&
R_TRY(threadStart(std::addressof(t_write)));
ON_SCOPE_EXIT(threadWaitForExit(std::addressof(t_write)));
while (t_data.write_offset != t_data.write_size && R_SUCCEEDED(t_data.GetResults())) {
pbox->UpdateTransfer(t_data.write_offset, t_data.write_size);
svcSleepThread(1e+6);
const auto waiter_progress = waiterForUEvent(t_data.GetProgressEvent());
const auto waiter_cancel = waiterForUEvent(pbox->GetCancelEvent());
const auto waiter_done = waiterForUEvent(t_data.GetDoneEvent());
for (;;) {
s32 idx;
if (R_FAILED(waitMulti(&idx, UINT64_MAX, waiter_progress, waiter_cancel, waiter_done))) {
break;
}
if (!idx) {
pbox->UpdateTransfer(t_data.GetWriteOffset(), t_data.GetWriteSize());
} else {
break;
}
}
// wait for all threads to close.
log_write("waiting for threads to close\n");
for (;;) {
while (t_data.IsAnyRunning()) {
t_data.WakeAllThreads();
pbox->Yield();