Loading kilter/service/runner.py +8 −6 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ from __future__ import annotations import logging from collections.abc import AsyncGenerator from typing import TYPE_CHECKING from typing import Final from typing import TypeAlias from typing import TypeVar from warnings import warn Loading @@ -33,16 +34,17 @@ from .session import * from .util import Broadcast from .util import qualname MessageChannel = anyio.abc.ObjectStream[Message] Sender = AsyncGenerator[None, Message] MessageChannel: TypeAlias = anyio.abc.ObjectStream[Message] Sender: TypeAlias = AsyncGenerator[None, Message] kiB = 2**10 MiB = 2**20 kiB: Final = 2**10 MiB: Final = 2**20 _VALID_FINAL_RESPONSES = Reject, Discard, Accept, TemporaryFailure, ReplyCode # TODO: Convert to Union type alias once python/mypy#14242 is fixed _VALID_FINAL_RESPONSES: Final = Reject, Discard, Accept, TemporaryFailure, ReplyCode _VALID_EVENT_MESSAGE: TypeAlias = Helo | EnvelopeFrom | EnvelopeRecipient | Data | \ Unknown | Header | EndOfHeaders | Body | EndOfMessage | Abort _DISABLE_PROTOCOL_FLAGS = ProtocolFlags.NO_CONNECT | ProtocolFlags.NO_HELO | \ _DISABLE_PROTOCOL_FLAGS: Final = ProtocolFlags.NO_CONNECT | ProtocolFlags.NO_HELO | \ ProtocolFlags.NO_SENDER | ProtocolFlags.NO_RECIPIENT | ProtocolFlags.NO_BODY | \ ProtocolFlags.NO_HEADERS | ProtocolFlags.NO_EOH | ProtocolFlags.NO_UNKNOWN | \ ProtocolFlags.NO_DATA | ProtocolFlags.NR_CONNECT | ProtocolFlags.NR_HELO | \ Loading Loading
kilter/service/runner.py +8 −6 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ from __future__ import annotations import logging from collections.abc import AsyncGenerator from typing import TYPE_CHECKING from typing import Final from typing import TypeAlias from typing import TypeVar from warnings import warn Loading @@ -33,16 +34,17 @@ from .session import * from .util import Broadcast from .util import qualname MessageChannel = anyio.abc.ObjectStream[Message] Sender = AsyncGenerator[None, Message] MessageChannel: TypeAlias = anyio.abc.ObjectStream[Message] Sender: TypeAlias = AsyncGenerator[None, Message] kiB = 2**10 MiB = 2**20 kiB: Final = 2**10 MiB: Final = 2**20 _VALID_FINAL_RESPONSES = Reject, Discard, Accept, TemporaryFailure, ReplyCode # TODO: Convert to Union type alias once python/mypy#14242 is fixed _VALID_FINAL_RESPONSES: Final = Reject, Discard, Accept, TemporaryFailure, ReplyCode _VALID_EVENT_MESSAGE: TypeAlias = Helo | EnvelopeFrom | EnvelopeRecipient | Data | \ Unknown | Header | EndOfHeaders | Body | EndOfMessage | Abort _DISABLE_PROTOCOL_FLAGS = ProtocolFlags.NO_CONNECT | ProtocolFlags.NO_HELO | \ _DISABLE_PROTOCOL_FLAGS: Final = ProtocolFlags.NO_CONNECT | ProtocolFlags.NO_HELO | \ ProtocolFlags.NO_SENDER | ProtocolFlags.NO_RECIPIENT | ProtocolFlags.NO_BODY | \ ProtocolFlags.NO_HEADERS | ProtocolFlags.NO_EOH | ProtocolFlags.NO_UNKNOWN | \ ProtocolFlags.NO_DATA | ProtocolFlags.NR_CONNECT | ProtocolFlags.NR_HELO | \ Loading