Loading behave_utils/proc.py +167 −26 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ from subprocess import DEVNULL from subprocess import PIPE from subprocess import CalledProcessError from typing import IO from typing import TYPE_CHECKING from typing import Any from typing import BinaryIO from typing import Callable Loading Loading @@ -112,6 +113,55 @@ def exec_io( return proc.returncode @overload async def aexec_io( cmd: Arguments, *, input: bytes = b'', deserialiser: Deserialiser[T], **kwargs: Any, ) -> T: ... @overload async def aexec_io( cmd: Arguments, *, input: bytes = b'', deserialiser: None = None, **kwargs: Any, ) -> int: ... async def aexec_io( cmd: Arguments, *, input: bytes = b"", deserialiser: Deserialiser[Any]|None = None, **kwargs: Any, ) -> Any: """ Execute a command asynchronously, handling writing output to sys.stdout and sys.stderr If input is provided it will be fed to the process' stdin. If a deserialiser is provided it will be used to parse stdout data from the process. Stderr and stdout (if no deserialiser is provided) will be written to `sys.stderr` and `sys.stdout` respectively. Note that the data is written, not redirected. If either `sys.stdout` or `sys.stderr` is changed to an IO-like object with no file descriptor, this will still work. """ if deserialiser and 'stdout' in kwargs: raise TypeError("Cannot provide 'deserialiser' with 'stdout' argument") if input and 'stdin' in kwargs: raise TypeError("Cannot provide 'input' with 'stdin' argument") stdout: IO[str]|IO[bytes]|int = io.BytesIO() if deserialiser else kwargs.pop('stdout', sys.stdout) stderr: IO[str]|IO[bytes]|int = kwargs.pop('stderr', sys.stderr) proc = await _exec_io(cmd, input, stdout, stderr, kwargs) if deserialiser: assert isinstance(stdout, io.BytesIO) return deserialiser(stdout.getbuffer()) return proc.returncode async def _exec_io( cmd: Arguments, data: bytes, Loading Loading @@ -172,7 +222,41 @@ async def _passthru(in_stream: trio.abc.ReceiveStream, out_stream: IO[str]|IO[by await write(data) class Executor(list[Argument]): class _ExecutorBase(list[Argument]): if TYPE_CHECKING: E = TypeVar("E", bound="_ExecutorBase") def __init__(self, *cmd: Argument): self[:] = cmd def get_arguments( self, cmd: Arguments, kwargs: MutableMapping[str, Any], has_input: bool, is_query: bool, deserialiser: Deserialiser[Any]|None, ) -> Arguments: """ Override to amend command arguments and kwargs for exec_io() prior to execution """ return cmd def subcommand(self: E, *args: Argument) -> E: """ Return a new Executor instance of the same class with additional arguments appended The returned instance is created as a shallow copy; if attribute values need to be copied, subclasses must implement __copy__(). (see https://docs.python.org/3/library/copy.html) """ new = copy(self) new.extend(args) return new class Executor(_ExecutorBase): """ Manage calling executables with composable argument lists Loading @@ -183,12 +267,6 @@ class Executor(list[Argument]): is called. """ T = TypeVar("T") E = TypeVar("E", bound="Executor") def __init__(self, *cmd: Argument): self[:] = cmd @overload def __call__( self, Loading Loading @@ -274,27 +352,90 @@ class Executor(list[Argument]): raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None def get_arguments( class AsyncExecutor(_ExecutorBase): @overload async def __call__( self, cmd: Arguments, kwargs: MutableMapping[str, Any], has_input: bool, is_query: bool, deserialiser: Deserialiser[Any]|None, ) -> Arguments: """ Override to amend command arguments and kwargs for exec_io() prior to execution """ return cmd *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: Deserialiser[T], query: Literal[False] = False, **kwargs: Any, ) -> T: ... def subcommand(self: E, *args: Argument) -> E: @overload async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[True], **kwargs: Any, ) -> int: ... @overload async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[False] = False, **kwargs: Any, ) -> None: ... async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = None, deserialiser: Deserialiser[Any]|None = None, query: bool = False, **kwargs: Any, ) -> Any: """ Return a new Executor instance of the same class with additional arguments appended Execute the configure command with the given arguments The returned instance is created as a shallow copy; if attribute values need to be copied, subclasses must implement __copy__(). (see https://docs.python.org/3/library/copy.html) Input: Any bytes passed as "input" will be fed into the process' stdin pipe. Output: If "deserialiser" is provided it will be called with a memoryview of a buffer containing any bytes from the process' stdout; whatever is returned by "deserialiser" will be returned. If "query" is true the return code of the process will be returned. Otherwise nothing is returned. Note that "deserialiser" and "query" are mutually exclusive; if debugging is enabled an AssertionError will be raised if both are non-None/non-False, otherwise "query" is ignored. Errors: If "query" is not true any non-zero return code will cause CalledProcessError to be raised. """ new = copy(self) new.extend(args) return new assert not deserialiser or not query data = ( b"" if input is None else input.encode() if isinstance(input, str) else bytes(input) ) cmd = self.get_arguments( [*self, *args], kwargs, has_input=bool(data), is_query=query, deserialiser=deserialiser, ) if deserialiser: return await aexec_io(cmd, input=data, deserialiser=deserialiser, **kwargs) rcode = await aexec_io(cmd, input=data, **kwargs) if query: return rcode if 0 != rcode: raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None tests/unit/proc/test_async.py 0 → 100644 +205 −0 Original line number Diff line number Diff line # Copyright 2022 Dominik Sekotill <dom.sekotill@kodo.org.uk> # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """ Unit tests for the async features of behave_utils.proc """ import io import os import subprocess import sys from sys import executable as python from typing import Any from behave_utils import json from behave_utils import proc from .. import TestCase from ..trio import trio_test FIXTURE_CMD = [python, "-BESsm", "tests.unit.proc.fixture_output"] JSON_OUTPUT = [*FIXTURE_CMD, "json"] ECHO_OUTPUT = [*FIXTURE_CMD, "echo"] TEST_BYTES = b"""lorem ipsum dolorum""" class ExecIOTests(TestCase): """ Tests for the behave_utils.proc.exec_io function """ @trio_test() async def test_deserialiser(self) -> None: """ Check that calling with a deserialiser correctly deserialises output """ with self.subTest(deserialiser=bytes): output: Any = await proc.aexec_io(JSON_OUTPUT, deserialiser=bytes) self.assertIsInstance(output, bytes) with self.subTest(deserialiser=json.JSONObject): output = await proc.aexec_io(JSON_OUTPUT, deserialiser=json.JSONObject.from_string) self.assertIsInstance(output, json.JSONObject) @trio_test() async def test_deserialiser_with_stdout(self) -> None: """ Check that calling with both deserialiser and stdout raises TypeError """ with self.assertRaises(TypeError): await proc.aexec_io(ECHO_OUTPUT, stdout=sys.stdout, deserialiser=bytes) @trio_test() async def test_input(self) -> None: """ Check that calling with the "input" argument passes bytes to stdin """ output = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, deserialiser=bytes) self.assertEqual(output, TEST_BYTES) @trio_test() async def test_input_with_stdin(self) -> None: """ Check that calling with both "input" and "stdin" arguments raises TypeError """ with self.assertRaises(TypeError): await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdin=sys.stdin) @trio_test() async def test_stdout(self) -> None: """ Check that calling with the "stdout" argument receives bytes from stdout """ with self.subTest(stdout="BytesIO"): bbuff = io.BytesIO() code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=bbuff) bbuff.seek(0) self.assertEqual(code, 0) self.assertEqual(bbuff.read(), TEST_BYTES) with self.subTest(stdout="StringIO"): sbuff = io.StringIO() code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=sbuff) sbuff.seek(0) self.assertEqual(code, 0) self.assertEqual(sbuff.read(), TEST_BYTES.decode()) with self.subTest(stdout="pipe"): read_fd, write_fd = os.pipe() code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=write_fd) os.close(write_fd) with io.open(read_fd, mode="rb") as pipe: self.assertEqual(pipe.read(), TEST_BYTES) self.assertEqual(code, 0) class ExecutorTests(TestCase): """ Tests for the behave_utils.proc.AsyncExecutor class """ @trio_test() async def test_deserialiser(self) -> None: """ Check that calling with a deserialiser correctly deserialises output """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(deserialiser=bytes): output: Any = await exe("json", deserialiser=bytes) self.assertIsInstance(output, bytes) with self.subTest(deserialiser=json.JSONObject): output = await exe("json", deserialiser=json.JSONObject.from_string) self.assertIsInstance(output, json.JSONObject) @trio_test() async def test_input(self) -> None: """ Check that calling with the "input" argument passes bytes to stdin """ exe = proc.AsyncExecutor(*FIXTURE_CMD) output = await exe("echo", input=TEST_BYTES, deserialiser=bytes) self.assertEqual(output, TEST_BYTES) @trio_test() async def test_stdout(self) -> None: """ Check that calling with the "stdout" argument receives bytes from stdout """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(stdout="BytesIO"): bbuff = io.BytesIO() await exe("echo", input=TEST_BYTES, stdout=bbuff) bbuff.seek(0) self.assertEqual(bbuff.read(), TEST_BYTES) with self.subTest(stdout="StringIO"): sbuff = io.StringIO() await exe("echo", input=TEST_BYTES, stdout=sbuff) sbuff.seek(0) self.assertEqual(sbuff.read(), TEST_BYTES.decode()) with self.subTest(stdout="pipe"): read_fd, write_fd = os.pipe() await exe("echo", input=TEST_BYTES, stdout=write_fd) os.close(write_fd) with io.open(read_fd, mode="rb") as pipe: self.assertEqual(pipe.read(), TEST_BYTES) @trio_test() async def test_return_code(self) -> None: """ Check that the "query" argument behaves as expected """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(query=True): await exe("rcode", "--code=3", query=True) with self.subTest(query=False), self.assertRaises(subprocess.CalledProcessError): await exe("rcode", "--code=3", query=False) with self.subTest(query=None), self.assertRaises(subprocess.CalledProcessError): await exe("rcode", "--code=3") @trio_test() async def test_subcommand(self) -> None: """ Check that the subcommand method returns a new instance with appended arguments """ class NewExecutor(proc.AsyncExecutor): ... with self.subTest(cls=proc.AsyncExecutor): exe = proc.AsyncExecutor("foo", "bar").subcommand("baz") self.assertIsInstance(exe, proc.AsyncExecutor) self.assertListEqual(exe, ["foo", "bar", "baz"]) with self.subTest(cls=NewExecutor): exe = NewExecutor("foo", "bar").subcommand("baz") self.assertIsInstance(exe, NewExecutor) self.assertListEqual(exe, ["foo", "bar", "baz"]) tests/unit/trio.py 0 → 100644 +41 −0 Original line number Diff line number Diff line # Copyright 2022 Dominik Sekotill <dom.sekotill@kodo.org.uk> # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """ Utilities for writing trio-enabled async test code """ from __future__ import annotations from collections.abc import Awaitable from collections.abc import Callable from functools import partial from functools import wraps from typing import TYPE_CHECKING from trio import run from trio.abc import Clock if TYPE_CHECKING: from typing import TypeVar from typing_extensions import ParamSpec T = TypeVar("T") P = ParamSpec("P") def trio_test(clock: Clock|None = None) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, T]]: """ Return a decorator that wraps async test functions with a runner """ def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, T]: @wraps(func) def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: # https://github.com/python-trio/trio-typing/issues/58 return run(partial(func, *args, **kwargs), clock=clock) # type: ignore return wrapper return decorator Loading
behave_utils/proc.py +167 −26 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ from subprocess import DEVNULL from subprocess import PIPE from subprocess import CalledProcessError from typing import IO from typing import TYPE_CHECKING from typing import Any from typing import BinaryIO from typing import Callable Loading Loading @@ -112,6 +113,55 @@ def exec_io( return proc.returncode @overload async def aexec_io( cmd: Arguments, *, input: bytes = b'', deserialiser: Deserialiser[T], **kwargs: Any, ) -> T: ... @overload async def aexec_io( cmd: Arguments, *, input: bytes = b'', deserialiser: None = None, **kwargs: Any, ) -> int: ... async def aexec_io( cmd: Arguments, *, input: bytes = b"", deserialiser: Deserialiser[Any]|None = None, **kwargs: Any, ) -> Any: """ Execute a command asynchronously, handling writing output to sys.stdout and sys.stderr If input is provided it will be fed to the process' stdin. If a deserialiser is provided it will be used to parse stdout data from the process. Stderr and stdout (if no deserialiser is provided) will be written to `sys.stderr` and `sys.stdout` respectively. Note that the data is written, not redirected. If either `sys.stdout` or `sys.stderr` is changed to an IO-like object with no file descriptor, this will still work. """ if deserialiser and 'stdout' in kwargs: raise TypeError("Cannot provide 'deserialiser' with 'stdout' argument") if input and 'stdin' in kwargs: raise TypeError("Cannot provide 'input' with 'stdin' argument") stdout: IO[str]|IO[bytes]|int = io.BytesIO() if deserialiser else kwargs.pop('stdout', sys.stdout) stderr: IO[str]|IO[bytes]|int = kwargs.pop('stderr', sys.stderr) proc = await _exec_io(cmd, input, stdout, stderr, kwargs) if deserialiser: assert isinstance(stdout, io.BytesIO) return deserialiser(stdout.getbuffer()) return proc.returncode async def _exec_io( cmd: Arguments, data: bytes, Loading Loading @@ -172,7 +222,41 @@ async def _passthru(in_stream: trio.abc.ReceiveStream, out_stream: IO[str]|IO[by await write(data) class Executor(list[Argument]): class _ExecutorBase(list[Argument]): if TYPE_CHECKING: E = TypeVar("E", bound="_ExecutorBase") def __init__(self, *cmd: Argument): self[:] = cmd def get_arguments( self, cmd: Arguments, kwargs: MutableMapping[str, Any], has_input: bool, is_query: bool, deserialiser: Deserialiser[Any]|None, ) -> Arguments: """ Override to amend command arguments and kwargs for exec_io() prior to execution """ return cmd def subcommand(self: E, *args: Argument) -> E: """ Return a new Executor instance of the same class with additional arguments appended The returned instance is created as a shallow copy; if attribute values need to be copied, subclasses must implement __copy__(). (see https://docs.python.org/3/library/copy.html) """ new = copy(self) new.extend(args) return new class Executor(_ExecutorBase): """ Manage calling executables with composable argument lists Loading @@ -183,12 +267,6 @@ class Executor(list[Argument]): is called. """ T = TypeVar("T") E = TypeVar("E", bound="Executor") def __init__(self, *cmd: Argument): self[:] = cmd @overload def __call__( self, Loading Loading @@ -274,27 +352,90 @@ class Executor(list[Argument]): raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None def get_arguments( class AsyncExecutor(_ExecutorBase): @overload async def __call__( self, cmd: Arguments, kwargs: MutableMapping[str, Any], has_input: bool, is_query: bool, deserialiser: Deserialiser[Any]|None, ) -> Arguments: """ Override to amend command arguments and kwargs for exec_io() prior to execution """ return cmd *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: Deserialiser[T], query: Literal[False] = False, **kwargs: Any, ) -> T: ... def subcommand(self: E, *args: Argument) -> E: @overload async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[True], **kwargs: Any, ) -> int: ... @overload async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[False] = False, **kwargs: Any, ) -> None: ... async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = None, deserialiser: Deserialiser[Any]|None = None, query: bool = False, **kwargs: Any, ) -> Any: """ Return a new Executor instance of the same class with additional arguments appended Execute the configure command with the given arguments The returned instance is created as a shallow copy; if attribute values need to be copied, subclasses must implement __copy__(). (see https://docs.python.org/3/library/copy.html) Input: Any bytes passed as "input" will be fed into the process' stdin pipe. Output: If "deserialiser" is provided it will be called with a memoryview of a buffer containing any bytes from the process' stdout; whatever is returned by "deserialiser" will be returned. If "query" is true the return code of the process will be returned. Otherwise nothing is returned. Note that "deserialiser" and "query" are mutually exclusive; if debugging is enabled an AssertionError will be raised if both are non-None/non-False, otherwise "query" is ignored. Errors: If "query" is not true any non-zero return code will cause CalledProcessError to be raised. """ new = copy(self) new.extend(args) return new assert not deserialiser or not query data = ( b"" if input is None else input.encode() if isinstance(input, str) else bytes(input) ) cmd = self.get_arguments( [*self, *args], kwargs, has_input=bool(data), is_query=query, deserialiser=deserialiser, ) if deserialiser: return await aexec_io(cmd, input=data, deserialiser=deserialiser, **kwargs) rcode = await aexec_io(cmd, input=data, **kwargs) if query: return rcode if 0 != rcode: raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None
tests/unit/proc/test_async.py 0 → 100644 +205 −0 Original line number Diff line number Diff line # Copyright 2022 Dominik Sekotill <dom.sekotill@kodo.org.uk> # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """ Unit tests for the async features of behave_utils.proc """ import io import os import subprocess import sys from sys import executable as python from typing import Any from behave_utils import json from behave_utils import proc from .. import TestCase from ..trio import trio_test FIXTURE_CMD = [python, "-BESsm", "tests.unit.proc.fixture_output"] JSON_OUTPUT = [*FIXTURE_CMD, "json"] ECHO_OUTPUT = [*FIXTURE_CMD, "echo"] TEST_BYTES = b"""lorem ipsum dolorum""" class ExecIOTests(TestCase): """ Tests for the behave_utils.proc.exec_io function """ @trio_test() async def test_deserialiser(self) -> None: """ Check that calling with a deserialiser correctly deserialises output """ with self.subTest(deserialiser=bytes): output: Any = await proc.aexec_io(JSON_OUTPUT, deserialiser=bytes) self.assertIsInstance(output, bytes) with self.subTest(deserialiser=json.JSONObject): output = await proc.aexec_io(JSON_OUTPUT, deserialiser=json.JSONObject.from_string) self.assertIsInstance(output, json.JSONObject) @trio_test() async def test_deserialiser_with_stdout(self) -> None: """ Check that calling with both deserialiser and stdout raises TypeError """ with self.assertRaises(TypeError): await proc.aexec_io(ECHO_OUTPUT, stdout=sys.stdout, deserialiser=bytes) @trio_test() async def test_input(self) -> None: """ Check that calling with the "input" argument passes bytes to stdin """ output = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, deserialiser=bytes) self.assertEqual(output, TEST_BYTES) @trio_test() async def test_input_with_stdin(self) -> None: """ Check that calling with both "input" and "stdin" arguments raises TypeError """ with self.assertRaises(TypeError): await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdin=sys.stdin) @trio_test() async def test_stdout(self) -> None: """ Check that calling with the "stdout" argument receives bytes from stdout """ with self.subTest(stdout="BytesIO"): bbuff = io.BytesIO() code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=bbuff) bbuff.seek(0) self.assertEqual(code, 0) self.assertEqual(bbuff.read(), TEST_BYTES) with self.subTest(stdout="StringIO"): sbuff = io.StringIO() code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=sbuff) sbuff.seek(0) self.assertEqual(code, 0) self.assertEqual(sbuff.read(), TEST_BYTES.decode()) with self.subTest(stdout="pipe"): read_fd, write_fd = os.pipe() code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=write_fd) os.close(write_fd) with io.open(read_fd, mode="rb") as pipe: self.assertEqual(pipe.read(), TEST_BYTES) self.assertEqual(code, 0) class ExecutorTests(TestCase): """ Tests for the behave_utils.proc.AsyncExecutor class """ @trio_test() async def test_deserialiser(self) -> None: """ Check that calling with a deserialiser correctly deserialises output """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(deserialiser=bytes): output: Any = await exe("json", deserialiser=bytes) self.assertIsInstance(output, bytes) with self.subTest(deserialiser=json.JSONObject): output = await exe("json", deserialiser=json.JSONObject.from_string) self.assertIsInstance(output, json.JSONObject) @trio_test() async def test_input(self) -> None: """ Check that calling with the "input" argument passes bytes to stdin """ exe = proc.AsyncExecutor(*FIXTURE_CMD) output = await exe("echo", input=TEST_BYTES, deserialiser=bytes) self.assertEqual(output, TEST_BYTES) @trio_test() async def test_stdout(self) -> None: """ Check that calling with the "stdout" argument receives bytes from stdout """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(stdout="BytesIO"): bbuff = io.BytesIO() await exe("echo", input=TEST_BYTES, stdout=bbuff) bbuff.seek(0) self.assertEqual(bbuff.read(), TEST_BYTES) with self.subTest(stdout="StringIO"): sbuff = io.StringIO() await exe("echo", input=TEST_BYTES, stdout=sbuff) sbuff.seek(0) self.assertEqual(sbuff.read(), TEST_BYTES.decode()) with self.subTest(stdout="pipe"): read_fd, write_fd = os.pipe() await exe("echo", input=TEST_BYTES, stdout=write_fd) os.close(write_fd) with io.open(read_fd, mode="rb") as pipe: self.assertEqual(pipe.read(), TEST_BYTES) @trio_test() async def test_return_code(self) -> None: """ Check that the "query" argument behaves as expected """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(query=True): await exe("rcode", "--code=3", query=True) with self.subTest(query=False), self.assertRaises(subprocess.CalledProcessError): await exe("rcode", "--code=3", query=False) with self.subTest(query=None), self.assertRaises(subprocess.CalledProcessError): await exe("rcode", "--code=3") @trio_test() async def test_subcommand(self) -> None: """ Check that the subcommand method returns a new instance with appended arguments """ class NewExecutor(proc.AsyncExecutor): ... with self.subTest(cls=proc.AsyncExecutor): exe = proc.AsyncExecutor("foo", "bar").subcommand("baz") self.assertIsInstance(exe, proc.AsyncExecutor) self.assertListEqual(exe, ["foo", "bar", "baz"]) with self.subTest(cls=NewExecutor): exe = NewExecutor("foo", "bar").subcommand("baz") self.assertIsInstance(exe, NewExecutor) self.assertListEqual(exe, ["foo", "bar", "baz"])
tests/unit/trio.py 0 → 100644 +41 −0 Original line number Diff line number Diff line # Copyright 2022 Dominik Sekotill <dom.sekotill@kodo.org.uk> # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """ Utilities for writing trio-enabled async test code """ from __future__ import annotations from collections.abc import Awaitable from collections.abc import Callable from functools import partial from functools import wraps from typing import TYPE_CHECKING from trio import run from trio.abc import Clock if TYPE_CHECKING: from typing import TypeVar from typing_extensions import ParamSpec T = TypeVar("T") P = ParamSpec("P") def trio_test(clock: Clock|None = None) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, T]]: """ Return a decorator that wraps async test functions with a runner """ def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, T]: @wraps(func) def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: # https://github.com/python-trio/trio-typing/issues/58 return run(partial(func, *args, **kwargs), clock=clock) # type: ignore return wrapper return decorator