Loading kilter/service/runner.py +19 −4 Original line number Diff line number Diff line Loading @@ -55,6 +55,8 @@ _DISABLE_PROTOCOL_FLAGS: Final = ProtocolFlags.NO_CONNECT | ProtocolFlags.NO_HEL ProtocolFlags.NR_UNKNOWN | ProtocolFlags.NR_EOH | ProtocolFlags.NR_BODY | \ ProtocolFlags.NR_HEADER _logger = logging.getLogger(__package__) class NegotiationError(Exception): """ Loading Loading @@ -118,6 +120,8 @@ class Runner: await runner.aclose() return for message in proto.read_from(buff): if __debug__: _logger.debug(f"received: {message}") match message: case Negotiate(): await sender.asend(await self._negotiate(message, sender)) Loading @@ -142,6 +146,8 @@ class Runner: await sender.asend(await runner.message_events(message)) # type: ignore[arg-type] async def _negotiate(self, message: Negotiate, sender: Sender) -> Negotiate: _logger.info("Negotiating with MTA") # TODO: actually negotiate what the filter wants, not just "everything" actions = set(ActionFlags) # All actions! if actions != ActionFlags.unpack(message.action_flags): Loading @@ -162,6 +168,7 @@ class Runner: runner: _TaskRunner, macro: Macro|None, ) -> ResponseMessage: _logger.info(f"Client connected from {message.hostname}") for fltr in self.filters: session = Session(message, sender, _Broadcast()) runner.add_filter(fltr, session) Loading Loading @@ -205,7 +212,10 @@ class _TaskRunner: pass case Reject()|Discard()|ReplyCode() as resp: if not first_connect: logging.warning("Unexpected response from filter after restart") _logger.warning( f"Ignoring unexpected response from filter after restart: " f"{qualname(flter)} -> {resp}", ) continue return resp case _ as arg: # pragma: no-cover Loading Loading @@ -241,6 +251,8 @@ class _TaskRunner: ) async def abort(self, abort: Abort) -> None: if self.channels: _logger.info("Aborting filters") for channel in self.channels: await channel.send(abort) await channel.receive() Loading @@ -248,6 +260,7 @@ class _TaskRunner: del self.channels[:] async def aclose(self) -> None: _logger.info("Closing runners") self.tasks.cancel_scope.cancel() del self.channels[:] Loading @@ -272,10 +285,10 @@ class _TaskRunner: try: final_resp = await fltr(session) except Aborted: logging.info(f"aborted filter {qualname(fltr)}") _logger.debug(f"Aborted filter {qualname(fltr)}") return except Exception: logging.exception(f"error in filter {qualname(fltr)}") _logger.exception(f"Error in filter {qualname(fltr)}") final_resp = TemporaryFailure() if not isinstance(final_resp, _VALID_FINAL_RESPONSES): warn(f"expected a valid response from {qualname(fltr)}, got {final_resp}") Loading Loading @@ -315,6 +328,8 @@ def _make_message_channel() -> tuple[MessageChannel, MessageChannel]: async def _sender(client: anyio.abc.ByteSendStream, proto: FilterProtocol) -> Sender: buff = SimpleBuffer(1*kiB) while 1: proto.write_to(buff, (yield)) proto.write_to(buff, (message := (yield))) if __debug__: _logger.debug(f"sent: {message}") await client.send(buff[:]) del buff[:] Loading
kilter/service/runner.py +19 −4 Original line number Diff line number Diff line Loading @@ -55,6 +55,8 @@ _DISABLE_PROTOCOL_FLAGS: Final = ProtocolFlags.NO_CONNECT | ProtocolFlags.NO_HEL ProtocolFlags.NR_UNKNOWN | ProtocolFlags.NR_EOH | ProtocolFlags.NR_BODY | \ ProtocolFlags.NR_HEADER _logger = logging.getLogger(__package__) class NegotiationError(Exception): """ Loading Loading @@ -118,6 +120,8 @@ class Runner: await runner.aclose() return for message in proto.read_from(buff): if __debug__: _logger.debug(f"received: {message}") match message: case Negotiate(): await sender.asend(await self._negotiate(message, sender)) Loading @@ -142,6 +146,8 @@ class Runner: await sender.asend(await runner.message_events(message)) # type: ignore[arg-type] async def _negotiate(self, message: Negotiate, sender: Sender) -> Negotiate: _logger.info("Negotiating with MTA") # TODO: actually negotiate what the filter wants, not just "everything" actions = set(ActionFlags) # All actions! if actions != ActionFlags.unpack(message.action_flags): Loading @@ -162,6 +168,7 @@ class Runner: runner: _TaskRunner, macro: Macro|None, ) -> ResponseMessage: _logger.info(f"Client connected from {message.hostname}") for fltr in self.filters: session = Session(message, sender, _Broadcast()) runner.add_filter(fltr, session) Loading Loading @@ -205,7 +212,10 @@ class _TaskRunner: pass case Reject()|Discard()|ReplyCode() as resp: if not first_connect: logging.warning("Unexpected response from filter after restart") _logger.warning( f"Ignoring unexpected response from filter after restart: " f"{qualname(flter)} -> {resp}", ) continue return resp case _ as arg: # pragma: no-cover Loading Loading @@ -241,6 +251,8 @@ class _TaskRunner: ) async def abort(self, abort: Abort) -> None: if self.channels: _logger.info("Aborting filters") for channel in self.channels: await channel.send(abort) await channel.receive() Loading @@ -248,6 +260,7 @@ class _TaskRunner: del self.channels[:] async def aclose(self) -> None: _logger.info("Closing runners") self.tasks.cancel_scope.cancel() del self.channels[:] Loading @@ -272,10 +285,10 @@ class _TaskRunner: try: final_resp = await fltr(session) except Aborted: logging.info(f"aborted filter {qualname(fltr)}") _logger.debug(f"Aborted filter {qualname(fltr)}") return except Exception: logging.exception(f"error in filter {qualname(fltr)}") _logger.exception(f"Error in filter {qualname(fltr)}") final_resp = TemporaryFailure() if not isinstance(final_resp, _VALID_FINAL_RESPONSES): warn(f"expected a valid response from {qualname(fltr)}, got {final_resp}") Loading Loading @@ -315,6 +328,8 @@ def _make_message_channel() -> tuple[MessageChannel, MessageChannel]: async def _sender(client: anyio.abc.ByteSendStream, proto: FilterProtocol) -> Sender: buff = SimpleBuffer(1*kiB) while 1: proto.write_to(buff, (yield)) proto.write_to(buff, (message := (yield))) if __debug__: _logger.debug(f"sent: {message}") await client.send(buff[:]) del buff[:]