From b0daed635320170b1e9018cc9c5a64a5c0744290 Mon Sep 17 00:00:00 2001 From: Dom Sekotill Date: Wed, 27 Apr 2022 00:43:28 +0100 Subject: [PATCH 1/2] Add async .proc.aexec_io function --- behave_utils/proc.py | 49 ++++++++++++++++ tests/unit/proc/test_async.py | 104 ++++++++++++++++++++++++++++++++++ tests/unit/trio.py | 41 ++++++++++++++ 3 files changed, 194 insertions(+) create mode 100644 tests/unit/proc/test_async.py create mode 100644 tests/unit/trio.py diff --git a/behave_utils/proc.py b/behave_utils/proc.py index 93923db..c7f0f6c 100644 --- a/behave_utils/proc.py +++ b/behave_utils/proc.py @@ -112,6 +112,55 @@ def exec_io( return proc.returncode +@overload +async def aexec_io( + cmd: Arguments, *, + input: bytes = b'', + deserialiser: Deserialiser[T], + **kwargs: Any, +) -> T: ... + + +@overload +async def aexec_io( + cmd: Arguments, *, + input: bytes = b'', + deserialiser: None = None, + **kwargs: Any, +) -> int: ... + + +async def aexec_io( + cmd: Arguments, *, + input: bytes = b"", + deserialiser: Deserialiser[Any]|None = None, + **kwargs: Any, +) -> Any: + """ + Execute a command asynchronously, handling writing output to sys.stdout and sys.stderr + + If input is provided it will be fed to the process' stdin. + If a deserialiser is provided it will be used to parse stdout data from the process. + + Stderr and stdout (if no deserialiser is provided) will be written to `sys.stderr` and + `sys.stdout` respectively. + + Note that the data is written, not redirected. If either `sys.stdout` or `sys.stderr` + is changed to an IO-like object with no file descriptor, this will still work. + """ + if deserialiser and 'stdout' in kwargs: + raise TypeError("Cannot provide 'deserialiser' with 'stdout' argument") + if input and 'stdin' in kwargs: + raise TypeError("Cannot provide 'input' with 'stdin' argument") + stdout: IO[str]|IO[bytes]|int = io.BytesIO() if deserialiser else kwargs.pop('stdout', sys.stdout) + stderr: IO[str]|IO[bytes]|int = kwargs.pop('stderr', sys.stderr) + proc = await _exec_io(cmd, input, stdout, stderr, kwargs) + if deserialiser: + assert isinstance(stdout, io.BytesIO) + return deserialiser(stdout.getbuffer()) + return proc.returncode + + async def _exec_io( cmd: Arguments, data: bytes, diff --git a/tests/unit/proc/test_async.py b/tests/unit/proc/test_async.py new file mode 100644 index 0000000..7dfb7c4 --- /dev/null +++ b/tests/unit/proc/test_async.py @@ -0,0 +1,104 @@ +# Copyright 2022 Dominik Sekotill +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +""" +Unit tests for the async features of behave_utils.proc +""" + +import io +import os +import sys +from sys import executable as python +from typing import Any + +from behave_utils import json +from behave_utils import proc + +from .. import TestCase +from ..trio import trio_test + +FIXTURE_CMD = [python, "-BESsm", "tests.unit.proc.fixture_output"] +JSON_OUTPUT = [*FIXTURE_CMD, "json"] +ECHO_OUTPUT = [*FIXTURE_CMD, "echo"] + +TEST_BYTES = b"""lorem ipsum dolorum""" + + +class ExecIOTests(TestCase): + """ + Tests for the behave_utils.proc.exec_io function + """ + + @trio_test() + async def test_deserialiser(self) -> None: + """ + Check that calling with a deserialiser correctly deserialises output + """ + with self.subTest(deserialiser=bytes): + output: Any = await proc.aexec_io(JSON_OUTPUT, deserialiser=bytes) + self.assertIsInstance(output, bytes) + + with self.subTest(deserialiser=json.JSONObject): + output = await proc.aexec_io(JSON_OUTPUT, deserialiser=json.JSONObject.from_string) + self.assertIsInstance(output, json.JSONObject) + + @trio_test() + async def test_deserialiser_with_stdout(self) -> None: + """ + Check that calling with both deserialiser and stdout raises TypeError + """ + with self.assertRaises(TypeError): + await proc.aexec_io(ECHO_OUTPUT, stdout=sys.stdout, deserialiser=bytes) + + @trio_test() + async def test_input(self) -> None: + """ + Check that calling with the "input" argument passes bytes to stdin + """ + output = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, deserialiser=bytes) + + self.assertEqual(output, TEST_BYTES) + + @trio_test() + async def test_input_with_stdin(self) -> None: + """ + Check that calling with both "input" and "stdin" arguments raises TypeError + """ + with self.assertRaises(TypeError): + await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdin=sys.stdin) + + @trio_test() + async def test_stdout(self) -> None: + """ + Check that calling with the "stdout" argument receives bytes from stdout + """ + with self.subTest(stdout="BytesIO"): + bbuff = io.BytesIO() + + code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=bbuff) + + bbuff.seek(0) + self.assertEqual(code, 0) + self.assertEqual(bbuff.read(), TEST_BYTES) + + with self.subTest(stdout="StringIO"): + sbuff = io.StringIO() + + code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=sbuff) + + sbuff.seek(0) + self.assertEqual(code, 0) + self.assertEqual(sbuff.read(), TEST_BYTES.decode()) + + with self.subTest(stdout="pipe"): + read_fd, write_fd = os.pipe() + + code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=write_fd) + + os.close(write_fd) + with io.open(read_fd, mode="rb") as pipe: + self.assertEqual(pipe.read(), TEST_BYTES) + self.assertEqual(code, 0) diff --git a/tests/unit/trio.py b/tests/unit/trio.py new file mode 100644 index 0000000..a6e8e89 --- /dev/null +++ b/tests/unit/trio.py @@ -0,0 +1,41 @@ +# Copyright 2022 Dominik Sekotill +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +""" +Utilities for writing trio-enabled async test code +""" + +from __future__ import annotations + +from collections.abc import Awaitable +from collections.abc import Callable +from functools import partial +from functools import wraps +from typing import TYPE_CHECKING + +from trio import run +from trio.abc import Clock + +if TYPE_CHECKING: + from typing import TypeVar + + from typing_extensions import ParamSpec + + T = TypeVar("T") + P = ParamSpec("P") + + +def trio_test(clock: Clock|None = None) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, T]]: + """ + Return a decorator that wraps async test functions with a runner + """ + def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, T]: + @wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + # https://github.com/python-trio/trio-typing/issues/58 + return run(partial(func, *args, **kwargs), clock=clock) # type: ignore + return wrapper + return decorator -- GitLab From addeb5aa34482f73279de7cda7baa2e431055e61 Mon Sep 17 00:00:00 2001 From: Dom Sekotill Date: Wed, 27 Apr 2022 01:48:12 +0100 Subject: [PATCH 2/2] Add .proc.AsyncExecutor --- behave_utils/proc.py | 144 ++++++++++++++++++++++++++++------ tests/unit/proc/test_async.py | 101 ++++++++++++++++++++++++ 2 files changed, 219 insertions(+), 26 deletions(-) diff --git a/behave_utils/proc.py b/behave_utils/proc.py index c7f0f6c..fc9d08f 100644 --- a/behave_utils/proc.py +++ b/behave_utils/proc.py @@ -21,6 +21,7 @@ from subprocess import DEVNULL from subprocess import PIPE from subprocess import CalledProcessError from typing import IO +from typing import TYPE_CHECKING from typing import Any from typing import BinaryIO from typing import Callable @@ -221,7 +222,41 @@ async def _passthru(in_stream: trio.abc.ReceiveStream, out_stream: IO[str]|IO[by await write(data) -class Executor(list[Argument]): +class _ExecutorBase(list[Argument]): + + if TYPE_CHECKING: + E = TypeVar("E", bound="_ExecutorBase") + + def __init__(self, *cmd: Argument): + self[:] = cmd + + def get_arguments( + self, + cmd: Arguments, + kwargs: MutableMapping[str, Any], + has_input: bool, + is_query: bool, + deserialiser: Deserialiser[Any]|None, + ) -> Arguments: + """ + Override to amend command arguments and kwargs for exec_io() prior to execution + """ + return cmd + + def subcommand(self: E, *args: Argument) -> E: + """ + Return a new Executor instance of the same class with additional arguments appended + + The returned instance is created as a shallow copy; if attribute values need to be + copied, subclasses must implement __copy__(). + (see https://docs.python.org/3/library/copy.html) + """ + new = copy(self) + new.extend(args) + return new + + +class Executor(_ExecutorBase): """ Manage calling executables with composable argument lists @@ -232,12 +267,6 @@ class Executor(list[Argument]): is called. """ - T = TypeVar("T") - E = TypeVar("E", bound="Executor") - - def __init__(self, *cmd: Argument): - self[:] = cmd - @overload def __call__( self, @@ -323,27 +352,90 @@ class Executor(list[Argument]): raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None - def get_arguments( + +class AsyncExecutor(_ExecutorBase): + + @overload + async def __call__( self, - cmd: Arguments, - kwargs: MutableMapping[str, Any], - has_input: bool, - is_query: bool, - deserialiser: Deserialiser[Any]|None, - ) -> Arguments: - """ - Override to amend command arguments and kwargs for exec_io() prior to execution - """ - return cmd + *args: Argument, + input: str|bytes|SupportsBytes|None = ..., + deserialiser: Deserialiser[T], + query: Literal[False] = False, + **kwargs: Any, + ) -> T: ... - def subcommand(self: E, *args: Argument) -> E: + @overload + async def __call__( + self, + *args: Argument, + input: str|bytes|SupportsBytes|None = ..., + deserialiser: None = None, + query: Literal[True], + **kwargs: Any, + ) -> int: ... + + @overload + async def __call__( + self, + *args: Argument, + input: str|bytes|SupportsBytes|None = ..., + deserialiser: None = None, + query: Literal[False] = False, + **kwargs: Any, + ) -> None: ... + + async def __call__( + self, + *args: Argument, + input: str|bytes|SupportsBytes|None = None, + deserialiser: Deserialiser[Any]|None = None, + query: bool = False, + **kwargs: Any, + ) -> Any: """ - Return a new Executor instance of the same class with additional arguments appended + Execute the configure command with the given arguments - The returned instance is created as a shallow copy; if attribute values need to be - copied, subclasses must implement __copy__(). - (see https://docs.python.org/3/library/copy.html) + Input: + Any bytes passed as "input" will be fed into the process' stdin pipe. + + Output: + If "deserialiser" is provided it will be called with a memoryview of a buffer + containing any bytes from the process' stdout; whatever is returned by + "deserialiser" will be returned. + + If "query" is true the return code of the process will be returned. + + Otherwise nothing is returned. + + Note that "deserialiser" and "query" are mutually exclusive; if debugging is + enabled an AssertionError will be raised if both are non-None/non-False, otherwise + "query" is ignored. + + Errors: + If "query" is not true any non-zero return code will cause CalledProcessError to + be raised. """ - new = copy(self) - new.extend(args) - return new + assert not deserialiser or not query + + data = ( + b"" if input is None else + input.encode() if isinstance(input, str) else + bytes(input) + ) + cmd = self.get_arguments( + [*self, *args], kwargs, + has_input=bool(data), + is_query=query, + deserialiser=deserialiser, + ) + + if deserialiser: + return await aexec_io(cmd, input=data, deserialiser=deserialiser, **kwargs) + + rcode = await aexec_io(cmd, input=data, **kwargs) + if query: + return rcode + if 0 != rcode: + raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) + return None diff --git a/tests/unit/proc/test_async.py b/tests/unit/proc/test_async.py index 7dfb7c4..43363f9 100644 --- a/tests/unit/proc/test_async.py +++ b/tests/unit/proc/test_async.py @@ -10,6 +10,7 @@ Unit tests for the async features of behave_utils.proc import io import os +import subprocess import sys from sys import executable as python from typing import Any @@ -102,3 +103,103 @@ class ExecIOTests(TestCase): with io.open(read_fd, mode="rb") as pipe: self.assertEqual(pipe.read(), TEST_BYTES) self.assertEqual(code, 0) + + +class ExecutorTests(TestCase): + """ + Tests for the behave_utils.proc.AsyncExecutor class + """ + + @trio_test() + async def test_deserialiser(self) -> None: + """ + Check that calling with a deserialiser correctly deserialises output + """ + exe = proc.AsyncExecutor(*FIXTURE_CMD) + + with self.subTest(deserialiser=bytes): + output: Any = await exe("json", deserialiser=bytes) + self.assertIsInstance(output, bytes) + + with self.subTest(deserialiser=json.JSONObject): + output = await exe("json", deserialiser=json.JSONObject.from_string) + self.assertIsInstance(output, json.JSONObject) + + @trio_test() + async def test_input(self) -> None: + """ + Check that calling with the "input" argument passes bytes to stdin + """ + exe = proc.AsyncExecutor(*FIXTURE_CMD) + + output = await exe("echo", input=TEST_BYTES, deserialiser=bytes) + + self.assertEqual(output, TEST_BYTES) + + @trio_test() + async def test_stdout(self) -> None: + """ + Check that calling with the "stdout" argument receives bytes from stdout + """ + exe = proc.AsyncExecutor(*FIXTURE_CMD) + + with self.subTest(stdout="BytesIO"): + bbuff = io.BytesIO() + + await exe("echo", input=TEST_BYTES, stdout=bbuff) + + bbuff.seek(0) + self.assertEqual(bbuff.read(), TEST_BYTES) + + with self.subTest(stdout="StringIO"): + sbuff = io.StringIO() + + await exe("echo", input=TEST_BYTES, stdout=sbuff) + + sbuff.seek(0) + self.assertEqual(sbuff.read(), TEST_BYTES.decode()) + + with self.subTest(stdout="pipe"): + read_fd, write_fd = os.pipe() + + await exe("echo", input=TEST_BYTES, stdout=write_fd) + + os.close(write_fd) + with io.open(read_fd, mode="rb") as pipe: + self.assertEqual(pipe.read(), TEST_BYTES) + + @trio_test() + async def test_return_code(self) -> None: + """ + Check that the "query" argument behaves as expected + """ + exe = proc.AsyncExecutor(*FIXTURE_CMD) + + with self.subTest(query=True): + await exe("rcode", "--code=3", query=True) + + with self.subTest(query=False), self.assertRaises(subprocess.CalledProcessError): + await exe("rcode", "--code=3", query=False) + + with self.subTest(query=None), self.assertRaises(subprocess.CalledProcessError): + await exe("rcode", "--code=3") + + @trio_test() + async def test_subcommand(self) -> None: + """ + Check that the subcommand method returns a new instance with appended arguments + """ + class NewExecutor(proc.AsyncExecutor): + ... + + with self.subTest(cls=proc.AsyncExecutor): + exe = proc.AsyncExecutor("foo", "bar").subcommand("baz") + + self.assertIsInstance(exe, proc.AsyncExecutor) + self.assertListEqual(exe, ["foo", "bar", "baz"]) + + with self.subTest(cls=NewExecutor): + exe = NewExecutor("foo", "bar").subcommand("baz") + + self.assertIsInstance(exe, NewExecutor) + self.assertListEqual(exe, ["foo", "bar", "baz"]) -- GitLab