Commit b0daed63 authored by Dom Sekotill's avatar Dom Sekotill
Browse files

Add async .proc.aexec_io function

parent 36bf0446
Loading
Loading
Loading
Loading
+49 −0
Original line number Diff line number Diff line
@@ -112,6 +112,55 @@ def exec_io(
	return proc.returncode


@overload
async def aexec_io(
	cmd: Arguments, *,
	input: bytes = b'',
	deserialiser: Deserialiser[T],
	**kwargs: Any,
) -> T: ...


@overload
async def aexec_io(
	cmd: Arguments, *,
	input: bytes = b'',
	deserialiser: None = None,
	**kwargs: Any,
) -> int: ...


async def aexec_io(
	cmd: Arguments, *,
	input: bytes = b"",
	deserialiser: Deserialiser[Any]|None = None,
	**kwargs: Any,
) -> Any:
	"""
	Execute a command asynchronously, handling writing output to sys.stdout and sys.stderr

	If input is provided it will be fed to the process' stdin.
	If a deserialiser is provided it will be used to parse stdout data from the process.

	Stderr and stdout (if no deserialiser is provided) will be written to `sys.stderr` and
	`sys.stdout` respectively.

	Note that the data is written, not redirected.  If either `sys.stdout` or `sys.stderr`
	is changed to an IO-like object with no file descriptor, this will still work.
	"""
	if deserialiser and 'stdout' in kwargs:
		raise TypeError("Cannot provide 'deserialiser' with 'stdout' argument")
	if input and 'stdin' in kwargs:
		raise TypeError("Cannot provide 'input' with 'stdin' argument")
	stdout: IO[str]|IO[bytes]|int = io.BytesIO() if deserialiser else kwargs.pop('stdout', sys.stdout)
	stderr: IO[str]|IO[bytes]|int = kwargs.pop('stderr', sys.stderr)
	proc = await _exec_io(cmd, input, stdout, stderr, kwargs)
	if deserialiser:
		assert isinstance(stdout, io.BytesIO)
		return deserialiser(stdout.getbuffer())
	return proc.returncode


async def _exec_io(
	cmd: Arguments,
	data: bytes,
+104 −0
Original line number Diff line number Diff line
#  Copyright 2022  Dominik Sekotill <dom.sekotill@kodo.org.uk>
#
#  This Source Code Form is subject to the terms of the Mozilla Public
#  License, v. 2.0. If a copy of the MPL was not distributed with this
#  file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Unit tests for the async features of behave_utils.proc
"""

import io
import os
import sys
from sys import executable as python
from typing import Any

from behave_utils import json
from behave_utils import proc

from .. import TestCase
from ..trio import trio_test

FIXTURE_CMD = [python, "-BESsm", "tests.unit.proc.fixture_output"]
JSON_OUTPUT = [*FIXTURE_CMD, "json"]
ECHO_OUTPUT = [*FIXTURE_CMD, "echo"]

TEST_BYTES = b"""lorem ipsum dolorum"""


class ExecIOTests(TestCase):
	"""
	Tests for the behave_utils.proc.exec_io function
	"""

	@trio_test()
	async def test_deserialiser(self) -> None:
		"""
		Check that calling with a deserialiser correctly deserialises output
		"""
		with self.subTest(deserialiser=bytes):
			output: Any = await proc.aexec_io(JSON_OUTPUT, deserialiser=bytes)
			self.assertIsInstance(output, bytes)

		with self.subTest(deserialiser=json.JSONObject):
			output = await proc.aexec_io(JSON_OUTPUT, deserialiser=json.JSONObject.from_string)
			self.assertIsInstance(output, json.JSONObject)

	@trio_test()
	async def test_deserialiser_with_stdout(self) -> None:
		"""
		Check that calling with both deserialiser and stdout raises TypeError
		"""
		with self.assertRaises(TypeError):
			await proc.aexec_io(ECHO_OUTPUT, stdout=sys.stdout, deserialiser=bytes)

	@trio_test()
	async def test_input(self) -> None:
		"""
		Check that calling with the "input" argument passes bytes to stdin
		"""
		output = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, deserialiser=bytes)

		self.assertEqual(output, TEST_BYTES)

	@trio_test()
	async def test_input_with_stdin(self) -> None:
		"""
		Check that calling with both "input" and "stdin" arguments raises TypeError
		"""
		with self.assertRaises(TypeError):
			await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdin=sys.stdin)

	@trio_test()
	async def test_stdout(self) -> None:
		"""
		Check that calling with the "stdout" argument receives bytes from stdout
		"""
		with self.subTest(stdout="BytesIO"):
			bbuff = io.BytesIO()

			code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=bbuff)

			bbuff.seek(0)
			self.assertEqual(code, 0)
			self.assertEqual(bbuff.read(), TEST_BYTES)

		with self.subTest(stdout="StringIO"):
			sbuff = io.StringIO()

			code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=sbuff)

			sbuff.seek(0)
			self.assertEqual(code, 0)
			self.assertEqual(sbuff.read(), TEST_BYTES.decode())

		with self.subTest(stdout="pipe"):
			read_fd, write_fd = os.pipe()

			code = await proc.aexec_io(ECHO_OUTPUT, input=TEST_BYTES, stdout=write_fd)

			os.close(write_fd)
			with io.open(read_fd, mode="rb") as pipe:
				self.assertEqual(pipe.read(), TEST_BYTES)
				self.assertEqual(code, 0)

tests/unit/trio.py

0 → 100644
+41 −0
Original line number Diff line number Diff line
#  Copyright 2022  Dominik Sekotill <dom.sekotill@kodo.org.uk>
#
#  This Source Code Form is subject to the terms of the Mozilla Public
#  License, v. 2.0. If a copy of the MPL was not distributed with this
#  file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Utilities for writing trio-enabled async test code
"""

from __future__ import annotations

from collections.abc import Awaitable
from collections.abc import Callable
from functools import partial
from functools import wraps
from typing import TYPE_CHECKING

from trio import run
from trio.abc import Clock

if TYPE_CHECKING:
	from typing import TypeVar

	from typing_extensions import ParamSpec

	T = TypeVar("T")
	P = ParamSpec("P")


def trio_test(clock: Clock|None = None) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, T]]:
	"""
	Return a decorator that wraps async test functions with a runner
	"""
	def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, T]:
		@wraps(func)
		def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
			# https://github.com/python-trio/trio-typing/issues/58
			return run(partial(func, *args, **kwargs), clock=clock)  # type: ignore
		return wrapper
	return decorator