huge changes to everything (see below).
Changelog: - re-enable use in release build. - remove ftpsrv and untitled from builtin ghdl options, as both packages are available in the appstore. - add image viewer (png, jpg, bmp) - add music player (bfstm, bfwav, mp3, wav, ogg) - add idv3 tag parsing support for mp3. - add "decyption" of GTA Vice City mp3. - add usbdvd support for music playback and file browsing. - add nsz export support (solid, block, ldm). - add xcz export support (same as above). - add nro fs proper mount support (romfs, nacp, icon). - add program nca fs support. - add bfsar fs support. - re-write the usb protocol, still wip. replaces tinfoil protocol. - all threads are now create with pre-emptive support with the proper affinity mask set. - fix oob crash in libpulsar when a bfwav was opened that had more than 2 channels. - bump yyjson version. - bump usbhsfs version. - disable nvjpg. - add support for theme music of any supported playback type (bfstm, bfwav, mp3, wav, ogg). - add support for setting background music. - add async exit to blocking threads (download, nxlink, ftpsrv) to reduce exit time. - add support for dumping to pc via usb. - add null, deflate, zstd hash options, mainly used for benchmarking. - add sidebar slider (currently unused). - file_viwer can now be used with any filesystem. - filebrowser will only ever stat file once. previously it would keep stat'ing until it succeeded. - disabled themezer due to the api breaking and i am not willing to keep maintaining it. - disable zlt handling in usbds as it's not needed for my api's because the size is always known. - remove usbds enums and GetSpeed() as i pr'd it to libnx. - added support for mounting nca's from any source, including files, memory, nsps, xcis etc. - split the lru cache into it's own header as it's now used in multiple places (nsz, all mounted options). - add support for fetching and decrypting es personalised tickets. - fix es common ticket converting where i forgot to also convert the cert chain as well. - remove the download default music option. - improve performance of libpulsar when opening a bfsar by remove the large setvbuf option. instead, use the default 1k buffer and handle large buffers manually in sphaira by using a lru cache (todo: just write my own bfsar parser). - during app init and exit, load times have been halved as i now load/exit async. timestamps have also been added to measure how long everything takes. - download now async loads / exits the etag json file to improve init times. - add custom zip io to dumper to support writing a zip to any dest (such as usb). - dumper now returns a proper error if the transfer was cancelled by the user. - fatfs mount now sets the timestamp for files. - fatfs mount handles folders with the archive bit by reporting them as a file. - ftpsrv config is async loaded to speed up load times. - nxlink now tries attempt to connect/accept by handling blocking rather than just bailing out. - added support for minini floats. - thread_file_transfer now spawns 3 threads rather than 2, to have the middle thread be a optional processor (mainly used for compressing/decompressing). - added spinner to progress box, taken from nvg demo. - progress box disables sleep mode on init. - add gamecard detection to game menu to detect a refresh. - handle xci that have the key area prepended. - change gamecard mount fs to use the xci mount code instead of native fs, that way we can see all the partitions rather than just secure. - reformat the ghdl entries to show the timestamp first. - support for exporting saves to pc via usb. - zip fs now uses lru cache.
This commit is contained in:
6
tools/requirements.txt
Normal file
6
tools/requirements.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
# any version is fine.
|
||||
pyusb
|
||||
# 4.0 needed for part_only to handle split archives.
|
||||
rarfile >= 4.0
|
||||
# used to verify packets are valid.
|
||||
crc32c
|
||||
89
tools/usb_common.py
Normal file
89
tools/usb_common.py
Normal file
@@ -0,0 +1,89 @@
|
||||
import struct
|
||||
import usb.core
|
||||
import usb.util
|
||||
import time
|
||||
|
||||
# magic number (SPH0) for the script and switch.
|
||||
MAGIC = 0x53504830
|
||||
|
||||
# commands
|
||||
CMD_QUIT = 0
|
||||
CMD_OPEN = 1
|
||||
CMD_EXPORT = 1
|
||||
|
||||
# results
|
||||
RESULT_OK = 0
|
||||
RESULT_ERROR = 1
|
||||
|
||||
# flags
|
||||
FLAG_NONE = 0
|
||||
FLAG_STREAM = 1 << 0
|
||||
|
||||
# disabled, see usbds.cpp usbDsEndpoint_SetZlt
|
||||
ENABLE_ZLT = 0
|
||||
|
||||
class Usb:
|
||||
def __init__(self):
|
||||
self.__out_ep = None
|
||||
self.__in_ep = None
|
||||
self.__packet_size = 0
|
||||
|
||||
def wait_for_connect(self) -> None:
|
||||
print("waiting for switch")
|
||||
|
||||
dev = None
|
||||
while (dev is None):
|
||||
dev = usb.core.find(idVendor=0x057E, idProduct=0x3000)
|
||||
if (dev is None):
|
||||
time.sleep(0.5)
|
||||
|
||||
print("found the switch!\n")
|
||||
cfg = None
|
||||
|
||||
try:
|
||||
cfg = dev.get_active_configuration()
|
||||
print("found active config")
|
||||
except usb.core.USBError:
|
||||
print("no currently active config")
|
||||
cfg = None
|
||||
|
||||
if cfg is None:
|
||||
dev.reset()
|
||||
dev.set_configuration()
|
||||
cfg = dev.get_active_configuration()
|
||||
|
||||
is_out_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT
|
||||
is_in_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN
|
||||
self.__out_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_out_ep)
|
||||
self.__in_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_in_ep)
|
||||
assert self.__out_ep is not None
|
||||
assert self.__in_ep is not None
|
||||
|
||||
print("iManufacturer: {} iProduct: {} iSerialNumber: {}".format(dev.manufacturer, dev.product, dev.serial_number))
|
||||
print("bcdUSB: {} bMaxPacketSize0: {}".format(hex(dev.bcdUSB), dev.bMaxPacketSize0))
|
||||
self.__packet_size = 1 << dev.bMaxPacketSize0
|
||||
|
||||
def read(self, size: int, timeout: int = 0) -> bytes:
|
||||
if (ENABLE_ZLT and size and (size % self.__packet_size) == 0):
|
||||
size += 1
|
||||
return self.__in_ep.read(size, timeout)
|
||||
|
||||
def write(self, buf: bytes, timeout: int = 0) -> int:
|
||||
return self.__out_ep.write(data=buf, timeout=timeout)
|
||||
|
||||
def get_send_header(self) -> tuple[int, int, int]:
|
||||
header = self.read(16)
|
||||
[magic, arg2, arg3, arg4] = struct.unpack('<IIII', header)
|
||||
|
||||
if magic != MAGIC:
|
||||
raise Exception("Unexpected magic {}".format(magic))
|
||||
|
||||
return arg2, arg3, arg4
|
||||
|
||||
def get_send_data_header(self) -> tuple[int, int, int]:
|
||||
header = self.read(16)
|
||||
return struct.unpack('<QII', header)
|
||||
|
||||
def send_result(self, result: int, arg3: int = 0, arg4: int = 0) -> None:
|
||||
send_data = struct.pack('<IIII', MAGIC, result, arg3, arg4)
|
||||
self.write(send_data)
|
||||
92
tools/usb_export.py
Normal file
92
tools/usb_export.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import crc32c
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
from usb_common import *
|
||||
|
||||
def get_file_name(usb: Usb, name_length: int) -> str:
|
||||
return bytes(usb.read(name_length)).decode('utf-8')
|
||||
|
||||
def create_file_folder(root: Path, file_path: Path) -> Path:
|
||||
# todo: check if it already exists.
|
||||
full_path = Path(root + "/" + file_path)
|
||||
full_path.parent.mkdir(exist_ok=True, parents=True)
|
||||
print("created folder")
|
||||
|
||||
return full_path
|
||||
|
||||
def wait_for_input(usb: Usb, path: Path) -> None:
|
||||
print("now waiting for intput\n")
|
||||
|
||||
with open(path, "wb") as file:
|
||||
print("opened file {}".format(path))
|
||||
|
||||
while True:
|
||||
[off, size, crc32c_want] = usb.get_send_data_header()
|
||||
|
||||
# todo: this isn't needed really.
|
||||
usb.send_result(RESULT_OK)
|
||||
|
||||
# check if we should finish now.
|
||||
if (off == 0 and size == 0):
|
||||
break
|
||||
|
||||
# read the buffer and calculate the crc32c.
|
||||
buf = usb.read(size)
|
||||
crc32c_got = crc32c.crc32c(buf)
|
||||
|
||||
# validate the crc32c matches.
|
||||
if (crc32c_want != crc32c_got):
|
||||
usb.send_result(RESULT_ERROR)
|
||||
continue
|
||||
|
||||
try:
|
||||
file.seek(off)
|
||||
file.write(buf)
|
||||
usb.send_result(RESULT_OK)
|
||||
except BlockingIOError as e:
|
||||
print("Error: failed to write: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e)))
|
||||
usb.send_result(RESULT_ERROR)
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("hello world")
|
||||
|
||||
# check which mode the user has selected.
|
||||
args = len(sys.argv)
|
||||
if (args != 2):
|
||||
print("pass the folder path")
|
||||
sys.exit(1)
|
||||
|
||||
root_path = sys.argv[1]
|
||||
|
||||
if (not os.path.isdir(root_path)):
|
||||
raise ValueError('must be a dir!')
|
||||
|
||||
usb: Usb = Usb()
|
||||
|
||||
try:
|
||||
# get usb endpoints.
|
||||
usb.wait_for_connect()
|
||||
|
||||
# wait for command.
|
||||
while True:
|
||||
[cmd, arg3, arg4] = usb.get_send_header()
|
||||
|
||||
if (cmd == CMD_QUIT):
|
||||
usb.send_result(RESULT_OK)
|
||||
break
|
||||
elif (cmd == CMD_EXPORT):
|
||||
usb.send_result(RESULT_OK)
|
||||
|
||||
# todo: handle and return errors here.
|
||||
file_name = get_file_name(usb, arg3)
|
||||
full_path = create_file_folder(root_path, file_name)
|
||||
usb.send_result(RESULT_OK)
|
||||
|
||||
wait_for_input(usb, full_path)
|
||||
else:
|
||||
usb.send_result(RESULT_ERROR)
|
||||
break
|
||||
|
||||
except Exception as inst:
|
||||
print("An exception occurred " + str(inst))
|
||||
162
tools/usb_install.py
Normal file
162
tools/usb_install.py
Normal file
@@ -0,0 +1,162 @@
|
||||
import crc32c
|
||||
import glob
|
||||
from io import BufferedReader
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
from usb_common import *
|
||||
|
||||
try:
|
||||
import rarfile
|
||||
has_rar_support: bool = True
|
||||
except:
|
||||
has_rar_support: bool = False
|
||||
|
||||
# list of installable exts that sphaira supports.
|
||||
INSTALLABLE_EXTS = (".nsp", ".xci", ".nsz", ".xcz")
|
||||
# list of supported extensions passed via args.
|
||||
ACCEPTED_EXTS = INSTALLABLE_EXTS + tuple(".rar")
|
||||
|
||||
# real path, internal path (same if not .rar)
|
||||
paths: list[tuple[str, str]] = []
|
||||
|
||||
def send_file_info_result(usb: Usb, result: int, file_size: int, flags: int):
|
||||
size_lsb = file_size & 0xFFFFFFFF
|
||||
size_msb = ((file_size >> 32) & 0xFFFF) | (flags << 16)
|
||||
usb.send_result(result, size_msb, size_lsb)
|
||||
|
||||
def file_transfer_loop(usb: Usb, file: BufferedReader, flags: int) -> None:
|
||||
print("inside file transfer loop now")
|
||||
|
||||
while True:
|
||||
# get offset + size.
|
||||
[off, size, _] = usb.get_send_data_header()
|
||||
|
||||
# check if we should finish now.
|
||||
if (off == 0 and size == 0):
|
||||
usb.send_result(RESULT_OK)
|
||||
break
|
||||
|
||||
# if we cannot seek, ensure that sphaira doesn't try to seek backwards.
|
||||
if (flags & FLAG_STREAM) and off < file.tell():
|
||||
print("Error: tried to seek on file without random access.")
|
||||
usb.send_result(RESULT_ERROR)
|
||||
continue
|
||||
|
||||
# read file and calculate the hash.
|
||||
try:
|
||||
file.seek(off)
|
||||
buf = file.read(size)
|
||||
except BlockingIOError as e:
|
||||
print("Error: failed to read: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e)))
|
||||
usb.send_result(RESULT_ERROR)
|
||||
continue
|
||||
|
||||
# respond back with the length of the data and the crc32c.
|
||||
usb.send_result(RESULT_OK, len(buf), crc32c.crc32c(buf))
|
||||
|
||||
# send the data.
|
||||
usb.write(buf)
|
||||
|
||||
def wait_for_input(usb: Usb, file_index: int) -> None:
|
||||
print("now waiting for intput\n")
|
||||
|
||||
# open file / rar. (todo: learn how to make a class with inheritance)
|
||||
try:
|
||||
[path, internal_path] = paths[file_index]
|
||||
flags: int = FLAG_NONE
|
||||
|
||||
if path.endswith(".rar"):
|
||||
with rarfile.RarFile(path, part_only=True) as rf:
|
||||
info = rf.getinfo(internal_path)
|
||||
with rf.open(internal_path) as file:
|
||||
# if the file is compressed, disable seek.
|
||||
if info.compress_type != rarfile.RAR_M0:
|
||||
flags |= FLAG_STREAM
|
||||
|
||||
print("opened file: {} flags: {}".format(internal_path, flags))
|
||||
send_file_info_result(usb, RESULT_OK, info.file_size, flags)
|
||||
file_transfer_loop(usb, file, flags)
|
||||
else:
|
||||
with open(path, "rb") as file:
|
||||
print("opened file {}".format(path))
|
||||
file.seek(0, os.SEEK_END)
|
||||
file_size = file.tell()
|
||||
send_file_info_result(usb, RESULT_OK, file_size, flags)
|
||||
file_transfer_loop(usb, file, flags)
|
||||
|
||||
except OSError as e:
|
||||
print("Error: failed to open: {} error: {}".format(e.filename, str(e)))
|
||||
usb.send_result(RESULT_ERROR)
|
||||
|
||||
def add_file_to_install_list(path: str) -> None:
|
||||
# if the type if a rar, check if it contains a support ext internally.
|
||||
if path.endswith(".rar"):
|
||||
if has_rar_support:
|
||||
with rarfile.RarFile(path, part_only=True) as rf:
|
||||
for f in rf.infolist():
|
||||
if f.filename.endswith(INSTALLABLE_EXTS):
|
||||
print("Adding file: {} type: RAR".format(f.filename))
|
||||
paths.append([path, f.filename])
|
||||
break
|
||||
else:
|
||||
print("Warning: rar support disabled as rarfile is not installed")
|
||||
print("To enable rar support, enable with `pip install rarfile` and install unrar")
|
||||
|
||||
elif path.endswith(INSTALLABLE_EXTS):
|
||||
print("Adding file: {} type: FILE".format(path))
|
||||
paths.append([path, path])
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("hello world")
|
||||
|
||||
# check which mode the user has selected.
|
||||
args = len(sys.argv)
|
||||
if (args != 2):
|
||||
print("either run python usb_total.py game.nsp OR drag and drop the game onto the python file (if python is in your path)")
|
||||
sys.exit(1)
|
||||
|
||||
# build a list of files to install.
|
||||
path = sys.argv[1]
|
||||
if os.path.isfile(path):
|
||||
add_file_to_install_list(path)
|
||||
elif os.path.isdir(path):
|
||||
for f in glob.glob(path + "/**/*.*", recursive=True):
|
||||
if os.path.isfile(f):
|
||||
add_file_to_install_list(f)
|
||||
else:
|
||||
raise ValueError('must be a file!')
|
||||
|
||||
usb: Usb = Usb()
|
||||
|
||||
try:
|
||||
# get usb endpoints.
|
||||
usb.wait_for_connect()
|
||||
|
||||
# build string table.
|
||||
string_table: bytes
|
||||
for [_, path] in paths:
|
||||
string_table += bytes(Path(path).name.__str__(), 'utf8') + b'\n'
|
||||
|
||||
# this reads the send header and checks the magic.
|
||||
usb.get_send_header()
|
||||
|
||||
# send recv and string table.
|
||||
usb.send_result(RESULT_OK, len(string_table))
|
||||
usb.write(string_table)
|
||||
|
||||
# wait for command.
|
||||
while True:
|
||||
[cmd, arg3, arg4] = usb.get_send_header()
|
||||
|
||||
if cmd == CMD_QUIT:
|
||||
usb.send_result(RESULT_OK)
|
||||
break
|
||||
elif cmd == CMD_OPEN:
|
||||
wait_for_input(usb, arg3)
|
||||
else:
|
||||
usb.send_result(RESULT_ERROR)
|
||||
break
|
||||
|
||||
except Exception as inst:
|
||||
print("An exception occurred " + str(inst))
|
||||
Reference in New Issue
Block a user