tests: move location of usb tests and update workflows for the new paths.
This commit is contained in:
4
.github/workflows/python-usb-export.yml
vendored
4
.github/workflows/python-usb-export.yml
vendored
@@ -3,7 +3,7 @@ name: USB Export Python Tests
|
|||||||
on:
|
on:
|
||||||
push:
|
push:
|
||||||
paths: &python_usb_export_paths
|
paths: &python_usb_export_paths
|
||||||
- 'tools/test_usb_export.py'
|
- 'tools/tests/test_usb_export.py'
|
||||||
- 'tools/usb_export.py'
|
- 'tools/usb_export.py'
|
||||||
- 'tools/usb_common.py'
|
- 'tools/usb_common.py'
|
||||||
- 'tools/requirements.txt'
|
- 'tools/requirements.txt'
|
||||||
@@ -30,4 +30,4 @@ jobs:
|
|||||||
|
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: |
|
run: |
|
||||||
python3 tools/test_usb_export.py
|
python3 tools/tests/test_usb_export.py
|
||||||
|
|||||||
4
.github/workflows/python-usb-install.yml
vendored
4
.github/workflows/python-usb-install.yml
vendored
@@ -3,7 +3,7 @@ name: USB Install Python Tests
|
|||||||
on:
|
on:
|
||||||
push:
|
push:
|
||||||
paths: &python_usb_install_paths
|
paths: &python_usb_install_paths
|
||||||
- 'tools/test_usb_install.py'
|
- 'tools/tests/test_usb_install.py'
|
||||||
- 'tools/usb_install.py'
|
- 'tools/usb_install.py'
|
||||||
- 'tools/usb_common.py'
|
- 'tools/usb_common.py'
|
||||||
- 'tools/requirements.txt'
|
- 'tools/requirements.txt'
|
||||||
@@ -30,4 +30,4 @@ jobs:
|
|||||||
|
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: |
|
run: |
|
||||||
python3 tools/test_usb_install.py
|
python3 tools/tests/test_usb_install.py
|
||||||
|
|||||||
@@ -1,114 +0,0 @@
|
|||||||
import unittest
|
|
||||||
import crc32c
|
|
||||||
import os
|
|
||||||
|
|
||||||
from usb_common import CMD_EXPORT, CMD_QUIT, RESULT_OK, RESULT_ERROR
|
|
||||||
|
|
||||||
class FakeUsb:
|
|
||||||
def __init__(self, files=None):
|
|
||||||
# files: list of tuples (filename: str, data: bytes)
|
|
||||||
self.files = files or [("testfile.bin", b"testdata")]
|
|
||||||
self._cmd_index = 0
|
|
||||||
self._file_index = 0
|
|
||||||
self._data_index = 0
|
|
||||||
self.results = []
|
|
||||||
self._reading_filename = True
|
|
||||||
self._reading_data = False
|
|
||||||
self._current_data = b""
|
|
||||||
self._current_data_offset = 0
|
|
||||||
self._current_data_sent = 0
|
|
||||||
self._current_file = None
|
|
||||||
self._send_data_header_calls = 0
|
|
||||||
|
|
||||||
def wait_for_connect(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_send_header(self):
|
|
||||||
# Simulate command sequence: export for each file, then quit
|
|
||||||
if self._cmd_index < len(self.files):
|
|
||||||
filename, data = self.files[self._cmd_index]
|
|
||||||
self._current_file = (filename, data)
|
|
||||||
self._cmd_index += 1
|
|
||||||
self._reading_filename = True
|
|
||||||
self._reading_data = False
|
|
||||||
self._current_data = data
|
|
||||||
self._current_data_offset = 0
|
|
||||||
self._current_data_sent = 0
|
|
||||||
self._send_data_header_calls = 0
|
|
||||||
return [CMD_EXPORT, len(filename.encode("utf-8")), 0]
|
|
||||||
else:
|
|
||||||
return [CMD_QUIT, 0, 0]
|
|
||||||
|
|
||||||
def read(self, size):
|
|
||||||
# Simulate reading file name or data
|
|
||||||
if self._reading_filename:
|
|
||||||
filename = self._current_file[0].encode("utf-8")
|
|
||||||
self._reading_filename = False
|
|
||||||
self._reading_data = True
|
|
||||||
return filename[:size]
|
|
||||||
elif self._reading_data:
|
|
||||||
# Return file data for export
|
|
||||||
data = self._current_data[self._current_data_sent:self._current_data_sent+size]
|
|
||||||
self._current_data_sent += len(data)
|
|
||||||
return data
|
|
||||||
else:
|
|
||||||
return b""
|
|
||||||
|
|
||||||
def get_send_data_header(self):
|
|
||||||
# Simulate sending data in one chunk, then finish
|
|
||||||
if self._send_data_header_calls == 0:
|
|
||||||
self._send_data_header_calls += 1
|
|
||||||
data = self._current_data
|
|
||||||
crc = crc32c.crc32c(data)
|
|
||||||
return [0, len(data), crc]
|
|
||||||
else:
|
|
||||||
return [0, 0, 0] # End of transfer
|
|
||||||
|
|
||||||
def send_result(self, result):
|
|
||||||
self.results.append(result)
|
|
||||||
|
|
||||||
# test case for usb_export.py
|
|
||||||
class TestUsbExport(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
self.root = "test_output"
|
|
||||||
os.makedirs(self.root, exist_ok=True)
|
|
||||||
# 100 files named test1.bin, test2.bin, ..., test10.bin, each with different sizes
|
|
||||||
self.files = [
|
|
||||||
(f"test{i+1}.bin", bytes([65 + i]) * (i * 100 + 1)) for i in range(100)
|
|
||||||
]
|
|
||||||
self.fake_usb = FakeUsb(files=self.files)
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
# Clean up created files/folders
|
|
||||||
for f in os.listdir(self.root):
|
|
||||||
os.remove(os.path.join(self.root, f))
|
|
||||||
os.rmdir(self.root)
|
|
||||||
|
|
||||||
def test_export_multiple_files(self):
|
|
||||||
from usb_export import get_file_name, create_file_folder, wait_for_input
|
|
||||||
|
|
||||||
# Simulate the main loop for all files
|
|
||||||
for filename, data in self.files:
|
|
||||||
cmd, name_len, _ = self.fake_usb.get_send_header()
|
|
||||||
self.assertEqual(cmd, CMD_EXPORT)
|
|
||||||
|
|
||||||
file_name = get_file_name(self.fake_usb, name_len)
|
|
||||||
self.assertEqual(file_name, filename)
|
|
||||||
|
|
||||||
full_path = create_file_folder(self.root, file_name)
|
|
||||||
self.fake_usb.send_result(RESULT_OK)
|
|
||||||
|
|
||||||
wait_for_input(self.fake_usb, full_path)
|
|
||||||
|
|
||||||
# Check file was created and contents match
|
|
||||||
with open(full_path, "rb") as f:
|
|
||||||
filedata = f.read()
|
|
||||||
self.assertEqual(filedata, data)
|
|
||||||
|
|
||||||
# After all files, should get CMD_QUIT
|
|
||||||
cmd, _, _ = self.fake_usb.get_send_header()
|
|
||||||
self.assertEqual(cmd, CMD_QUIT)
|
|
||||||
self.assertIn(RESULT_OK, self.fake_usb.results)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
@@ -1,103 +0,0 @@
|
|||||||
import unittest
|
|
||||||
import tempfile
|
|
||||||
import os
|
|
||||||
import shutil
|
|
||||||
import crc32c
|
|
||||||
|
|
||||||
from usb_common import RESULT_OK
|
|
||||||
|
|
||||||
# Simpler FakeUsb for file transfer
|
|
||||||
class FakeUsb:
|
|
||||||
def __init__(self, filedata):
|
|
||||||
self.filedata = filedata # bytes
|
|
||||||
self.results = []
|
|
||||||
self.writes = []
|
|
||||||
self._send_data_header_calls = 0
|
|
||||||
self._current_data = filedata
|
|
||||||
|
|
||||||
def wait_for_connect(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_send_data_header(self):
|
|
||||||
# Simulate sending the file in one chunk, then finish
|
|
||||||
if self._send_data_header_calls == 0:
|
|
||||||
self._send_data_header_calls += 1
|
|
||||||
data = self._current_data
|
|
||||||
crc = crc32c.crc32c(data)
|
|
||||||
return [0, len(data), crc]
|
|
||||||
else:
|
|
||||||
return [0, 0, 0]
|
|
||||||
|
|
||||||
def send_result(self, result, arg2=0, arg3=0):
|
|
||||||
self.results.append((result, arg2, arg3))
|
|
||||||
|
|
||||||
def write(self, data):
|
|
||||||
self.writes.append(data)
|
|
||||||
|
|
||||||
class TestUsbInstall(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
import random
|
|
||||||
self.tempdir = tempfile.mkdtemp()
|
|
||||||
# 100 files named test1.nsp, test2.xci, test3.nsz, test4.xcz, ..., cycling extensions, each with random sizes (0-2048 bytes)
|
|
||||||
extensions = ["nsp", "xci", "nsz", "xcz"]
|
|
||||||
self.files = [
|
|
||||||
(f"test{i+1}.{extensions[i % 4]}", os.urandom(random.randint(0, 2048))) for i in range(100)
|
|
||||||
]
|
|
||||||
self.filepaths = []
|
|
||||||
|
|
||||||
for fname, data in self.files:
|
|
||||||
fpath = os.path.join(self.tempdir, fname)
|
|
||||||
with open(fpath, "wb") as f:
|
|
||||||
f.write(data)
|
|
||||||
self.filepaths.append(fpath)
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
shutil.rmtree(self.tempdir)
|
|
||||||
|
|
||||||
def test_multiple_file_install(self):
|
|
||||||
from usb_install import add_file_to_install_list, paths, wait_for_input
|
|
||||||
paths.clear()
|
|
||||||
|
|
||||||
for fpath in self.filepaths:
|
|
||||||
add_file_to_install_list(fpath)
|
|
||||||
|
|
||||||
for idx, (fname, data) in enumerate(self.files):
|
|
||||||
fake_usb = FakeUsb(data)
|
|
||||||
wait_for_input(fake_usb, idx)
|
|
||||||
|
|
||||||
# Check that the file on disk matches expected data
|
|
||||||
with open(self.filepaths[idx], "rb") as f:
|
|
||||||
filedata = f.read()
|
|
||||||
self.assertEqual(filedata, data)
|
|
||||||
found = False
|
|
||||||
|
|
||||||
for result, arg2, arg3 in fake_usb.results:
|
|
||||||
if result == RESULT_OK and arg2 == len(data):
|
|
||||||
found = True
|
|
||||||
self.assertTrue(found)
|
|
||||||
|
|
||||||
def test_directory_install(self):
|
|
||||||
from usb_install import add_file_to_install_list, paths, wait_for_input
|
|
||||||
paths.clear()
|
|
||||||
|
|
||||||
for fpath in self.filepaths:
|
|
||||||
add_file_to_install_list(fpath)
|
|
||||||
|
|
||||||
for idx, (fname, data) in enumerate(self.files):
|
|
||||||
fake_usb = FakeUsb(data)
|
|
||||||
wait_for_input(fake_usb, idx)
|
|
||||||
|
|
||||||
# Check that the file on disk matches expected data
|
|
||||||
with open(self.filepaths[idx], "rb") as f:
|
|
||||||
filedata = f.read()
|
|
||||||
|
|
||||||
self.assertEqual(filedata, data)
|
|
||||||
found = False
|
|
||||||
|
|
||||||
for result, arg2, arg3 in fake_usb.results:
|
|
||||||
if result == RESULT_OK and arg2 == len(data):
|
|
||||||
found = True
|
|
||||||
self.assertTrue(found)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
116
tools/tests/test_usb_export.py
Normal file
116
tools/tests/test_usb_export.py
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
import sys, os
|
||||||
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import crc32c
|
||||||
|
|
||||||
|
from usb_common import CMD_EXPORT, CMD_QUIT, RESULT_OK, RESULT_ERROR
|
||||||
|
|
||||||
|
class FakeUsb:
|
||||||
|
def __init__(self, files=None):
|
||||||
|
# files: list of tuples (filename: str, data: bytes)
|
||||||
|
self.files = files or [("testfile.bin", b"testdata")]
|
||||||
|
self._cmd_index = 0
|
||||||
|
self._file_index = 0
|
||||||
|
self._data_index = 0
|
||||||
|
self.results = []
|
||||||
|
self._reading_filename = True
|
||||||
|
self._reading_data = False
|
||||||
|
self._current_data = b""
|
||||||
|
self._current_data_offset = 0
|
||||||
|
self._current_data_sent = 0
|
||||||
|
self._current_file = None
|
||||||
|
self._send_data_header_calls = 0
|
||||||
|
|
||||||
|
def wait_for_connect(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_send_header(self):
|
||||||
|
# Simulate command sequence: export for each file, then quit
|
||||||
|
if self._cmd_index < len(self.files):
|
||||||
|
filename, data = self.files[self._cmd_index]
|
||||||
|
self._current_file = (filename, data)
|
||||||
|
self._cmd_index += 1
|
||||||
|
self._reading_filename = True
|
||||||
|
self._reading_data = False
|
||||||
|
self._current_data = data
|
||||||
|
self._current_data_offset = 0
|
||||||
|
self._current_data_sent = 0
|
||||||
|
self._send_data_header_calls = 0
|
||||||
|
return [CMD_EXPORT, len(filename.encode("utf-8")), 0]
|
||||||
|
else:
|
||||||
|
return [CMD_QUIT, 0, 0]
|
||||||
|
|
||||||
|
def read(self, size):
|
||||||
|
# Simulate reading file name or data
|
||||||
|
if self._reading_filename:
|
||||||
|
filename = self._current_file[0].encode("utf-8")
|
||||||
|
self._reading_filename = False
|
||||||
|
self._reading_data = True
|
||||||
|
return filename[:size]
|
||||||
|
elif self._reading_data:
|
||||||
|
# Return file data for export
|
||||||
|
data = self._current_data[self._current_data_sent:self._current_data_sent+size]
|
||||||
|
self._current_data_sent += len(data)
|
||||||
|
return data
|
||||||
|
else:
|
||||||
|
return b""
|
||||||
|
|
||||||
|
def get_send_data_header(self):
|
||||||
|
# Simulate sending data in one chunk, then finish
|
||||||
|
if self._send_data_header_calls == 0:
|
||||||
|
self._send_data_header_calls += 1
|
||||||
|
data = self._current_data
|
||||||
|
crc = crc32c.crc32c(data)
|
||||||
|
return [0, len(data), crc]
|
||||||
|
else:
|
||||||
|
return [0, 0, 0] # End of transfer
|
||||||
|
|
||||||
|
def send_result(self, result):
|
||||||
|
self.results.append(result)
|
||||||
|
|
||||||
|
# test case for usb_export.py
|
||||||
|
class TestUsbExport(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.root = "test_output"
|
||||||
|
os.makedirs(self.root, exist_ok=True)
|
||||||
|
# 100 files named test1.bin, test2.bin, ..., test10.bin, each with different sizes
|
||||||
|
self.files = [
|
||||||
|
(f"test{i+1}.bin", bytes([65 + i]) * (i * 100 + 1)) for i in range(100)
|
||||||
|
]
|
||||||
|
self.fake_usb = FakeUsb(files=self.files)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
# Clean up created files/folders
|
||||||
|
for f in os.listdir(self.root):
|
||||||
|
os.remove(os.path.join(self.root, f))
|
||||||
|
os.rmdir(self.root)
|
||||||
|
|
||||||
|
def test_export_multiple_files(self):
|
||||||
|
from usb_export import get_file_name, create_file_folder, wait_for_input
|
||||||
|
|
||||||
|
# Simulate the main loop for all files
|
||||||
|
for filename, data in self.files:
|
||||||
|
cmd, name_len, _ = self.fake_usb.get_send_header()
|
||||||
|
self.assertEqual(cmd, CMD_EXPORT)
|
||||||
|
|
||||||
|
file_name = get_file_name(self.fake_usb, name_len)
|
||||||
|
self.assertEqual(file_name, filename)
|
||||||
|
|
||||||
|
full_path = create_file_folder(self.root, file_name)
|
||||||
|
self.fake_usb.send_result(RESULT_OK)
|
||||||
|
|
||||||
|
wait_for_input(self.fake_usb, full_path)
|
||||||
|
|
||||||
|
# Check file was created and contents match
|
||||||
|
with open(full_path, "rb") as f:
|
||||||
|
filedata = f.read()
|
||||||
|
self.assertEqual(filedata, data)
|
||||||
|
|
||||||
|
# After all files, should get CMD_QUIT
|
||||||
|
cmd, _, _ = self.fake_usb.get_send_header()
|
||||||
|
self.assertEqual(cmd, CMD_QUIT)
|
||||||
|
self.assertIn(RESULT_OK, self.fake_usb.results)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
105
tools/tests/test_usb_install.py
Normal file
105
tools/tests/test_usb_install.py
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
import sys, os
|
||||||
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import tempfile
|
||||||
|
import shutil
|
||||||
|
import crc32c
|
||||||
|
|
||||||
|
from usb_common import RESULT_OK
|
||||||
|
|
||||||
|
# Simpler FakeUsb for file transfer
|
||||||
|
class FakeUsb:
|
||||||
|
def __init__(self, filedata):
|
||||||
|
self.filedata = filedata # bytes
|
||||||
|
self.results = []
|
||||||
|
self.writes = []
|
||||||
|
self._send_data_header_calls = 0
|
||||||
|
self._current_data = filedata
|
||||||
|
|
||||||
|
def wait_for_connect(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_send_data_header(self):
|
||||||
|
# Simulate sending the file in one chunk, then finish
|
||||||
|
if self._send_data_header_calls == 0:
|
||||||
|
self._send_data_header_calls += 1
|
||||||
|
data = self._current_data
|
||||||
|
crc = crc32c.crc32c(data)
|
||||||
|
return [0, len(data), crc]
|
||||||
|
else:
|
||||||
|
return [0, 0, 0]
|
||||||
|
|
||||||
|
def send_result(self, result, arg2=0, arg3=0):
|
||||||
|
self.results.append((result, arg2, arg3))
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
self.writes.append(data)
|
||||||
|
|
||||||
|
class TestUsbInstall(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
import random
|
||||||
|
self.tempdir = tempfile.mkdtemp()
|
||||||
|
# 100 files named test1.nsp, test2.xci, test3.nsz, test4.xcz, ..., cycling extensions, each with random sizes (0-2048 bytes)
|
||||||
|
extensions = ["nsp", "xci", "nsz", "xcz"]
|
||||||
|
self.files = [
|
||||||
|
(f"test{i+1}.{extensions[i % 4]}", os.urandom(random.randint(0, 2048))) for i in range(100)
|
||||||
|
]
|
||||||
|
self.filepaths = []
|
||||||
|
|
||||||
|
for fname, data in self.files:
|
||||||
|
fpath = os.path.join(self.tempdir, fname)
|
||||||
|
with open(fpath, "wb") as f:
|
||||||
|
f.write(data)
|
||||||
|
self.filepaths.append(fpath)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
shutil.rmtree(self.tempdir)
|
||||||
|
|
||||||
|
def test_multiple_file_install(self):
|
||||||
|
from usb_install import add_file_to_install_list, paths, wait_for_input
|
||||||
|
paths.clear()
|
||||||
|
|
||||||
|
for fpath in self.filepaths:
|
||||||
|
add_file_to_install_list(fpath)
|
||||||
|
|
||||||
|
for idx, (fname, data) in enumerate(self.files):
|
||||||
|
fake_usb = FakeUsb(data)
|
||||||
|
wait_for_input(fake_usb, idx)
|
||||||
|
|
||||||
|
# Check that the file on disk matches expected data
|
||||||
|
with open(self.filepaths[idx], "rb") as f:
|
||||||
|
filedata = f.read()
|
||||||
|
self.assertEqual(filedata, data)
|
||||||
|
found = False
|
||||||
|
|
||||||
|
for result, arg2, arg3 in fake_usb.results:
|
||||||
|
if result == RESULT_OK and arg2 == len(data):
|
||||||
|
found = True
|
||||||
|
self.assertTrue(found)
|
||||||
|
|
||||||
|
def test_directory_install(self):
|
||||||
|
from usb_install import add_file_to_install_list, paths, wait_for_input
|
||||||
|
paths.clear()
|
||||||
|
|
||||||
|
for fpath in self.filepaths:
|
||||||
|
add_file_to_install_list(fpath)
|
||||||
|
|
||||||
|
for idx, (fname, data) in enumerate(self.files):
|
||||||
|
fake_usb = FakeUsb(data)
|
||||||
|
wait_for_input(fake_usb, idx)
|
||||||
|
|
||||||
|
# Check that the file on disk matches expected data
|
||||||
|
with open(self.filepaths[idx], "rb") as f:
|
||||||
|
filedata = f.read()
|
||||||
|
|
||||||
|
self.assertEqual(filedata, data)
|
||||||
|
found = False
|
||||||
|
|
||||||
|
for result, arg2, arg3 in fake_usb.results:
|
||||||
|
if result == RESULT_OK and arg2 == len(data):
|
||||||
|
found = True
|
||||||
|
self.assertTrue(found)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user