Clean up python (#212)

* Python cleanup
This commit is contained in:
AndrewSpangler
2025-08-30 19:48:37 -07:00
committed by GitHub
parent f0bdc01156
commit 0a8bc01870
3 changed files with 162 additions and 102 deletions

View File

@@ -1,50 +1,96 @@
import struct import struct
import usb.core from time import sleep
from usb.core
import usb.util import usb.util
import time
# magic number (SPH0) for the script and switch. SPLASH = """
MAGIC = 0x53504830 :@@@@@@@@@@@@@@@@@@@@@@@@@@:
#@ @#
#@ @#
#@ @#
#@ @@@@@@ @@@@@@ @#
#@ @@@@@@ @@@@@@ @#
#@ @@@@@@ @@@@@@ @#
#@ @#
#@ @#
#@ @#
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@ @% @@@@@@@@@@@@
#@ *@@@@* @% @@@@@@@@@@@@@
#@ @@@@@@@@ @% @@@@@@@@@@@@@
#@ @@@@@@@@ @% @@@@@@@@@@@@@
#@ *@@@@* @% @@@@@@@@@@@@@
#@ @% @@@@@@@@@@@@@
#@ @% @@@@@@@@@@@@@
#@ @% @@@@@@@@@@@@@
#@ @% @@@@@@@@@@@@@
#@ @% @@@@@@@@@@@@@
#@ @% @@@@@=--=@@@@
#@ @% @@@- -@@
#@ @% @@@ @@
#@ @% @@@- -@@
=@@ @% @@@@@=--=@@@@
=@@ @% @@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
"""
# commands class USB_ENUM:
CMD_QUIT = 0 # (SPH0) for the script and Switch
CMD_OPEN = 1 MAGIC = 0x53504830
CMD_EXPORT = 1
# results # Commands
RESULT_OK = 0 CMD_QUIT = 0
RESULT_ERROR = 1 CMD_OPEN = 1
CMD_EXPORT = 1
# flags # Result Codes
FLAG_NONE = 0 RESULT_OK = 0
FLAG_STREAM = 1 << 0 RESULT_ERROR = 1
# disabled, see usbds.cpp usbDsEndpoint_SetZlt # Flags
ENABLE_ZLT = 0 FLAG_NONE = 0
FLAG_STREAM = 1 << 0
# Switch Vendor / Product ID
VENDOR_ID = 0x057E
PRODUCT_ID = 0x3000
ENABLE_ZLT = 0
def find_switch() -> object | None:
return usb.core.find(
idVendor=USB_ENUM.VENDOR_ID,
idProduct=USB_ENUM.PRODUCT_ID
)
class Usb: class Usb:
def __init__(self): def __init__(self):
self.__out_ep = None self._out_ep = None
self.__in_ep = None self._in_ep = None
self.__packet_size = 0 self._packet_size = 0
self._packet_index = 0
def wait_for_connect(self) -> None: def wait_for_connect(self) -> None:
print("waiting for switch") print(SPLASH)
print("Waiting for Switch...", end="")
dev = None dev = None
while (dev is None): while dev is None:
dev = usb.core.find(idVendor=0x057E, idProduct=0x3000) if (dev := find_switch()):
if (dev is None): break
time.sleep(0.5) print(".", end="")
sleep(0.5)
print("found the switch!\n") print("Found the Switch!\n")
cfg = None cfg = None
print("Getting configuration...")
try: try:
cfg = dev.get_active_configuration() cfg = dev.get_active_configuration()
print("found active config") print("Found active config")
except usb.core.USBError: except usb.core.USBError:
print("no currently active config") print("No currently active config")
cfg = None cfg = None
if cfg is None: if cfg is None:
@@ -54,26 +100,36 @@ class Usb:
is_out_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT is_out_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT
is_in_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN is_in_ep = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN
self.__out_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_out_ep) self._out_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_out_ep)
self.__in_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_in_ep) self._in_ep = usb.util.find_descriptor(cfg[(0,0)], custom_match=is_in_ep)
assert self.__out_ep is not None
assert self.__in_ep is not None
print("iManufacturer: {} iProduct: {} iSerialNumber: {}".format(dev.manufacturer, dev.product, dev.serial_number)) if not self._out_ep:
print("bcdUSB: {} bMaxPacketSize0: {}".format(hex(dev.bcdUSB), dev.bMaxPacketSize0)) raise ValueError("Failed to get USB OUT address")
self.__packet_size = 1 << dev.bMaxPacketSize0 if not self._in_ep:
raise ValueError("Failed to get USB IN address")
print(f"iManufacturer: {dev.manufacturer} \
iProduct: {dev.product} \
iSerialNumber: {dev.serial_number}")
print(f"bcdUSB: {hex(dev.bcdUSB)} \
bMaxPacketSize0: {dev.bMaxPacketSize0}")
self._packet_size = 1 << dev.bMaxPacketSize0
def read(self, size: int, timeout: int = 0) -> bytes: def read(self, size: int, timeout: int = 0) -> bytes:
if (ENABLE_ZLT and size and (size % self.__packet_size) == 0): if (ENABLE_ZLT and size and not (size % self._packet_size)):
size += 1 size += 1
return self.__in_ep.read(size, timeout) return self._in_ep.read(size, timeout)
def write(self, buf: bytes, timeout: int = 0) -> int: def write(self, buf: bytes, timeout: int = 0) -> int:
return self.__out_ep.write(data=buf, timeout=timeout) packet = self._packet_index
self._packet_index += 1
# Todo, implement packet index long
return self._out_ep.write(data=buf, timeout=timeout)
def get_send_header(self) -> tuple[int, int, int]: def get_send_header(self) -> tuple[int, int, int]:
header = self.read(16) header = self.read(16)
[magic, arg2, arg3, arg4] = struct.unpack('<IIII', header) magic, arg2, arg3, arg4 = struct.unpack('<IIII', header)
if magic != MAGIC: if magic != MAGIC:
raise Exception("Unexpected magic {}".format(magic)) raise Exception("Unexpected magic {}".format(magic))
@@ -86,4 +142,4 @@ class Usb:
def send_result(self, result: int, arg3: int = 0, arg4: int = 0) -> None: def send_result(self, result: int, arg3: int = 0, arg4: int = 0) -> None:
send_data = struct.pack('<IIII', MAGIC, result, arg3, arg4) send_data = struct.pack('<IIII', MAGIC, result, arg3, arg4)
self.write(send_data) self.write(send_data)

View File

@@ -1,34 +1,40 @@
import crc32c import crc32c
import sys import sys
import os import os
from pathlib import Path from usb_common import Usb, USB_ENUM as UE, SPLASH
from usb_common import *
def get_file_name(usb: Usb, name_length: int) -> str: def get_file_name(usb: Usb, name_length: int) -> str:
return bytes(usb.read(name_length)).decode('utf-8') return bytes(usb.read(name_length)).decode('utf-8')
def create_file_folder(root: Path, file_path: Path) -> Path: def create_file_folder(root: os.PathLike, target: os.PathLike) -> bool, os.PathLike:
# todo: check if it already exists. """
full_path = Path(root + "/" + file_path) Creates a recursive folder structure at a given location
full_path.parent.mkdir(exist_ok=True, parents=True) Returns a boolean indicating if it already exists and the absolute path
print("created folder") """
path = os.abspath(os.path.join(root, target))
parent = os.path.dirname(path)
if not (os.path.exists(parent)):
os.makedirs(path)
print(f"Created folder {path}")
else:
print(f"Parent folder already exists {path}")
exists_already = os.path.exists(path)
return exists_already, path
return full_path def wait_for_input(usb: Usb, path: os.PathLike) -> None:
def wait_for_input(usb: Usb, path: Path) -> None:
print("now waiting for intput\n") print("now waiting for intput\n")
with open(path, "wb") as file: with open(path, "wb") as f:
print("opened file {}".format(path)) print(f"Opened file {path}")
while True: while True:
[off, size, crc32c_want] = usb.get_send_data_header() [off, size, crc32c_want] = usb.get_send_data_header()
# todo: this isn't needed really. # todo: this isn't needed really.
usb.send_result(RESULT_OK) usb.send_result(UE.RESULT_OK)
# check if we should finish now. # check if we should finish now.
if (off == 0 and size == 0): if off == 0 and size == 0:
break break
# read the buffer and calculate the crc32c. # read the buffer and calculate the crc32c.
@@ -36,33 +42,31 @@ def wait_for_input(usb: Usb, path: Path) -> None:
crc32c_got = crc32c.crc32c(buf) crc32c_got = crc32c.crc32c(buf)
# validate the crc32c matches. # validate the crc32c matches.
if (crc32c_want != crc32c_got): if crc32c_want != crc32c_got:
usb.send_result(RESULT_ERROR) usb.send_result(UE.RESULT_ERROR)
continue continue
try: try:
file.seek(off) f.seek(off)
file.write(buf) f.write(buf)
usb.send_result(RESULT_OK) usb.send_result(UE.RESULT_OK)
except BlockingIOError as e: except BlockingIOError as e:
print("Error: failed to write: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e))) print("Error: failed to write: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e)))
usb.send_result(RESULT_ERROR) usb.send_result(UE.RESULT_ERROR)
if __name__ == '__main__': if __name__ == '__main__':
print("hello world") print(SPLASH)
# check which mode the user has selected. if not len(args) == 2:
args = len(sys.argv) print("Pass root path as argument.")
if (args != 2):
print("pass the folder path")
sys.exit(1) sys.exit(1)
root_path = sys.argv[1] root_path = sys.argv[1]
if (not os.path.isdir(root_path)): if (not os.path.isdir(root_path)):
raise ValueError('must be a dir!') raise ValueError('')
usb: Usb = Usb() usb = Usb()
try: try:
# get usb endpoints. # get usb endpoints.
@@ -72,21 +76,21 @@ if __name__ == '__main__':
while True: while True:
[cmd, arg3, arg4] = usb.get_send_header() [cmd, arg3, arg4] = usb.get_send_header()
if (cmd == CMD_QUIT): if (cmd == UE.CMD_QUIT):
usb.send_result(RESULT_OK) usb.send_result(UE.RESULT_OK)
break break
elif (cmd == CMD_EXPORT): elif (cmd == UE.CMD_EXPORT):
usb.send_result(RESULT_OK) usb.send_result(UE.RESULT_OK)
# todo: handle and return errors here. # todo: handle and return errors here.
file_name = get_file_name(usb, arg3) file_name = get_file_name(usb, arg3)
full_path = create_file_folder(root_path, file_name) exists_already, full_path = create_file_folder(root_path, file_name)
usb.send_result(RESULT_OK) usb.send_result(UE.RESULT_OK)
wait_for_input(usb, full_path) wait_for_input(usb, full_path)
else: else:
usb.send_result(RESULT_ERROR) usb.send_result(UE.RESULT_ERROR)
break break
except Exception as inst: except Exception as e:
print("An exception occurred " + str(inst)) print(f"An exception occurred - {e} ")

View File

@@ -4,13 +4,13 @@ from io import BufferedReader
import sys import sys
import os import os
from pathlib import Path from pathlib import Path
from usb_common import * from usb_common import Usb, USB_ENUM as UE
try: try:
import rarfile import rarfile
has_rar_support: bool = True has_rar_support = True
except: except:
has_rar_support: bool = False has_rar_support = False
# list of installable exts that sphaira supports. # list of installable exts that sphaira supports.
INSTALLABLE_EXTS = (".nsp", ".xci", ".nsz", ".xcz") INSTALLABLE_EXTS = (".nsp", ".xci", ".nsz", ".xcz")
@@ -26,21 +26,21 @@ def send_file_info_result(usb: Usb, result: int, file_size: int, flags: int):
usb.send_result(result, size_msb, size_lsb) usb.send_result(result, size_msb, size_lsb)
def file_transfer_loop(usb: Usb, file: BufferedReader, flags: int) -> None: def file_transfer_loop(usb: Usb, file: BufferedReader, flags: int) -> None:
print("inside file transfer loop now") print("> Transfer Loop")
while True: while True:
# get offset + size. # get offset + size.
[off, size, _] = usb.get_send_data_header() [off, size, _] = usb.get_send_data_header()
# check if we should finish now. # check if we should finish now.
if (off == 0 and size == 0): if off == 0 and size == 0:
usb.send_result(RESULT_OK) usb.send_result(UE.RESULT_OK)
break break
# if we cannot seek, ensure that sphaira doesn't try to seek backwards. # if we cannot seek, ensure that sphaira doesn't try to seek backwards.
if (flags & FLAG_STREAM) and off < file.tell(): if (flags & UE.FLAG_STREAM) and off < file.tell():
print("Error: tried to seek on file without random access.") print(">> Error: Tried to seek on file without random access.")
usb.send_result(RESULT_ERROR) usb.send_result(UE.RESULT_ERROR)
continue continue
# read file and calculate the hash. # read file and calculate the hash.
@@ -48,12 +48,12 @@ def file_transfer_loop(usb: Usb, file: BufferedReader, flags: int) -> None:
file.seek(off) file.seek(off)
buf = file.read(size) buf = file.read(size)
except BlockingIOError as e: except BlockingIOError as e:
print("Error: failed to read: {} at: {} size: {} error: {}".format(e.filename, off, size, str(e))) print(f">> Error: Failed to read: {e.filename} at: {off} size: {size} error: {e}")
usb.send_result(RESULT_ERROR) usb.send_result(UE.RESULT_ERROR)
continue continue
# respond back with the length of the data and the crc32c. # respond back with the length of the data and the crc32c.
usb.send_result(RESULT_OK, len(buf), crc32c.crc32c(buf)) usb.send_result(UE.RESULT_OK, len(buf), crc32c.crc32c(buf))
# send the data. # send the data.
usb.write(buf) usb.write(buf)
@@ -63,8 +63,8 @@ def wait_for_input(usb: Usb, file_index: int) -> None:
# open file / rar. (todo: learn how to make a class with inheritance) # open file / rar. (todo: learn how to make a class with inheritance)
try: try:
[path, internal_path] = paths[file_index] path, internal_path = paths[file_index]
flags: int = FLAG_NONE flags: int = UE.FLAG_NONE
if path.endswith(".rar"): if path.endswith(".rar"):
with rarfile.RarFile(path, part_only=True) as rf: with rarfile.RarFile(path, part_only=True) as rf:
@@ -72,22 +72,22 @@ def wait_for_input(usb: Usb, file_index: int) -> None:
with rf.open(internal_path) as file: with rf.open(internal_path) as file:
# if the file is compressed, disable seek. # if the file is compressed, disable seek.
if info.compress_type != rarfile.RAR_M0: if info.compress_type != rarfile.RAR_M0:
flags |= FLAG_STREAM flags |= UE.FLAG_STREAM
print("opened file: {} flags: {}".format(internal_path, flags)) print("opened file: {} flags: {}".format(internal_path, flags))
send_file_info_result(usb, RESULT_OK, info.file_size, flags) send_file_info_result(usb, UE.RESULT_OK, info.file_size, flags)
file_transfer_loop(usb, file, flags) file_transfer_loop(usb, file, flags)
else: else:
with open(path, "rb") as file: with open(path, "rb") as file:
print("opened file {}".format(path)) print("opened file {}".format(path))
file.seek(0, os.SEEK_END) file.seek(0, os.SEEK_END)
file_size = file.tell() file_size = file.tell()
send_file_info_result(usb, RESULT_OK, file_size, flags) send_file_info_result(usb, UE.RESULT_OK, file_size, flags)
file_transfer_loop(usb, file, flags) file_transfer_loop(usb, file, flags)
except OSError as e: except OSError as e:
print("Error: failed to open: {} error: {}".format(e.filename, str(e))) print("Error: failed to open: {} error: {}".format(e.filename, str(e)))
usb.send_result(RESULT_ERROR) usb.send_result(UE.RESULT_ERROR)
def add_file_to_install_list(path: str) -> None: def add_file_to_install_list(path: str) -> None:
# if the type if a rar, check if it contains a support ext internally. # if the type if a rar, check if it contains a support ext internally.
@@ -108,7 +108,7 @@ def add_file_to_install_list(path: str) -> None:
paths.append([path, path]) paths.append([path, path])
if __name__ == '__main__': if __name__ == '__main__':
print("hello world") print(SPLASH)
# check which mode the user has selected. # check which mode the user has selected.
args = len(sys.argv) args = len(sys.argv)
@@ -125,7 +125,7 @@ if __name__ == '__main__':
if os.path.isfile(f): if os.path.isfile(f):
add_file_to_install_list(f) add_file_to_install_list(f)
else: else:
raise ValueError('must be a file!') raise ValueError('Must be a file!')
usb: Usb = Usb() usb: Usb = Usb()
@@ -135,27 +135,27 @@ if __name__ == '__main__':
# build string table. # build string table.
string_table: bytes string_table: bytes
for [_, path] in paths: for _, path in paths:
string_table += bytes(Path(path).name.__str__(), 'utf8') + b'\n' string_table += bytes(Path(path).name.__str__(), 'utf8') + b'\n'
# this reads the send header and checks the magic. # this reads the send header and checks the magic.
usb.get_send_header() usb.get_send_header()
# send recv and string table. # send recv and string table.
usb.send_result(RESULT_OK, len(string_table)) usb.send_result(UE.RESULT_OK, len(string_table))
usb.write(string_table) usb.write(string_table)
# wait for command. # wait for command.
while True: while True:
[cmd, arg3, arg4] = usb.get_send_header() cmd, arg3, arg4 = usb.get_send_header()
if cmd == CMD_QUIT: if cmd == UE.CMD_QUIT:
usb.send_result(RESULT_OK) usb.send_result(UE.RESULT_OK)
break break
elif cmd == CMD_OPEN: elif cmd == UE.CMD_OPEN:
wait_for_input(usb, arg3) wait_for_input(usb, arg3)
else: else:
usb.send_result(RESULT_ERROR) usb.send_result(UE.RESULT_ERROR)
break break
except Exception as inst: except Exception as inst: