Loading behave_utils/proc.py +118 −26 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ from subprocess import DEVNULL from subprocess import PIPE from subprocess import CalledProcessError from typing import IO from typing import TYPE_CHECKING from typing import Any from typing import BinaryIO from typing import Callable Loading Loading @@ -221,7 +222,41 @@ async def _passthru(in_stream: trio.abc.ReceiveStream, out_stream: IO[str]|IO[by await write(data) class Executor(list[Argument]): class _ExecutorBase(list[Argument]): if TYPE_CHECKING: E = TypeVar("E", bound="_ExecutorBase") def __init__(self, *cmd: Argument): self[:] = cmd def get_arguments( self, cmd: Arguments, kwargs: MutableMapping[str, Any], has_input: bool, is_query: bool, deserialiser: Deserialiser[Any]|None, ) -> Arguments: """ Override to amend command arguments and kwargs for exec_io() prior to execution """ return cmd def subcommand(self: E, *args: Argument) -> E: """ Return a new Executor instance of the same class with additional arguments appended The returned instance is created as a shallow copy; if attribute values need to be copied, subclasses must implement __copy__(). (see https://docs.python.org/3/library/copy.html) """ new = copy(self) new.extend(args) return new class Executor(_ExecutorBase): """ Manage calling executables with composable argument lists Loading @@ -232,12 +267,6 @@ class Executor(list[Argument]): is called. """ T = TypeVar("T") E = TypeVar("E", bound="Executor") def __init__(self, *cmd: Argument): self[:] = cmd @overload def __call__( self, Loading Loading @@ -323,27 +352,90 @@ class Executor(list[Argument]): raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None def get_arguments( class AsyncExecutor(_ExecutorBase): @overload async def __call__( self, cmd: Arguments, kwargs: MutableMapping[str, Any], has_input: bool, is_query: bool, deserialiser: Deserialiser[Any]|None, ) -> Arguments: """ Override to amend command arguments and kwargs for exec_io() prior to execution """ return cmd *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: Deserialiser[T], query: Literal[False] = False, **kwargs: Any, ) -> T: ... def subcommand(self: E, *args: Argument) -> E: @overload async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[True], **kwargs: Any, ) -> int: ... @overload async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[False] = False, **kwargs: Any, ) -> None: ... async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = None, deserialiser: Deserialiser[Any]|None = None, query: bool = False, **kwargs: Any, ) -> Any: """ Return a new Executor instance of the same class with additional arguments appended Execute the configure command with the given arguments The returned instance is created as a shallow copy; if attribute values need to be copied, subclasses must implement __copy__(). (see https://docs.python.org/3/library/copy.html) Input: Any bytes passed as "input" will be fed into the process' stdin pipe. Output: If "deserialiser" is provided it will be called with a memoryview of a buffer containing any bytes from the process' stdout; whatever is returned by "deserialiser" will be returned. If "query" is true the return code of the process will be returned. Otherwise nothing is returned. Note that "deserialiser" and "query" are mutually exclusive; if debugging is enabled an AssertionError will be raised if both are non-None/non-False, otherwise "query" is ignored. Errors: If "query" is not true any non-zero return code will cause CalledProcessError to be raised. """ new = copy(self) new.extend(args) return new assert not deserialiser or not query data = ( b"" if input is None else input.encode() if isinstance(input, str) else bytes(input) ) cmd = self.get_arguments( [*self, *args], kwargs, has_input=bool(data), is_query=query, deserialiser=deserialiser, ) if deserialiser: return await aexec_io(cmd, input=data, deserialiser=deserialiser, **kwargs) rcode = await aexec_io(cmd, input=data, **kwargs) if query: return rcode if 0 != rcode: raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None tests/unit/proc/test_async.py +101 −0 Original line number Diff line number Diff line Loading @@ -10,6 +10,7 @@ Unit tests for the async features of behave_utils.proc import io import os import subprocess import sys from sys import executable as python from typing import Any Loading Loading @@ -102,3 +103,103 @@ class ExecIOTests(TestCase): with io.open(read_fd, mode="rb") as pipe: self.assertEqual(pipe.read(), TEST_BYTES) self.assertEqual(code, 0) class ExecutorTests(TestCase): """ Tests for the behave_utils.proc.AsyncExecutor class """ @trio_test() async def test_deserialiser(self) -> None: """ Check that calling with a deserialiser correctly deserialises output """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(deserialiser=bytes): output: Any = await exe("json", deserialiser=bytes) self.assertIsInstance(output, bytes) with self.subTest(deserialiser=json.JSONObject): output = await exe("json", deserialiser=json.JSONObject.from_string) self.assertIsInstance(output, json.JSONObject) @trio_test() async def test_input(self) -> None: """ Check that calling with the "input" argument passes bytes to stdin """ exe = proc.AsyncExecutor(*FIXTURE_CMD) output = await exe("echo", input=TEST_BYTES, deserialiser=bytes) self.assertEqual(output, TEST_BYTES) @trio_test() async def test_stdout(self) -> None: """ Check that calling with the "stdout" argument receives bytes from stdout """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(stdout="BytesIO"): bbuff = io.BytesIO() await exe("echo", input=TEST_BYTES, stdout=bbuff) bbuff.seek(0) self.assertEqual(bbuff.read(), TEST_BYTES) with self.subTest(stdout="StringIO"): sbuff = io.StringIO() await exe("echo", input=TEST_BYTES, stdout=sbuff) sbuff.seek(0) self.assertEqual(sbuff.read(), TEST_BYTES.decode()) with self.subTest(stdout="pipe"): read_fd, write_fd = os.pipe() await exe("echo", input=TEST_BYTES, stdout=write_fd) os.close(write_fd) with io.open(read_fd, mode="rb") as pipe: self.assertEqual(pipe.read(), TEST_BYTES) @trio_test() async def test_return_code(self) -> None: """ Check that the "query" argument behaves as expected """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(query=True): await exe("rcode", "--code=3", query=True) with self.subTest(query=False), self.assertRaises(subprocess.CalledProcessError): await exe("rcode", "--code=3", query=False) with self.subTest(query=None), self.assertRaises(subprocess.CalledProcessError): await exe("rcode", "--code=3") @trio_test() async def test_subcommand(self) -> None: """ Check that the subcommand method returns a new instance with appended arguments """ class NewExecutor(proc.AsyncExecutor): ... with self.subTest(cls=proc.AsyncExecutor): exe = proc.AsyncExecutor("foo", "bar").subcommand("baz") self.assertIsInstance(exe, proc.AsyncExecutor) self.assertListEqual(exe, ["foo", "bar", "baz"]) with self.subTest(cls=NewExecutor): exe = NewExecutor("foo", "bar").subcommand("baz") self.assertIsInstance(exe, NewExecutor) self.assertListEqual(exe, ["foo", "bar", "baz"]) Loading
behave_utils/proc.py +118 −26 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ from subprocess import DEVNULL from subprocess import PIPE from subprocess import CalledProcessError from typing import IO from typing import TYPE_CHECKING from typing import Any from typing import BinaryIO from typing import Callable Loading Loading @@ -221,7 +222,41 @@ async def _passthru(in_stream: trio.abc.ReceiveStream, out_stream: IO[str]|IO[by await write(data) class Executor(list[Argument]): class _ExecutorBase(list[Argument]): if TYPE_CHECKING: E = TypeVar("E", bound="_ExecutorBase") def __init__(self, *cmd: Argument): self[:] = cmd def get_arguments( self, cmd: Arguments, kwargs: MutableMapping[str, Any], has_input: bool, is_query: bool, deserialiser: Deserialiser[Any]|None, ) -> Arguments: """ Override to amend command arguments and kwargs for exec_io() prior to execution """ return cmd def subcommand(self: E, *args: Argument) -> E: """ Return a new Executor instance of the same class with additional arguments appended The returned instance is created as a shallow copy; if attribute values need to be copied, subclasses must implement __copy__(). (see https://docs.python.org/3/library/copy.html) """ new = copy(self) new.extend(args) return new class Executor(_ExecutorBase): """ Manage calling executables with composable argument lists Loading @@ -232,12 +267,6 @@ class Executor(list[Argument]): is called. """ T = TypeVar("T") E = TypeVar("E", bound="Executor") def __init__(self, *cmd: Argument): self[:] = cmd @overload def __call__( self, Loading Loading @@ -323,27 +352,90 @@ class Executor(list[Argument]): raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None def get_arguments( class AsyncExecutor(_ExecutorBase): @overload async def __call__( self, cmd: Arguments, kwargs: MutableMapping[str, Any], has_input: bool, is_query: bool, deserialiser: Deserialiser[Any]|None, ) -> Arguments: """ Override to amend command arguments and kwargs for exec_io() prior to execution """ return cmd *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: Deserialiser[T], query: Literal[False] = False, **kwargs: Any, ) -> T: ... def subcommand(self: E, *args: Argument) -> E: @overload async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[True], **kwargs: Any, ) -> int: ... @overload async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = ..., deserialiser: None = None, query: Literal[False] = False, **kwargs: Any, ) -> None: ... async def __call__( self, *args: Argument, input: str|bytes|SupportsBytes|None = None, deserialiser: Deserialiser[Any]|None = None, query: bool = False, **kwargs: Any, ) -> Any: """ Return a new Executor instance of the same class with additional arguments appended Execute the configure command with the given arguments The returned instance is created as a shallow copy; if attribute values need to be copied, subclasses must implement __copy__(). (see https://docs.python.org/3/library/copy.html) Input: Any bytes passed as "input" will be fed into the process' stdin pipe. Output: If "deserialiser" is provided it will be called with a memoryview of a buffer containing any bytes from the process' stdout; whatever is returned by "deserialiser" will be returned. If "query" is true the return code of the process will be returned. Otherwise nothing is returned. Note that "deserialiser" and "query" are mutually exclusive; if debugging is enabled an AssertionError will be raised if both are non-None/non-False, otherwise "query" is ignored. Errors: If "query" is not true any non-zero return code will cause CalledProcessError to be raised. """ new = copy(self) new.extend(args) return new assert not deserialiser or not query data = ( b"" if input is None else input.encode() if isinstance(input, str) else bytes(input) ) cmd = self.get_arguments( [*self, *args], kwargs, has_input=bool(data), is_query=query, deserialiser=deserialiser, ) if deserialiser: return await aexec_io(cmd, input=data, deserialiser=deserialiser, **kwargs) rcode = await aexec_io(cmd, input=data, **kwargs) if query: return rcode if 0 != rcode: raise CalledProcessError(rcode, ' '.join(coerce_args(cmd))) return None
tests/unit/proc/test_async.py +101 −0 Original line number Diff line number Diff line Loading @@ -10,6 +10,7 @@ Unit tests for the async features of behave_utils.proc import io import os import subprocess import sys from sys import executable as python from typing import Any Loading Loading @@ -102,3 +103,103 @@ class ExecIOTests(TestCase): with io.open(read_fd, mode="rb") as pipe: self.assertEqual(pipe.read(), TEST_BYTES) self.assertEqual(code, 0) class ExecutorTests(TestCase): """ Tests for the behave_utils.proc.AsyncExecutor class """ @trio_test() async def test_deserialiser(self) -> None: """ Check that calling with a deserialiser correctly deserialises output """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(deserialiser=bytes): output: Any = await exe("json", deserialiser=bytes) self.assertIsInstance(output, bytes) with self.subTest(deserialiser=json.JSONObject): output = await exe("json", deserialiser=json.JSONObject.from_string) self.assertIsInstance(output, json.JSONObject) @trio_test() async def test_input(self) -> None: """ Check that calling with the "input" argument passes bytes to stdin """ exe = proc.AsyncExecutor(*FIXTURE_CMD) output = await exe("echo", input=TEST_BYTES, deserialiser=bytes) self.assertEqual(output, TEST_BYTES) @trio_test() async def test_stdout(self) -> None: """ Check that calling with the "stdout" argument receives bytes from stdout """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(stdout="BytesIO"): bbuff = io.BytesIO() await exe("echo", input=TEST_BYTES, stdout=bbuff) bbuff.seek(0) self.assertEqual(bbuff.read(), TEST_BYTES) with self.subTest(stdout="StringIO"): sbuff = io.StringIO() await exe("echo", input=TEST_BYTES, stdout=sbuff) sbuff.seek(0) self.assertEqual(sbuff.read(), TEST_BYTES.decode()) with self.subTest(stdout="pipe"): read_fd, write_fd = os.pipe() await exe("echo", input=TEST_BYTES, stdout=write_fd) os.close(write_fd) with io.open(read_fd, mode="rb") as pipe: self.assertEqual(pipe.read(), TEST_BYTES) @trio_test() async def test_return_code(self) -> None: """ Check that the "query" argument behaves as expected """ exe = proc.AsyncExecutor(*FIXTURE_CMD) with self.subTest(query=True): await exe("rcode", "--code=3", query=True) with self.subTest(query=False), self.assertRaises(subprocess.CalledProcessError): await exe("rcode", "--code=3", query=False) with self.subTest(query=None), self.assertRaises(subprocess.CalledProcessError): await exe("rcode", "--code=3") @trio_test() async def test_subcommand(self) -> None: """ Check that the subcommand method returns a new instance with appended arguments """ class NewExecutor(proc.AsyncExecutor): ... with self.subTest(cls=proc.AsyncExecutor): exe = proc.AsyncExecutor("foo", "bar").subcommand("baz") self.assertIsInstance(exe, proc.AsyncExecutor) self.assertListEqual(exe, ["foo", "bar", "baz"]) with self.subTest(cls=NewExecutor): exe = NewExecutor("foo", "bar").subcommand("baz") self.assertIsInstance(exe, NewExecutor) self.assertListEqual(exe, ["foo", "bar", "baz"])