diff --git a/tools/usb_common.py b/tools/usb_common.py index 2fa306c..492a51e 100644 --- a/tools/usb_common.py +++ b/tools/usb_common.py @@ -1,50 +1,96 @@ import struct -import usb.core +from time import sleep +from usb.core import usb.util -import time -# magic number (SPH0) for the script and switch. -MAGIC = 0x53504830 +SPLASH = """ + :@@@@@@@@@@@@@@@@@@@@@@@@@@: + #@ @# + #@ @# + #@ @# + #@ @@@@@@ @@@@@@ @# + #@ @@@@@@ @@@@@@ @# + #@ @@@@@@ @@@@@@ @# + #@ @# + #@ @# + #@ @# + @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ + @ @% @@@@@@@@@@@@ + #@ *@@@@* @% @@@@@@@@@@@@@ + #@ @@@@@@@@ @% @@@@@@@@@@@@@ + #@ @@@@@@@@ @% @@@@@@@@@@@@@ + #@ *@@@@* @% @@@@@@@@@@@@@ + #@ @% @@@@@@@@@@@@@ + #@ @% @@@@@@@@@@@@@ + #@ @% @@@@@@@@@@@@@ + #@ @% @@@@@@@@@@@@@ + #@ @% @@@@@@@@@@@@@ + #@ @% @@@@@=--=@@@@ + #@ @% @@@- -@@ + #@ @% @@@ @@ + #@ @% @@@- -@@ + =@@ @% @@@@@=--=@@@@ + =@@ @% @@@@@@@@@@@@ + @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ +""" -# commands -CMD_QUIT = 0 -CMD_OPEN = 1 -CMD_EXPORT = 1 +class USB_ENUM: + # (SPH0) for the script and Switch + MAGIC = 0x53504830 -# results -RESULT_OK = 0 -RESULT_ERROR = 1 + # Commands + CMD_QUIT = 0 + CMD_OPEN = 1 + CMD_EXPORT = 1 -# flags -FLAG_NONE = 0 -FLAG_STREAM = 1 << 0 + # Result Codes + RESULT_OK = 0 + RESULT_ERROR = 1 -# disabled, see usbds.cpp usbDsEndpoint_SetZlt -ENABLE_ZLT = 0 + # Flags + FLAG_NONE = 0 + FLAG_STREAM = 1 << 0 + + # Switch Vendor / Product ID + VENDOR_ID = 0x057E + PRODUCT_ID = 0x3000 + + ENABLE_ZLT = 0 + + +def find_switch() -> object | None: + return usb.core.find( + idVendor=USB_ENUM.VENDOR_ID, + idProduct=USB_ENUM.PRODUCT_ID + ) class Usb: def __init__(self): - self.__out_ep = None - self.__in_ep = None - self.__packet_size = 0 + self._out_ep = None + self._in_ep = None + self._packet_size = 0 + self._packet_index = 0 def wait_for_connect(self) -> None: - print("waiting for switch") + print(SPLASH) + print("Waiting for Switch...", end="") dev = None - while (dev is None): - dev = usb.core.find(idVendor=0x057E, idProduct=0x3000) - if (dev is None): - time.sleep(0.5) + while dev is None: + if (dev := find_switch()): + break + print(".", end="") + sleep(0.5) - print("found the switch!\n") + print("Found the Switch!\n") cfg = None + print("Getting configuration...") try: cfg = dev.get_active_configuration() - print("found active config") + print("Found active config") except usb.core.USBError: - print("no currently active config") + print("No currently active config") cfg = None if cfg is None: @@ -54,26 +100,36 @@ class Usb: is_out_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT is_in_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN - self.__out_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_out_ep) - self.__in_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_in_ep) - assert self.__out_ep is not None - assert self.__in_ep is not None + self._out_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_out_ep) + self._in_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_in_ep) - print("iManufacturer: {} iProduct: {} iSerialNumber: {}".format(dev.manufacturer, dev.product, dev.serial_number)) - print("bcdUSB: {} bMaxPacketSize0: {}".format(hex(dev.bcdUSB), dev.bMaxPacketSize0)) - self.__packet_size = 1 << dev.bMaxPacketSize0 + if not self._out_ep: + raise ValueError("Failed to get USB OUT address") + if not self._in_ep: + raise ValueError("Failed to get USB IN address") + + print(f"iManufacturer: {dev.manufacturer} \ + iProduct: {dev.product} \ + iSerialNumber: {dev.serial_number}") + + print(f"bcdUSB: {hex(dev.bcdUSB)} \ + bMaxPacketSize0: {dev.bMaxPacketSize0}") + self._packet_size = 1 << dev.bMaxPacketSize0 def read(self, size: int, timeout: int = 0) -> bytes: - if (ENABLE_ZLT and size and (size % self.__packet_size) == 0): + if (ENABLE_ZLT and size and not (size % self._packet_size)): size += 1 - return self.__in_ep.read(size, timeout) + return self._in_ep.read(size, timeout) def write(self, buf: bytes, timeout: int = 0) -> int: - return self.__out_ep.write(data=buf, timeout=timeout) + packet = self._packet_index + self._packet_index += 1 + # Todo, implement packet index long + return self._out_ep.write(data=buf, timeout=timeout) def get_send_header(self) -> tuple[int, int, int]: header = self.read(16) - [magic, arg2, arg3, arg4] = struct.unpack(' None: send_data = struct.pack(' str: return bytes(usb.read(name_length)).decode('utf-8') -def create_file_folder(root: Path, file_path: Path) -> Path: - # todo: check if it already exists. - full_path = Path(root + "/" + file_path) - full_path.parent.mkdir(exist_ok=True, parents=True) - print("created folder") +def create_file_folder(root: os.PathLike, target: os.PathLike) -> bool, os.PathLike: + """ + Creates a recursive folder structure at a given location + Returns a boolean indicating if it already exists and the absolute path + """ + path = os.abspath(os.path.join(root, target)) + parent = os.path.dirname(path) + if not (os.path.exists(parent)): + os.makedirs(path) + print(f"Created folder {path}") + else: + print(f"Parent folder already exists {path}") + exists_already = os.path.exists(path) + return exists_already, path - return full_path - -def wait_for_input(usb: Usb, path: Path) -> None: +def wait_for_input(usb: Usb, path: os.PathLike) -> None: print("now waiting for intput\n") - with open(path, "wb") as file: - print("opened file {}".format(path)) + with open(path, "wb") as f: + print(f"Opened file {path}") while True: [off, size, crc32c_want] = usb.get_send_data_header() # todo: this isn't needed really. - usb.send_result(RESULT_OK) + usb.send_result(UE.RESULT_OK) # check if we should finish now. - if (off == 0 and size == 0): + if off == 0 and size == 0: break # read the buffer and calculate the crc32c. @@ -36,33 +42,31 @@ def wait_for_input(usb: Usb, path: Path) -> None: crc32c_got = crc32c.crc32c(buf) # validate the crc32c matches. - if (crc32c_want != crc32c_got): - usb.send_result(RESULT_ERROR) + if crc32c_want != crc32c_got: + usb.send_result(UE.RESULT_ERROR) continue try: - file.seek(off) - file.write(buf) - usb.send_result(RESULT_OK) + f.seek(off) + f.write(buf) + usb.send_result(UE.RESULT_OK) except BlockingIOError as e: print("Error: failed to write: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e))) - usb.send_result(RESULT_ERROR) + usb.send_result(UE.RESULT_ERROR) if __name__ == '__main__': - print("hello world") + print(SPLASH) - # check which mode the user has selected. - args = len(sys.argv) - if (args != 2): - print("pass the folder path") + if not len(args) == 2: + print("Pass root path as argument.") sys.exit(1) root_path = sys.argv[1] if (not os.path.isdir(root_path)): - raise ValueError('must be a dir!') + raise ValueError('') - usb: Usb = Usb() + usb = Usb() try: # get usb endpoints. @@ -72,21 +76,21 @@ if __name__ == '__main__': while True: [cmd, arg3, arg4] = usb.get_send_header() - if (cmd == CMD_QUIT): - usb.send_result(RESULT_OK) + if (cmd == UE.CMD_QUIT): + usb.send_result(UE.RESULT_OK) break - elif (cmd == CMD_EXPORT): - usb.send_result(RESULT_OK) + elif (cmd == UE.CMD_EXPORT): + usb.send_result(UE.RESULT_OK) # todo: handle and return errors here. file_name = get_file_name(usb, arg3) - full_path = create_file_folder(root_path, file_name) - usb.send_result(RESULT_OK) + exists_already, full_path = create_file_folder(root_path, file_name) + usb.send_result(UE.RESULT_OK) wait_for_input(usb, full_path) else: - usb.send_result(RESULT_ERROR) + usb.send_result(UE.RESULT_ERROR) break - except Exception as inst: - print("An exception occurred " + str(inst)) + except Exception as e: + print(f"An exception occurred - {e} ") diff --git a/tools/usb_install.py b/tools/usb_install.py index e95a15e..9164f5b 100644 --- a/tools/usb_install.py +++ b/tools/usb_install.py @@ -4,13 +4,13 @@ from io import BufferedReader import sys import os from pathlib import Path -from usb_common import * +from usb_common import Usb, USB_ENUM as UE try: import rarfile - has_rar_support: bool = True + has_rar_support = True except: - has_rar_support: bool = False + has_rar_support = False # list of installable exts that sphaira supports. INSTALLABLE_EXTS = (".nsp", ".xci", ".nsz", ".xcz") @@ -26,21 +26,21 @@ def send_file_info_result(usb: Usb, result: int, file_size: int, flags: int): usb.send_result(result, size_msb, size_lsb) def file_transfer_loop(usb: Usb, file: BufferedReader, flags: int) -> None: - print("inside file transfer loop now") + print("> Transfer Loop") while True: # get offset + size. [off, size, _] = usb.get_send_data_header() # check if we should finish now. - if (off == 0 and size == 0): - usb.send_result(RESULT_OK) + if off == 0 and size == 0: + usb.send_result(UE.RESULT_OK) break # if we cannot seek, ensure that sphaira doesn't try to seek backwards. - if (flags & FLAG_STREAM) and off < file.tell(): - print("Error: tried to seek on file without random access.") - usb.send_result(RESULT_ERROR) + if (flags & UE.FLAG_STREAM) and off < file.tell(): + print(">> Error: Tried to seek on file without random access.") + usb.send_result(UE.RESULT_ERROR) continue # read file and calculate the hash. @@ -48,12 +48,12 @@ def file_transfer_loop(usb: Usb, file: BufferedReader, flags: int) -> None: file.seek(off) buf = file.read(size) except BlockingIOError as e: - print("Error: failed to read: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e))) - usb.send_result(RESULT_ERROR) + print(f">> Error: Failed to read: {e.filename} at: {off} size: {size} error: {e}") + usb.send_result(UE.RESULT_ERROR) continue # respond back with the length of the data and the crc32c. - usb.send_result(RESULT_OK, len(buf), crc32c.crc32c(buf)) + usb.send_result(UE.RESULT_OK, len(buf), crc32c.crc32c(buf)) # send the data. usb.write(buf) @@ -63,8 +63,8 @@ def wait_for_input(usb: Usb, file_index: int) -> None: # open file / rar. (todo: learn how to make a class with inheritance) try: - [path, internal_path] = paths[file_index] - flags: int = FLAG_NONE + path, internal_path = paths[file_index] + flags: int = UE.FLAG_NONE if path.endswith(".rar"): with rarfile.RarFile(path, part_only=True) as rf: @@ -72,22 +72,22 @@ def wait_for_input(usb: Usb, file_index: int) -> None: with rf.open(internal_path) as file: # if the file is compressed, disable seek. if info.compress_type != rarfile.RAR_M0: - flags |= FLAG_STREAM + flags |= UE.FLAG_STREAM print("opened file: {} flags: {}".format(internal_path, flags)) - send_file_info_result(usb, RESULT_OK, info.file_size, flags) + send_file_info_result(usb, UE.RESULT_OK, info.file_size, flags) file_transfer_loop(usb, file, flags) else: with open(path, "rb") as file: print("opened file {}".format(path)) file.seek(0, os.SEEK_END) file_size = file.tell() - send_file_info_result(usb, RESULT_OK, file_size, flags) + send_file_info_result(usb, UE.RESULT_OK, file_size, flags) file_transfer_loop(usb, file, flags) except OSError as e: print("Error: failed to open: {} error: {}".format(e.filename, str(e))) - usb.send_result(RESULT_ERROR) + usb.send_result(UE.RESULT_ERROR) def add_file_to_install_list(path: str) -> None: # if the type if a rar, check if it contains a support ext internally. @@ -108,7 +108,7 @@ def add_file_to_install_list(path: str) -> None: paths.append([path, path]) if __name__ == '__main__': - print("hello world") + print(SPLASH) # check which mode the user has selected. args = len(sys.argv) @@ -125,7 +125,7 @@ if __name__ == '__main__': if os.path.isfile(f): add_file_to_install_list(f) else: - raise ValueError('must be a file!') + raise ValueError('Must be a file!') usb: Usb = Usb() @@ -135,27 +135,27 @@ if __name__ == '__main__': # build string table. string_table: bytes - for [_, path] in paths: + for _, path in paths: string_table += bytes(Path(path).name.__str__(), 'utf8') + b'\n' # this reads the send header and checks the magic. usb.get_send_header() # send recv and string table. - usb.send_result(RESULT_OK, len(string_table)) + usb.send_result(UE.RESULT_OK, len(string_table)) usb.write(string_table) # wait for command. while True: - [cmd, arg3, arg4] = usb.get_send_header() + cmd, arg3, arg4 = usb.get_send_header() - if cmd == CMD_QUIT: - usb.send_result(RESULT_OK) + if cmd == UE.CMD_QUIT: + usb.send_result(UE.RESULT_OK) break - elif cmd == CMD_OPEN: + elif cmd == UE.CMD_OPEN: wait_for_input(usb, arg3) else: - usb.send_result(RESULT_ERROR) + usb.send_result(UE.RESULT_ERROR) break except Exception as inst: