Loading kilter/service/runner.py +19 −18 Original line number Diff line number Diff line Loading @@ -264,7 +264,7 @@ class _TaskRunner: continue if not needs_response: _logger.warning( f"Unexpected response from filter {flter}", f"Unexpected response from filter {qualname(flter)}", ) return _CloseFilter(flter) return resp Loading Loading @@ -295,31 +295,31 @@ class _TaskRunner: continue case Continue(): skip = False case Accept(): await self.close_channel(channel) case Accept() as resp: flter = await self.close_channel(channel) if len(self.channels) == 0: _logger.info(f"Returning response Accept from {qualname(flter)}") return resp _logger.info(f"Holding response Accept from {qualname(flter)}") case (Reject() | Discard() | TemporaryFailure() | ReplyCode()) as resp: await self.close_channel(channel) flter = await self.close_channel(channel) if not needs_response: flter = self.channels[channel] _logger.warning( f"Unexpected response from filter {flter}", ) _logger.warning(f"Unexpected response from filter {qualname(flter)}") return _CloseFilter(flter) _logger.info(f"Returning response {type(resp).__name__} from {qualname(flter)}") return resp assert len(self.channels) > 0, "Running filters reached zero without a response?!" if not needs_response: return None return ( Accept() if len(self.channels) == 0 else Skip() if skip else Continue() ) return Skip() if skip else Continue() async def close_channel(self, channel: MessageChannel) -> None: async def close_channel(self, channel: MessageChannel) -> Filter: await channel.aclose() del self.channels[channel] return self.channels.pop(channel) async def abort(self, abort: Abort) -> None: if self.channels: if not self.channels: return _logger.info("Aborting filters") for channel in self.channels: await channel.send(abort) Loading @@ -328,7 +328,8 @@ class _TaskRunner: self.channels.clear() async def aclose(self) -> None: _logger.info("Closing runners") if self.channels: _logger.info("Closing filters") self.tasks.cancel_scope.cancel() self.channels.clear() Loading Loading
kilter/service/runner.py +19 −18 Original line number Diff line number Diff line Loading @@ -264,7 +264,7 @@ class _TaskRunner: continue if not needs_response: _logger.warning( f"Unexpected response from filter {flter}", f"Unexpected response from filter {qualname(flter)}", ) return _CloseFilter(flter) return resp Loading Loading @@ -295,31 +295,31 @@ class _TaskRunner: continue case Continue(): skip = False case Accept(): await self.close_channel(channel) case Accept() as resp: flter = await self.close_channel(channel) if len(self.channels) == 0: _logger.info(f"Returning response Accept from {qualname(flter)}") return resp _logger.info(f"Holding response Accept from {qualname(flter)}") case (Reject() | Discard() | TemporaryFailure() | ReplyCode()) as resp: await self.close_channel(channel) flter = await self.close_channel(channel) if not needs_response: flter = self.channels[channel] _logger.warning( f"Unexpected response from filter {flter}", ) _logger.warning(f"Unexpected response from filter {qualname(flter)}") return _CloseFilter(flter) _logger.info(f"Returning response {type(resp).__name__} from {qualname(flter)}") return resp assert len(self.channels) > 0, "Running filters reached zero without a response?!" if not needs_response: return None return ( Accept() if len(self.channels) == 0 else Skip() if skip else Continue() ) return Skip() if skip else Continue() async def close_channel(self, channel: MessageChannel) -> None: async def close_channel(self, channel: MessageChannel) -> Filter: await channel.aclose() del self.channels[channel] return self.channels.pop(channel) async def abort(self, abort: Abort) -> None: if self.channels: if not self.channels: return _logger.info("Aborting filters") for channel in self.channels: await channel.send(abort) Loading @@ -328,7 +328,8 @@ class _TaskRunner: self.channels.clear() async def aclose(self) -> None: _logger.info("Closing runners") if self.channels: _logger.info("Closing filters") self.tasks.cancel_scope.cancel() self.channels.clear() Loading