From b6b1af595983994dc6bcda75523295ed0bfbc6ca Mon Sep 17 00:00:00 2001 From: ITotalJustice <47043333+ITotalJustice@users.noreply.github.com> Date: Sun, 31 Aug 2025 04:27:53 +0100 Subject: [PATCH] Revert "Clean up python (#212)" This reverts commit 0a8bc018705795eb5fb84924d58899cbf42f5caf. --- tools/usb_common.py | 134 +++++++++++++------------------------------ tools/usb_export.py | 76 ++++++++++++------------ tools/usb_install.py | 54 ++++++++--------- 3 files changed, 102 insertions(+), 162 deletions(-) diff --git a/tools/usb_common.py b/tools/usb_common.py index 492a51e..2fa306c 100644 --- a/tools/usb_common.py +++ b/tools/usb_common.py @@ -1,96 +1,50 @@ import struct -from time import sleep -from usb.core +import usb.core import usb.util +import time -SPLASH = """ - :@@@@@@@@@@@@@@@@@@@@@@@@@@: - #@ @# - #@ @# - #@ @# - #@ @@@@@@ @@@@@@ @# - #@ @@@@@@ @@@@@@ @# - #@ @@@@@@ @@@@@@ @# - #@ @# - #@ @# - #@ @# - @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ - @ @% @@@@@@@@@@@@ - #@ *@@@@* @% @@@@@@@@@@@@@ - #@ @@@@@@@@ @% @@@@@@@@@@@@@ - #@ @@@@@@@@ @% @@@@@@@@@@@@@ - #@ *@@@@* @% @@@@@@@@@@@@@ - #@ @% @@@@@@@@@@@@@ - #@ @% @@@@@@@@@@@@@ - #@ @% @@@@@@@@@@@@@ - #@ @% @@@@@@@@@@@@@ - #@ @% @@@@@@@@@@@@@ - #@ @% @@@@@=--=@@@@ - #@ @% @@@- -@@ - #@ @% @@@ @@ - #@ @% @@@- -@@ - =@@ @% @@@@@=--=@@@@ - =@@ @% @@@@@@@@@@@@ - @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ -""" +# magic number (SPH0) for the script and switch. +MAGIC = 0x53504830 -class USB_ENUM: - # (SPH0) for the script and Switch - MAGIC = 0x53504830 +# commands +CMD_QUIT = 0 +CMD_OPEN = 1 +CMD_EXPORT = 1 - # Commands - CMD_QUIT = 0 - CMD_OPEN = 1 - CMD_EXPORT = 1 +# results +RESULT_OK = 0 +RESULT_ERROR = 1 - # Result Codes - RESULT_OK = 0 - RESULT_ERROR = 1 +# flags +FLAG_NONE = 0 +FLAG_STREAM = 1 << 0 - # Flags - FLAG_NONE = 0 - FLAG_STREAM = 1 << 0 - - # Switch Vendor / Product ID - VENDOR_ID = 0x057E - PRODUCT_ID = 0x3000 - - ENABLE_ZLT = 0 - - -def find_switch() -> object | None: - return usb.core.find( - idVendor=USB_ENUM.VENDOR_ID, - idProduct=USB_ENUM.PRODUCT_ID - ) +# disabled, see usbds.cpp usbDsEndpoint_SetZlt +ENABLE_ZLT = 0 class Usb: def __init__(self): - self._out_ep = None - self._in_ep = None - self._packet_size = 0 - self._packet_index = 0 + self.__out_ep = None + self.__in_ep = None + self.__packet_size = 0 def wait_for_connect(self) -> None: - print(SPLASH) - print("Waiting for Switch...", end="") + print("waiting for switch") dev = None - while dev is None: - if (dev := find_switch()): - break - print(".", end="") - sleep(0.5) + while (dev is None): + dev = usb.core.find(idVendor=0x057E, idProduct=0x3000) + if (dev is None): + time.sleep(0.5) - print("Found the Switch!\n") + print("found the switch!\n") cfg = None - print("Getting configuration...") try: cfg = dev.get_active_configuration() - print("Found active config") + print("found active config") except usb.core.USBError: - print("No currently active config") + print("no currently active config") cfg = None if cfg is None: @@ -100,36 +54,26 @@ class Usb: is_out_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT is_in_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN - self._out_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_out_ep) - self._in_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_in_ep) + self.__out_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_out_ep) + self.__in_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_in_ep) + assert self.__out_ep is not None + assert self.__in_ep is not None - if not self._out_ep: - raise ValueError("Failed to get USB OUT address") - if not self._in_ep: - raise ValueError("Failed to get USB IN address") - - print(f"iManufacturer: {dev.manufacturer} \ - iProduct: {dev.product} \ - iSerialNumber: {dev.serial_number}") - - print(f"bcdUSB: {hex(dev.bcdUSB)} \ - bMaxPacketSize0: {dev.bMaxPacketSize0}") - self._packet_size = 1 << dev.bMaxPacketSize0 + print("iManufacturer: {} iProduct: {} iSerialNumber: {}".format(dev.manufacturer, dev.product, dev.serial_number)) + print("bcdUSB: {} bMaxPacketSize0: {}".format(hex(dev.bcdUSB), dev.bMaxPacketSize0)) + self.__packet_size = 1 << dev.bMaxPacketSize0 def read(self, size: int, timeout: int = 0) -> bytes: - if (ENABLE_ZLT and size and not (size % self._packet_size)): + if (ENABLE_ZLT and size and (size % self.__packet_size) == 0): size += 1 - return self._in_ep.read(size, timeout) + return self.__in_ep.read(size, timeout) def write(self, buf: bytes, timeout: int = 0) -> int: - packet = self._packet_index - self._packet_index += 1 - # Todo, implement packet index long - return self._out_ep.write(data=buf, timeout=timeout) + return self.__out_ep.write(data=buf, timeout=timeout) def get_send_header(self) -> tuple[int, int, int]: header = self.read(16) - magic, arg2, arg3, arg4 = struct.unpack(' None: send_data = struct.pack(' str: return bytes(usb.read(name_length)).decode('utf-8') -def create_file_folder(root: os.PathLike, target: os.PathLike) -> bool, os.PathLike: - """ - Creates a recursive folder structure at a given location - Returns a boolean indicating if it already exists and the absolute path - """ - path = os.abspath(os.path.join(root, target)) - parent = os.path.dirname(path) - if not (os.path.exists(parent)): - os.makedirs(path) - print(f"Created folder {path}") - else: - print(f"Parent folder already exists {path}") - exists_already = os.path.exists(path) - return exists_already, path +def create_file_folder(root: Path, file_path: Path) -> Path: + # todo: check if it already exists. + full_path = Path(root + "/" + file_path) + full_path.parent.mkdir(exist_ok=True, parents=True) + print("created folder") -def wait_for_input(usb: Usb, path: os.PathLike) -> None: + return full_path + +def wait_for_input(usb: Usb, path: Path) -> None: print("now waiting for intput\n") - with open(path, "wb") as f: - print(f"Opened file {path}") + with open(path, "wb") as file: + print("opened file {}".format(path)) while True: [off, size, crc32c_want] = usb.get_send_data_header() # todo: this isn't needed really. - usb.send_result(UE.RESULT_OK) + usb.send_result(RESULT_OK) # check if we should finish now. - if off == 0 and size == 0: + if (off == 0 and size == 0): break # read the buffer and calculate the crc32c. @@ -42,31 +36,33 @@ def wait_for_input(usb: Usb, path: os.PathLike) -> None: crc32c_got = crc32c.crc32c(buf) # validate the crc32c matches. - if crc32c_want != crc32c_got: - usb.send_result(UE.RESULT_ERROR) + if (crc32c_want != crc32c_got): + usb.send_result(RESULT_ERROR) continue try: - f.seek(off) - f.write(buf) - usb.send_result(UE.RESULT_OK) + file.seek(off) + file.write(buf) + usb.send_result(RESULT_OK) except BlockingIOError as e: print("Error: failed to write: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e))) - usb.send_result(UE.RESULT_ERROR) + usb.send_result(RESULT_ERROR) if __name__ == '__main__': - print(SPLASH) + print("hello world") - if not len(args) == 2: - print("Pass root path as argument.") + # check which mode the user has selected. + args = len(sys.argv) + if (args != 2): + print("pass the folder path") sys.exit(1) root_path = sys.argv[1] if (not os.path.isdir(root_path)): - raise ValueError('') + raise ValueError('must be a dir!') - usb = Usb() + usb: Usb = Usb() try: # get usb endpoints. @@ -76,21 +72,21 @@ if __name__ == '__main__': while True: [cmd, arg3, arg4] = usb.get_send_header() - if (cmd == UE.CMD_QUIT): - usb.send_result(UE.RESULT_OK) + if (cmd == CMD_QUIT): + usb.send_result(RESULT_OK) break - elif (cmd == UE.CMD_EXPORT): - usb.send_result(UE.RESULT_OK) + elif (cmd == CMD_EXPORT): + usb.send_result(RESULT_OK) # todo: handle and return errors here. file_name = get_file_name(usb, arg3) - exists_already, full_path = create_file_folder(root_path, file_name) - usb.send_result(UE.RESULT_OK) + full_path = create_file_folder(root_path, file_name) + usb.send_result(RESULT_OK) wait_for_input(usb, full_path) else: - usb.send_result(UE.RESULT_ERROR) + usb.send_result(RESULT_ERROR) break - except Exception as e: - print(f"An exception occurred - {e} ") + except Exception as inst: + print("An exception occurred " + str(inst)) diff --git a/tools/usb_install.py b/tools/usb_install.py index 9164f5b..e95a15e 100644 --- a/tools/usb_install.py +++ b/tools/usb_install.py @@ -4,13 +4,13 @@ from io import BufferedReader import sys import os from pathlib import Path -from usb_common import Usb, USB_ENUM as UE +from usb_common import * try: import rarfile - has_rar_support = True + has_rar_support: bool = True except: - has_rar_support = False + has_rar_support: bool = False # list of installable exts that sphaira supports. INSTALLABLE_EXTS = (".nsp", ".xci", ".nsz", ".xcz") @@ -26,21 +26,21 @@ def send_file_info_result(usb: Usb, result: int, file_size: int, flags: int): usb.send_result(result, size_msb, size_lsb) def file_transfer_loop(usb: Usb, file: BufferedReader, flags: int) -> None: - print("> Transfer Loop") + print("inside file transfer loop now") while True: # get offset + size. [off, size, _] = usb.get_send_data_header() # check if we should finish now. - if off == 0 and size == 0: - usb.send_result(UE.RESULT_OK) + if (off == 0 and size == 0): + usb.send_result(RESULT_OK) break # if we cannot seek, ensure that sphaira doesn't try to seek backwards. - if (flags & UE.FLAG_STREAM) and off < file.tell(): - print(">> Error: Tried to seek on file without random access.") - usb.send_result(UE.RESULT_ERROR) + if (flags & FLAG_STREAM) and off < file.tell(): + print("Error: tried to seek on file without random access.") + usb.send_result(RESULT_ERROR) continue # read file and calculate the hash. @@ -48,12 +48,12 @@ def file_transfer_loop(usb: Usb, file: BufferedReader, flags: int) -> None: file.seek(off) buf = file.read(size) except BlockingIOError as e: - print(f">> Error: Failed to read: {e.filename} at: {off} size: {size} error: {e}") - usb.send_result(UE.RESULT_ERROR) + print("Error: failed to read: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e))) + usb.send_result(RESULT_ERROR) continue # respond back with the length of the data and the crc32c. - usb.send_result(UE.RESULT_OK, len(buf), crc32c.crc32c(buf)) + usb.send_result(RESULT_OK, len(buf), crc32c.crc32c(buf)) # send the data. usb.write(buf) @@ -63,8 +63,8 @@ def wait_for_input(usb: Usb, file_index: int) -> None: # open file / rar. (todo: learn how to make a class with inheritance) try: - path, internal_path = paths[file_index] - flags: int = UE.FLAG_NONE + [path, internal_path] = paths[file_index] + flags: int = FLAG_NONE if path.endswith(".rar"): with rarfile.RarFile(path, part_only=True) as rf: @@ -72,22 +72,22 @@ def wait_for_input(usb: Usb, file_index: int) -> None: with rf.open(internal_path) as file: # if the file is compressed, disable seek. if info.compress_type != rarfile.RAR_M0: - flags |= UE.FLAG_STREAM + flags |= FLAG_STREAM print("opened file: {} flags: {}".format(internal_path, flags)) - send_file_info_result(usb, UE.RESULT_OK, info.file_size, flags) + send_file_info_result(usb, RESULT_OK, info.file_size, flags) file_transfer_loop(usb, file, flags) else: with open(path, "rb") as file: print("opened file {}".format(path)) file.seek(0, os.SEEK_END) file_size = file.tell() - send_file_info_result(usb, UE.RESULT_OK, file_size, flags) + send_file_info_result(usb, RESULT_OK, file_size, flags) file_transfer_loop(usb, file, flags) except OSError as e: print("Error: failed to open: {} error: {}".format(e.filename, str(e))) - usb.send_result(UE.RESULT_ERROR) + usb.send_result(RESULT_ERROR) def add_file_to_install_list(path: str) -> None: # if the type if a rar, check if it contains a support ext internally. @@ -108,7 +108,7 @@ def add_file_to_install_list(path: str) -> None: paths.append([path, path]) if __name__ == '__main__': - print(SPLASH) + print("hello world") # check which mode the user has selected. args = len(sys.argv) @@ -125,7 +125,7 @@ if __name__ == '__main__': if os.path.isfile(f): add_file_to_install_list(f) else: - raise ValueError('Must be a file!') + raise ValueError('must be a file!') usb: Usb = Usb() @@ -135,27 +135,27 @@ if __name__ == '__main__': # build string table. string_table: bytes - for _, path in paths: + for [_, path] in paths: string_table += bytes(Path(path).name.__str__(), 'utf8') + b'\n' # this reads the send header and checks the magic. usb.get_send_header() # send recv and string table. - usb.send_result(UE.RESULT_OK, len(string_table)) + usb.send_result(RESULT_OK, len(string_table)) usb.write(string_table) # wait for command. while True: - cmd, arg3, arg4 = usb.get_send_header() + [cmd, arg3, arg4] = usb.get_send_header() - if cmd == UE.CMD_QUIT: - usb.send_result(UE.RESULT_OK) + if cmd == CMD_QUIT: + usb.send_result(RESULT_OK) break - elif cmd == UE.CMD_OPEN: + elif cmd == CMD_OPEN: wait_for_input(usb, arg3) else: - usb.send_result(UE.RESULT_ERROR) + usb.send_result(RESULT_ERROR) break except Exception as inst: