multi-thread zip and unzip code. option to download appstore zip to mem. hasher mem support.

This commit is contained in:
ITotalJustice
2025-05-30 12:34:29 +01:00
parent 17b341d83a
commit 390c1e870d
9 changed files with 468 additions and 510 deletions

View File

@@ -6,15 +6,20 @@
#include <vector>
#include <algorithm>
#include <cstring>
#include <minizip/unzip.h>
#include <minizip/zip.h>
namespace sphaira::thread {
namespace {
constexpr u64 READ_BUFFER_MAX = 1024*1024*4;
// used for file based emummc and zip/unzip.
constexpr u64 SMALL_BUFFER_SIZE = 1024 * 512;
// used for everything else.
constexpr u64 NORMAL_BUFFER_SIZE = 1024*1024*4;
struct ThreadBuffer {
ThreadBuffer() {
buf.reserve(READ_BUFFER_MAX);
buf.reserve(NORMAL_BUFFER_SIZE);
}
std::vector<u8> buf;
@@ -65,7 +70,7 @@ public:
};
struct ThreadData {
ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, WriteCallback _wfunc);
ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, WriteCallback _wfunc, u64 buffer_size);
auto GetResults() -> Result;
void WakeAllThreads();
@@ -104,9 +109,9 @@ private:
private:
// these need to be copied
ui::ProgressBox* pbox{};
ReadCallback rfunc{};
WriteCallback wfunc{};
ui::ProgressBox* const pbox;
const ReadCallback rfunc;
const WriteCallback wfunc;
// these need to be created
Mutex mutex{};
@@ -121,21 +126,24 @@ private:
std::vector<u8> pull_buffer{};
s64 pull_buffer_offset{};
u64 read_buffer_size{};
u64 max_buffer_size{};
const u64 read_buffer_size;
const s64 write_size;
// these are shared between threads
volatile s64 read_offset{};
volatile s64 write_offset{};
volatile s64 write_size{};
volatile Result read_result{};
volatile Result write_result{};
volatile Result pull_result{};
};
ThreadData::ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, WriteCallback _wfunc)
: pbox{_pbox}, rfunc{_rfunc}, wfunc{_wfunc} {
ThreadData::ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, WriteCallback _wfunc, u64 buffer_size)
: pbox{_pbox}
, rfunc{_rfunc}
, wfunc{_wfunc}
, read_buffer_size{buffer_size}
, write_size{size} {
mutexInit(std::addressof(mutex));
mutexInit(std::addressof(pull_mutex));
@@ -143,16 +151,6 @@ ThreadData::ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, Wr
condvarInit(std::addressof(can_write));
condvarInit(std::addressof(can_pull));
condvarInit(std::addressof(can_pull_write));
write_size = size;
if (App::IsFileBaseEmummc()) {
read_buffer_size = 1024 * 512;
max_buffer_size = 1024 * 512;
} else {
read_buffer_size = READ_BUFFER_MAX;
max_buffer_size = READ_BUFFER_MAX;
}
}
auto ThreadData::GetResults() -> Result {
@@ -251,7 +249,7 @@ Result ThreadData::Pull(void* data, s64 size, u64* bytes_read) {
Result ThreadData::readFuncInternal() {
// the main buffer which data is read into.
std::vector<u8> buf;
buf.reserve(this->max_buffer_size);
buf.reserve(this->read_buffer_size);
while (this->read_offset < this->write_size && R_SUCCEEDED(this->GetResults())) {
// read more data
@@ -265,14 +263,14 @@ Result ThreadData::readFuncInternal() {
R_TRY(this->SetWriteBuf(buf, buf_size));
}
log_write("read success\n");
log_write("finished read thread success!\n");
R_SUCCEED();
}
// write thread writes data to the nca placeholder.
Result ThreadData::writeFuncInternal() {
std::vector<u8> buf;
buf.reserve(this->max_buffer_size);
buf.reserve(this->read_buffer_size);
while (this->write_offset < this->write_size && R_SUCCEEDED(this->GetResults())) {
s64 dummy_off;
@@ -288,7 +286,7 @@ Result ThreadData::writeFuncInternal() {
this->write_offset += size;
}
log_write("finished write thread!\n");
log_write("finished write thread success!\n");
R_SUCCEED();
}
@@ -308,87 +306,264 @@ auto GetAlternateCore(int id) {
return id == 1 ? 2 : 1;
}
Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, WriteCallback wfunc, StartCallback2 sfunc) {
App::SetAutoSleepDisabled(true);
ON_SCOPE_EXIT(App::SetAutoSleepDisabled(false));
Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, WriteCallback wfunc, StartCallback2 sfunc, Mode mode, u64 buffer_size = NORMAL_BUFFER_SIZE) {
const auto is_file_based_emummc = App::IsFileBaseEmummc();
const auto WRITE_THREAD_CORE = sfunc ? pbox->GetCpuId() : GetAlternateCore(pbox->GetCpuId());
const auto READ_THREAD_CORE = GetAlternateCore(WRITE_THREAD_CORE);
if (is_file_based_emummc) {
buffer_size = SMALL_BUFFER_SIZE;
}
ThreadData t_data{pbox, size, rfunc, wfunc};
if (mode == Mode::SingleThreadedIfSmaller) {
if (size <= buffer_size) {
mode = Mode::SingleThreaded;
} else {
mode = Mode::MultiThreaded;
}
}
Thread t_read{};
R_TRY(threadCreate(&t_read, readFunc, std::addressof(t_data), nullptr, 1024*64, PRIO_PREEMPTIVE, READ_THREAD_CORE));
ON_SCOPE_EXIT(threadClose(&t_read));
// single threaded pull buffer is not supported.
R_UNLESS(mode != Mode::MultiThreaded || !sfunc, 0x1);
Thread t_write{};
R_TRY(threadCreate(&t_write, writeFunc, std::addressof(t_data), nullptr, 1024*64, PRIO_PREEMPTIVE, WRITE_THREAD_CORE));
ON_SCOPE_EXIT(threadClose(&t_write));
// todo: support single threaded pull buffer.
if (mode == Mode::SingleThreaded) {
std::vector<u8> buf(buffer_size);
s64 offset{};
while (offset < size) {
R_TRY(pbox->ShouldExitResult());
u64 bytes_read;
const auto rsize = std::min<s64>(buf.size(), size - offset);
R_TRY(rfunc(buf.data(), offset, rsize, &bytes_read));
R_TRY(wfunc(buf.data(), offset, bytes_read));
offset += bytes_read;
pbox->UpdateTransfer(offset, size);
}
const auto start_threads = [&]() -> Result {
log_write("starting threads\n");
R_TRY(threadStart(std::addressof(t_read)));
R_TRY(threadStart(std::addressof(t_write)));
R_SUCCEED();
};
ON_SCOPE_EXIT(threadWaitForExit(std::addressof(t_read)));
ON_SCOPE_EXIT(threadWaitForExit(std::addressof(t_write)));
if (sfunc) {
t_data.SetPullResult(sfunc(start_threads, [&](void* data, s64 size, u64* bytes_read) -> Result {
R_TRY(t_data.GetResults());
return t_data.Pull(data, size, bytes_read);
}));
} else {
R_TRY(start_threads());
while (t_data.GetWriteOffset() != t_data.GetWriteSize() && R_SUCCEEDED(t_data.GetResults())) {
pbox->UpdateTransfer(t_data.GetWriteOffset(), t_data.GetWriteSize());
svcSleepThread(1e+6);
}
}
else {
const auto WRITE_THREAD_CORE = sfunc ? pbox->GetCpuId() : GetAlternateCore(pbox->GetCpuId());
const auto READ_THREAD_CORE = GetAlternateCore(WRITE_THREAD_CORE);
// wait for all threads to close.
log_write("waiting for threads to close\n");
for (;;) {
t_data.WakeAllThreads();
pbox->Yield();
ThreadData t_data{pbox, size, rfunc, wfunc, buffer_size};
if (R_FAILED(waitSingleHandle(t_read.handle, 1000))) {
continue;
} else if (R_FAILED(waitSingleHandle(t_write.handle, 1000))) {
continue;
Thread t_read{};
R_TRY(threadCreate(&t_read, readFunc, std::addressof(t_data), nullptr, 1024*256, 0x3B, READ_THREAD_CORE));
ON_SCOPE_EXIT(threadClose(&t_read));
Thread t_write{};
R_TRY(threadCreate(&t_write, writeFunc, std::addressof(t_data), nullptr, 1024*256, 0x3B, WRITE_THREAD_CORE));
ON_SCOPE_EXIT(threadClose(&t_write));
const auto start_threads = [&]() -> Result {
log_write("starting threads\n");
R_TRY(threadStart(std::addressof(t_read)));
R_TRY(threadStart(std::addressof(t_write)));
R_SUCCEED();
};
ON_SCOPE_EXIT(threadWaitForExit(std::addressof(t_read)));
ON_SCOPE_EXIT(threadWaitForExit(std::addressof(t_write)));
if (sfunc) {
log_write("[THREAD] doing sfuncn\n");
t_data.SetPullResult(sfunc(start_threads, [&](void* data, s64 size, u64* bytes_read) -> Result {
R_TRY(t_data.GetResults());
return t_data.Pull(data, size, bytes_read);
}));
}
break;
}
log_write("threads closed\n");
else {
log_write("[THREAD] doing normal\n");
R_TRY(start_threads());
log_write("[THREAD] started threads\n");
// if any of the threads failed, wake up all threads so they can exit.
if (R_FAILED(t_data.GetResults())) {
log_write("some reads failed, waking threads\n");
log_write("returning due to fail\n");
while (t_data.GetWriteOffset() != t_data.GetWriteSize() && R_SUCCEEDED(t_data.GetResults())) {
pbox->UpdateTransfer(t_data.GetWriteOffset(), t_data.GetWriteSize());
svcSleepThread(1e+6);
}
}
// wait for all threads to close.
log_write("waiting for threads to close\n");
for (;;) {
t_data.WakeAllThreads();
pbox->Yield();
if (R_FAILED(waitSingleHandle(t_read.handle, 1000))) {
continue;
} else if (R_FAILED(waitSingleHandle(t_write.handle, 1000))) {
continue;
}
break;
}
log_write("threads closed\n");
// if any of the threads failed, wake up all threads so they can exit.
if (R_FAILED(t_data.GetResults())) {
log_write("some reads failed, waking threads\n");
log_write("returning due to fail\n");
return t_data.GetResults();
}
log_write("returning from thread func\n");
return t_data.GetResults();
}
return t_data.GetResults();
}
} // namespace
Result Transfer(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, WriteCallback wfunc) {
return TransferInternal(pbox, size, rfunc, wfunc, nullptr);
Result Transfer(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, WriteCallback wfunc, Mode mode) {
return TransferInternal(pbox, size, rfunc, wfunc, nullptr, Mode::MultiThreaded);
}
Result TransferPull(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, StartCallback sfunc) {
Result TransferPull(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, StartCallback sfunc, Mode mode) {
return TransferInternal(pbox, size, rfunc, nullptr, [sfunc](StartThreadCallback start, PullCallback pull) -> Result {
R_TRY(start());
return sfunc(pull);
});
}, Mode::MultiThreaded);
}
Result TransferPull(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, StartCallback2 sfunc) {
return TransferInternal(pbox, size, rfunc, nullptr, sfunc);
Result TransferPull(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, StartCallback2 sfunc, Mode mode) {
return TransferInternal(pbox, size, rfunc, nullptr, sfunc, Mode::MultiThreaded);
}
Result TransferUnzip(ui::ProgressBox* pbox, void* zfile, fs::Fs* fs, const fs::FsPath& path, s64 size, u32 crc32) {
Result rc;
if (R_FAILED(rc = fs->CreateDirectoryRecursivelyWithPath(path)) && rc != FsError_PathAlreadyExists) {
log_write("failed to create folder: %s 0x%04X\n", path.s, rc);
R_THROW(rc);
}
if (R_FAILED(rc = fs->CreateFile(path, size, 0)) && rc != FsError_PathAlreadyExists) {
log_write("failed to create file: %s 0x%04X\n", path.s, rc);
R_THROW(rc);
}
fs::File f;
R_TRY(fs->OpenFile(path, FsOpenMode_Write, &f));
// only update the size if this is an existing file.
if (rc == FsError_PathAlreadyExists) {
R_TRY(f.SetSize(size));
}
// NOTES: do not use temp file with rename / delete after as it massively slows
// down small file transfers (RA 21s -> 50s).
u32 crc32_out{};
R_TRY(thread::TransferInternal(pbox, size,
[&](void* data, s64 off, s64 size, u64* bytes_read) -> Result {
const auto result = unzReadCurrentFile(zfile, data, size);
if (result <= 0) {
// log_write("failed to read zip file: %s\n", inzip.c_str());
R_THROW(0x1);
}
if (crc32) {
crc32_out = crc32CalculateWithSeed(crc32_out, data, result);
}
*bytes_read = result;
R_SUCCEED();
},
[&](const void* data, s64 off, s64 size) -> Result {
return f.Write(off, data, size, FsWriteOption_None);
},
nullptr, Mode::SingleThreadedIfSmaller, SMALL_BUFFER_SIZE
));
// validate crc32 (if set in the info).
R_UNLESS(!crc32 || crc32 == crc32_out, 0x1);
R_SUCCEED();
}
Result TransferZip(ui::ProgressBox* pbox, void* zfile, fs::Fs* fs, const fs::FsPath& path) {
fs::File f;
R_TRY(fs->OpenFile(path, FsOpenMode_Read, &f));
s64 file_size;
R_TRY(f.GetSize(&file_size));
return thread::TransferInternal(pbox, file_size,
[&](void* data, s64 off, s64 size, u64* bytes_read) -> Result {
return f.Read(off, data, size, FsReadOption_None, bytes_read);
},
[&](const void* data, s64 off, s64 size) -> Result {
if (ZIP_OK != zipWriteInFileInZip(zfile, data, size)) {
log_write("failed to write zip file: %s\n", path.s);
R_THROW(0x1);
}
R_SUCCEED();
},
nullptr, Mode::SingleThreadedIfSmaller, SMALL_BUFFER_SIZE
);
}
Result TransferUnzipAll(ui::ProgressBox* pbox, void* zfile, fs::Fs* fs, const fs::FsPath& base_path, UnzipAllFilter filter) {
unz_global_info64 ginfo;
if (UNZ_OK != unzGetGlobalInfo64(zfile, &ginfo)) {
R_THROW(0x1);
}
if (UNZ_OK != unzGoToFirstFile(zfile)) {
R_THROW(0x1);
}
for (s64 i = 0; i < ginfo.number_entry; i++) {
R_TRY(pbox->ShouldExitResult());
if (i > 0) {
if (UNZ_OK != unzGoToNextFile(zfile)) {
log_write("failed to unzGoToNextFile\n");
R_THROW(0x1);
}
}
if (UNZ_OK != unzOpenCurrentFile(zfile)) {
log_write("failed to open current file\n");
R_THROW(0x1);
}
ON_SCOPE_EXIT(unzCloseCurrentFile(zfile));
unz_file_info64 info;
fs::FsPath name;
if (UNZ_OK != unzGetCurrentFileInfo64(zfile, &info, name, sizeof(name), 0, 0, 0, 0)) {
log_write("failed to get current info\n");
R_THROW(0x1);
}
// check if we should skip this file.
// don't make const as to allow the function to modify the path
// this function is used for the updater to change sphaira.nro to exe path.
auto path = fs::AppendPath(base_path, name);
if (filter && !filter(name, path)) {
continue;
}
pbox->NewTransfer(name);
if (path[std::strlen(path) -1] == '/') {
Result rc;
if (R_FAILED(rc = fs->CreateDirectoryRecursively(path)) && rc != FsError_PathAlreadyExists) {
log_write("failed to create folder: %s 0x%04X\n", path.s, rc);
R_THROW(rc);
}
} else {
R_TRY(TransferUnzip(pbox, zfile, fs, path, info.uncompressed_size, info.crc));
}
}
R_SUCCEED();
}
Result TransferUnzipAll(ui::ProgressBox* pbox, const fs::FsPath& zip_out, fs::Fs* fs, const fs::FsPath& base_path, UnzipAllFilter filter) {
auto zfile = unzOpen64(zip_out);
R_UNLESS(zfile, 0x1);
ON_SCOPE_EXIT(unzClose(zfile));
return TransferUnzipAll(pbox, zfile, fs, base_path, filter);
}
} // namespace::thread