add support for streamed usb upload, multi thread usb uploads / dumps.

Some notes i made when adding stream support:
The tinfoil API makes it hard / impossible to multi thread the file upload because each data transfer passes the the file name and offset, meaning that it can and will change files and offset in the middle of a transfer
So that prevents the read thread from running freely in the background and the pull thread pulling data when requested.
The extension adds a flag to the usb header which if set, enables stream mode (same as ftp installs). This removes random access, but allows for multi threading as the data will be requested in order.
This commit is contained in:
ITotalJustice
2025-05-21 13:19:46 +01:00
parent a67171e2b8
commit fe2a1a3a80
10 changed files with 244 additions and 68 deletions

View File

@@ -65,7 +65,7 @@ public:
};
struct ThreadData {
ThreadData(ui::ProgressBox* _pbox, s64 size, ReadFunctionCallback _rfunc, WriteFunctionCallback _wfunc);
ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, WriteCallback _wfunc);
auto GetResults() -> Result;
void WakeAllThreads();
@@ -105,8 +105,8 @@ private:
private:
// these need to be copied
ui::ProgressBox* pbox{};
ReadFunctionCallback rfunc{};
WriteFunctionCallback wfunc{};
ReadCallback rfunc{};
WriteCallback wfunc{};
// these need to be created
Mutex mutex{};
@@ -134,7 +134,7 @@ private:
volatile Result pull_result{};
};
ThreadData::ThreadData(ui::ProgressBox* _pbox, s64 size, ReadFunctionCallback _rfunc, WriteFunctionCallback _wfunc)
ThreadData::ThreadData(ui::ProgressBox* _pbox, s64 size, ReadCallback _rfunc, WriteCallback _wfunc)
: pbox{_pbox}, rfunc{_rfunc}, wfunc{_wfunc} {
mutexInit(std::addressof(mutex));
mutexInit(std::addressof(pull_mutex));
@@ -302,7 +302,7 @@ auto GetAlternateCore(int id) {
return id == 1 ? 2 : 1;
}
Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadFunctionCallback rfunc, WriteFunctionCallback wfunc, StartFunctionCallback sfunc) {
Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, WriteCallback wfunc, StartCallback2 sfunc) {
App::SetAutoSleepDisabled(true);
ON_SCOPE_EXIT(App::SetAutoSleepDisabled(false));
@@ -319,19 +319,24 @@ Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadFunctionCallback rf
R_TRY(threadCreate(&t_write, writeFunc, std::addressof(t_data), nullptr, 1024*64, 0x20, WRITE_THREAD_CORE));
ON_SCOPE_EXIT(threadClose(&t_write));
log_write("starting threads\n");
R_TRY(threadStart(std::addressof(t_read)));
ON_SCOPE_EXIT(threadWaitForExit(std::addressof(t_read)));
const auto start_threads = [&]() -> Result {
log_write("starting threads\n");
R_TRY(threadStart(std::addressof(t_read)));
R_TRY(threadStart(std::addressof(t_write)));
R_SUCCEED();
};
R_TRY(threadStart(std::addressof(t_write)));
ON_SCOPE_EXIT(threadWaitForExit(std::addressof(t_read)));
ON_SCOPE_EXIT(threadWaitForExit(std::addressof(t_write)));
if (sfunc) {
t_data.SetPullResult(sfunc([&](void* data, s64 size, u64* bytes_read) -> Result {
t_data.SetPullResult(sfunc(start_threads, [&](void* data, s64 size, u64* bytes_read) -> Result {
R_TRY(t_data.GetResults());
return t_data.Pull(data, size, bytes_read);
}));
} else {
R_TRY(start_threads());
while (t_data.GetWriteOffset() != t_data.GetWriteSize() && R_SUCCEEDED(t_data.GetResults())) {
pbox->UpdateTransfer(t_data.GetWriteOffset(), t_data.GetWriteSize());
svcSleepThread(1e+6);
@@ -365,11 +370,18 @@ Result TransferInternal(ui::ProgressBox* pbox, s64 size, ReadFunctionCallback rf
} // namespace
Result Transfer(ui::ProgressBox* pbox, s64 size, ReadFunctionCallback rfunc, WriteFunctionCallback wfunc) {
Result Transfer(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, WriteCallback wfunc) {
return TransferInternal(pbox, size, rfunc, wfunc, nullptr);
}
Result TransferPull(ui::ProgressBox* pbox, s64 size, ReadFunctionCallback rfunc, StartFunctionCallback sfunc) {
Result TransferPull(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, StartCallback sfunc) {
return TransferInternal(pbox, size, rfunc, nullptr, [sfunc](StartThreadCallback start, PullCallback pull) -> Result {
R_TRY(start());
return sfunc(pull);
});
}
Result TransferPull(ui::ProgressBox* pbox, s64 size, ReadCallback rfunc, StartCallback2 sfunc) {
return TransferInternal(pbox, size, rfunc, nullptr, sfunc);
}

View File

@@ -1215,7 +1215,7 @@ void Menu::UploadFiles() {
[&](void* data, s64 off, s64 size, u64* bytes_read) -> Result {
return fsFileRead(&file, off, data, size, FsReadOption_None, bytes_read);
},
[&](thread::PullFunctionCallback pull) -> Result {
[&](thread::PullCallback pull) -> Result {
s64 offset{};
const auto result = curl::Api().FromMemory(
CURL_LOCATION_TO_API(loc),

View File

@@ -20,8 +20,10 @@
#include "yati/nx/es.hpp"
#include "yati/container/base.hpp"
#include "yati/container/nsp.hpp"
#include "yati/source/stream.hpp"
#include "usb/usb_uploader.hpp"
#include "usb/tinfoil.hpp"
#include <utility>
#include <cstring>
@@ -250,16 +252,31 @@ private:
std::span<NspEntry> m_entries{};
};
struct UsbTest final : usb::upload::Usb {
struct UsbTest final : usb::upload::Usb, yati::source::Stream {
UsbTest(ProgressBox* pbox, std::span<NspEntry> entries) : Usb{UINT64_MAX} {
m_source = std::make_unique<NspSource>(entries);
m_pbox = pbox;
}
Result ReadChunk(void* buf, s64 size, u64* bytes_read) override {
R_TRY(m_pull(buf, size, bytes_read));
m_pull_offset += *bytes_read;
R_SUCCEED();
}
Result Read(const std::string& path, void* buf, s64 off, s64 size, u64* bytes_read) override {
if (m_pull) {
return Stream::Read(buf, off, size, bytes_read);
} else {
return ReadInternal(path, buf, off, size, bytes_read);
}
}
Result ReadInternal(const std::string& path, void* buf, s64 off, s64 size, u64* bytes_read) {
if (m_path != path) {
m_path = path;
m_progress = 0;
m_pull_offset = 0;
m_size = m_source->GetSize(path);
m_pbox->SetTitle(m_source->GetName(path));
m_pbox->NewTransfer(m_path);
@@ -274,13 +291,27 @@ struct UsbTest final : usb::upload::Usb {
R_SUCCEED();
}
void SetPullCallback(thread::PullCallback pull) {
m_pull = pull;
}
auto* GetSource() {
return m_source.get();
}
auto GetPullOffset() const {
return m_pull_offset;
}
private:
std::unique_ptr<NspSource> m_source{};
ProgressBox* m_pbox{};
std::string m_path{};
thread::PullCallback m_pull{};
s64 m_offset{};
s64 m_size{};
s64 m_progress{};
s64 m_pull_offset{};
};
Result DumpNspToFile(ProgressBox* pbox, std::span<NspEntry> entries) {
@@ -328,6 +359,44 @@ Result DumpNspToFile(ProgressBox* pbox, std::span<NspEntry> entries) {
R_SUCCEED();
}
Result DumpNspToUsbS2SStream(ProgressBox* pbox, UsbTest* usb, std::span<const std::string> file_list, std::span<NspEntry> entries) {
auto source = usb->GetSource();
for (auto& path : file_list) {
const auto file_size = source->GetSize(path);
R_TRY(thread::TransferPull(pbox, file_size,
[&](void* data, s64 off, s64 size, u64* bytes_read) -> Result {
return usb->ReadInternal(path, data, off, size, bytes_read);
},
[&](thread::StartThreadCallback start, thread::PullCallback pull) -> Result {
usb->SetPullCallback(pull);
R_TRY(start());
while (!pbox->ShouldExit()) {
R_TRY(usb->PollCommands());
if (usb->GetPullOffset() >= file_size) {
R_SUCCEED();
}
}
R_THROW(0xFFFF);
}
));
}
R_SUCCEED();
}
Result DumpNspToUsbS2SRandom(ProgressBox* pbox, UsbTest* usb, std::span<const std::string> file_list, std::span<NspEntry> entries) {
while (!pbox->ShouldExit()) {
R_TRY(usb->PollCommands());
}
R_THROW(0xFFFF);
}
Result DumpNspToUsbS2S(ProgressBox* pbox, std::span<NspEntry> entries) {
std::vector<std::string> file_list;
for (auto& e : entries) {
@@ -337,30 +406,38 @@ Result DumpNspToUsbS2S(ProgressBox* pbox, std::span<NspEntry> entries) {
auto usb = std::make_unique<UsbTest>(pbox, entries);
constexpr u64 timeout = 1e+9;
// todo: display progress bar during usb transfer.
while (!pbox->ShouldExit()) {
if (R_SUCCEEDED(usb->IsUsbConnected(timeout))) {
pbox->NewTransfer("USB connected, sending file list");
if (R_SUCCEEDED(usb->WaitForConnection(timeout, file_list))) {
const u8 flags = usb::tinfoil::USBFlag_STREAM;
if (R_SUCCEEDED(usb->WaitForConnection(timeout, flags, file_list))) {
pbox->NewTransfer("Sent file list, waiting for command...");
while (!pbox->ShouldExit()) {
const auto rc = usb->PollCommands();
if (rc == usb->Result_Exit) {
log_write("got exit command\n");
R_SUCCEED();
}
R_TRY(rc);
Result rc;
if (flags & usb::tinfoil::USBFlag_STREAM) {
rc = DumpNspToUsbS2SStream(pbox, usb.get(), file_list, entries);
} else {
rc = DumpNspToUsbS2SRandom(pbox, usb.get(), file_list, entries);
}
}
// wait for exit command.
if (R_SUCCEEDED(rc)) {
rc = usb->PollCommands();
}
if (rc == usb->Result_Exit) {
log_write("got exit command\n");
R_SUCCEED();
}
return rc;
}
} else {
pbox->NewTransfer("waiting for usb connection...");
}
}
R_SUCCEED();
R_THROW(0xFFFF);
}
Result DumpNspToDevNull(ProgressBox* pbox, std::span<NspEntry> entries) {
@@ -400,7 +477,7 @@ Result DumpNspToNetwork(ProgressBox* pbox, const location::Entry& loc, std::span
[&](void* data, s64 off, s64 size, u64* bytes_read) -> Result {
return source->Read(path, data, off, size, bytes_read);
},
[&](thread::PullFunctionCallback pull) -> Result {
[&](thread::PullCallback pull) -> Result {
s64 offset{};
const auto result = curl::Api().FromMemory(
CURL_LOCATION_TO_API(loc),

View File

@@ -5,7 +5,9 @@
#include "yati/yati.hpp"
#include "yati/nx/nca.hpp"
#include "yati/source/stream.hpp"
#include "usb/usb_uploader.hpp"
#include "usb/tinfoil.hpp"
#include "app.hpp"
#include "defines.hpp"
@@ -234,15 +236,30 @@ private:
}
};
struct UsbTest final : usb::upload::Usb {
struct UsbTest final : usb::upload::Usb, yati::source::Stream {
UsbTest(ProgressBox* pbox, XciEntry& entry) : Usb{UINT64_MAX}, m_entry{entry} {
m_pbox = pbox;
}
Result ReadChunk(void* buf, s64 size, u64* bytes_read) override {
R_TRY(m_pull(buf, size, bytes_read));
m_pull_offset += *bytes_read;
R_SUCCEED();
}
Result Read(const std::string& path, void* buf, s64 off, s64 size, u64* bytes_read) override {
if (m_pull) {
return Stream::Read(buf, off, size, bytes_read);
} else {
return ReadInternal(path, buf, off, size, bytes_read);
}
}
Result ReadInternal(const std::string& path, void* buf, s64 off, s64 size, u64* bytes_read) {
if (m_path != path) {
m_path = path;
m_progress = 0;
m_pull_offset = 0;
m_size = m_entry.GetSize(path);
m_pbox->SetTitle(m_entry.GetName(path));
m_pbox->NewTransfer(m_path);
@@ -257,13 +274,23 @@ struct UsbTest final : usb::upload::Usb {
R_SUCCEED();
}
void SetPullCallback(thread::PullCallback pull) {
m_pull = pull;
}
auto GetPullOffset() const {
return m_pull_offset;
}
private:
XciEntry& m_entry;
ProgressBox* m_pbox{};
std::string m_path{};
thread::PullCallback m_pull{};
s64 m_offset{};
s64 m_size{};
s64 m_progress{};
s64 m_pull_offset{};
};
struct HashStr {
@@ -321,6 +348,42 @@ Result DumpNspToFile(ProgressBox* pbox, std::span<const fs::FsPath> paths, XciEn
R_SUCCEED();
}
Result DumpNspToUsbS2SStream(ProgressBox* pbox, UsbTest* usb, std::span<const std::string> file_list, XciEntry& e) {
for (auto& path : file_list) {
const auto file_size = e.GetSize(path);
R_TRY(thread::TransferPull(pbox, file_size,
[&](void* data, s64 off, s64 size, u64* bytes_read) -> Result {
return usb->ReadInternal(path, data, off, size, bytes_read);
},
[&](thread::StartThreadCallback start, thread::PullCallback pull) -> Result {
usb->SetPullCallback(pull);
R_TRY(start());
while (!pbox->ShouldExit()) {
R_TRY(usb->PollCommands());
if (usb->GetPullOffset() >= file_size) {
R_SUCCEED();
}
}
R_THROW(0xFFFF);
}
));
}
R_SUCCEED();
}
Result DumpNspToUsbS2SRandom(ProgressBox* pbox, UsbTest* usb, std::span<const std::string> file_list, XciEntry& e) {
while (!pbox->ShouldExit()) {
R_TRY(usb->PollCommands());
}
R_THROW(0xFFFF);
}
Result DumpNspToUsbS2S(ProgressBox* pbox, std::span<const fs::FsPath> paths, XciEntry& e) {
std::vector<std::string> file_list;
for (auto& path : paths) {
@@ -330,30 +393,38 @@ Result DumpNspToUsbS2S(ProgressBox* pbox, std::span<const fs::FsPath> paths, Xci
auto usb = std::make_unique<UsbTest>(pbox, e);
constexpr u64 timeout = 1e+9;
// todo: display progress bar during usb transfer.
while (!pbox->ShouldExit()) {
if (R_SUCCEEDED(usb->IsUsbConnected(timeout))) {
pbox->NewTransfer("USB connected, sending file list");
if (R_SUCCEEDED(usb->WaitForConnection(timeout, file_list))) {
const u8 flags = usb::tinfoil::USBFlag_STREAM;
if (R_SUCCEEDED(usb->WaitForConnection(timeout, flags, file_list))) {
pbox->NewTransfer("Sent file list, waiting for command...");
while (!pbox->ShouldExit()) {
const auto rc = usb->PollCommands();
if (rc == usb->Result_Exit) {
log_write("got exit command\n");
R_SUCCEED();
}
R_TRY(rc);
Result rc;
if (flags & usb::tinfoil::USBFlag_STREAM) {
rc = DumpNspToUsbS2SStream(pbox, usb.get(), file_list, e);
} else {
rc = DumpNspToUsbS2SRandom(pbox, usb.get(), file_list, e);
}
}
// wait for exit command.
if (R_SUCCEEDED(rc)) {
rc = usb->PollCommands();
}
if (rc == usb->Result_Exit) {
log_write("got exit command\n");
R_SUCCEED();
}
return rc;
}
} else {
pbox->NewTransfer("waiting for usb connection...");
}
}
R_SUCCEED();
R_THROW(0xFFFF);
}
Result DumpNspToDevNull(ProgressBox* pbox, std::span<const fs::FsPath> paths, XciEntry& e) {
@@ -389,7 +460,7 @@ Result DumpNspToNetwork(ProgressBox* pbox, const location::Entry& loc, std::span
[&](void* data, s64 off, s64 size, u64* bytes_read) -> Result {
return e.Read(path, data, off, size, bytes_read);
},
[&](thread::PullFunctionCallback pull) -> Result {
[&](thread::PullCallback pull) -> Result {
s64 offset{};
const auto result = curl::Api().FromMemory(
CURL_LOCATION_TO_API(loc),

View File

@@ -45,7 +45,7 @@ Usb::Usb(u64 transfer_timeout) {
Usb::~Usb() {
}
Result Usb::WaitForConnection(u64 timeout, std::span<const std::string> names) {
Result Usb::WaitForConnection(u64 timeout, u8 flags, std::span<const std::string> names) {
R_TRY(m_usb->IsUsbConnected(timeout));
std::string names_list;
@@ -56,6 +56,7 @@ Result Usb::WaitForConnection(u64 timeout, std::span<const std::string> names) {
tinfoil::TUSHeader header{};
header.magic = tinfoil::Magic_List0;
header.nspListSize = names_list.length();
header.flags = flags;
R_TRY(m_usb->TransferAll(false, &header, sizeof(header), timeout));
R_TRY(m_usb->TransferAll(false, names_list.data(), names_list.length(), timeout));

View File

@@ -1,20 +1,3 @@
/*
* Copyright (c) Atmosphère-NX
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU General Public License,
* version 2, as published by the Free Software Foundation.
*
* This program is distributed in the hope it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// The USB transfer code was taken from Haze (part of Atmosphere).
// The USB protocol was taken from Tinfoil, by Adubbz.
#include "yati/source/usb.hpp"
@@ -42,7 +25,8 @@ Result Usb::WaitForConnection(u64 timeout, std::vector<std::string>& out_names)
R_TRY(m_usb->TransferAll(true, &header, sizeof(header), timeout));
R_UNLESS(header.magic == tinfoil::Magic_List0, Result_BadMagic);
R_UNLESS(header.nspListSize > 0, Result_BadCount);
log_write("USB got header\n");
m_flags = header.flags;
log_write("[USB] got header, flags: 0x%X\n", m_flags);
std::vector<char> names(header.nspListSize);
R_TRY(m_usb->TransferAll(true, names.data(), names.size(), timeout));
@@ -99,6 +83,10 @@ Result Usb::Finished(u64 timeout) {
return SendCmdHeader(tinfoil::USBCmdId::EXIT, 0, timeout);
}
bool Usb::IsStream() const {
return (m_flags & tinfoil::USBFlag_STREAM);
}
Result Usb::Read(void* buf, s64 off, s64 size, u64* bytes_read) {
R_TRY(GetOpenResult());
R_TRY(SendFileRangeCmd(off, size, m_usb->GetTransferTimeout()));