K900 | diff --git a/nixos/lib/test-driver/test-driver.py b/nixos/lib/test-driver/test-driver.py
index 3ee8b3227c6..48831f53188 100755
--- a/nixos/lib/test-driver/test-driver.py
+++ b/nixos/lib/test-driver/test-driver.py
@@ -8,7 +8,6 @@ import queue
import io
import threading
import argparse
-import atexit
import base64
import codecs
import os
@@ -583,24 +582,45 @@ class Machine:
         )
     def execute(self, command: str) -> Tuple[int, str]:
+        """Run a command in the guest shell and return its status and output.
+
+        The command is run in a subshell with ``set -euo pipefail``; a magic
+        marker followed by the exit status is echoed afterwards so the end of
+        the output can be detected on the shell connection.
+
+        Returns ``(status_code, output)``; the status is -255 if the shell
+        connection is closed before the marker is seen.
+        """
+        status_code_magic = "|!=EOF"
+        status_code_magic_bytes = status_code_magic.encode()
+
         self.connect()
-        out_command = "( set -euo pipefail; {} ); echo '|!=EOF' $?\n".format(command)
+        out_command = (
+            f"( set -euo pipefail; {command} ); echo '{status_code_magic}' $?\n"
+        )
         assert self.shell
         self.shell.send(out_command.encode())
-        output = ""
-        status_code_pattern = re.compile(r"(.*)\|\!=EOF\s+(\d+)")
+        output = b""
         while True:
-            chunk = self.shell.recv(4096).decode(errors="ignore")
-            match = status_code_pattern.match(chunk)
-            if match:
-                output += match[1]
-                status_code = int(match[2])
-                return (status_code, output)
+            chunk = self.shell.recv(4096)
             output += chunk
+            if not chunk:
+                # The connection was closed before the status marker arrived.
+                return (-255, output.decode(errors="ignore"))
+
+            # Search the accumulated buffer, not just the last chunk: the marker
+            # may be split across recv() calls.
+            head, found, tail = output.rpartition(status_code_magic_bytes)
+            status_code_b = tail.strip()
+            # Only parse once the newline that `echo` appends has arrived;
+            # otherwise the exit status digits may still be incomplete.
+            if found and tail.endswith(b"\n") and status_code_b.isdigit():
+                return (int(status_code_b), head.decode(errors="ignore"))
+
     def shell_interact(self) -> None:
         """Allows you to interact with the guest shell
@@ -1034,7 +1046,7 @@ class Machine:
assert self.monitor
assert self.serial_thread
- self.process.terminate()
+ self.process.kill()
self.shell.close()
self.monitor.close()
self.serial_thread.join()
@@ -1128,11 +1140,19 @@ class Driver:
                 for cmd in cmd(start_scripts)
             ]
-        @atexit.register
-        def clean_up() -> None:
-            with rootlog.nested("clean up"):
-                for machine in self.machines:
-                    machine.release()
+    def __enter__(self) -> "Driver":
+        """Return the driver itself for use as a ``with`` target."""
+        return self
+
+    def __exit__(self, *_: Any) -> None:
+        """Release every machine when the ``with`` block is left.
+
+        The exception details passed by the interpreter are ignored, and
+        returning ``None`` lets any in-flight exception propagate.
+        """
+        with rootlog.nested("clean up"):
+            for vm in self.machines:
+                vm.release()
     def subtest(self, name: str) -> Iterator[None]:
         """Group logs under a given test name"""
@@ -1307,14 +1321,17 @@ if __name__ == "__main__":
     if not args.keep_vm_state:
         rootlog.info("Machine state will be reset. To keep it, pass --keep-vm-state")
-    driver = Driver(
+    # The context manager runs Driver.__exit__, which releases every machine,
+    # whether the block finishes normally or raises.
+    with Driver(
         args.start_scripts, args.vlans, args.testscript.read_text(), args.keep_vm_state
-    )
-
-    if args.interactive:
-        ptpython.repl.embed(driver.test_symbols(), {})
-    else:
-        tic = time.time()
-        driver.run_tests()
-        toc = time.time()
-        rootlog.info(f"test script finished in {(toc-tic):.2f}s")
+    ) as driver:
+        if args.interactive:
+            ptpython.repl.embed(driver.test_symbols(), {})
+        else:
+            # Monotonic clock: the elapsed time must not be skewed by wall-clock
+            # adjustments while the tests run.
+            tic = time.monotonic()
+            driver.run_tests()
+            toc = time.monotonic()
+            rootlog.info(f"test script finished in {(toc-tic):.2f}s")
| 20:43:02 |