Loading kilter/protocol/messages.py +78 −1 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ from __future__ import annotations from abc import ABCMeta from abc import abstractmethod from collections.abc import Collection from collections.abc import Iterable from collections.abc import Iterator from collections.abc import Mapping from dataclasses import dataclass Loading @@ -41,7 +42,7 @@ if TYPE_CHECKING: LONG = Struct("!L") __all__ = [ "Family", "Stage", "Family", "Stage", "BitField", "ActionFlags", "ProtocolFlags", "Message", "Negotiate", "Macro", "Connect", "Helo", "EnvelopeFrom", "EnvelopeRecipient", "Data", "Unknown", "Header", "EndOfHeaders", "Body", "EndOfMessage", "Abort", "Close", "Continue", "Reject", "Discard", "Accept", Loading Loading @@ -110,6 +111,82 @@ class Stage(int, Enum): END_HEADERS = 6 BFSelf = TypeVar("BFSelf", bound="BitField") class BitField(int, Enum): """ Base class for bit-field enums like ActionFlags and ProtocolFlags """ @classmethod def pack(cls: type[BFSelf], flags: Iterable[BFSelf]) -> int: rflag = 0 for flag in flags: rflag |= flag return rflag @classmethod def unpack(cls: type[BFSelf], bitfield: int) -> set[BFSelf]: return {flag for flag in cls if flag & bitfield} class ActionFlags(BitField): """ Bit-field values for the `Negotiate.action_flags` field of the `Negotiate` message The values correspond to the `SPFIF_*` codes as described in https://pythonhosted.org/pymilter/milter_api/smfi_register.html#flags """ ADD_HEADERS = ADDHDRS = 0x1 CHANGE_HEADERS = CHGHDRS = 0x10 CHANGE_BODY = CHGBODY = 0x2 ADD_RECIPIENT = ADDRCPT = 0x4 ADD_RECIPIENT_PAR = ADDRCPT_PAR = 0x80 DELETE_RECIPIENT = DELRCPT = 0x8 QUARANTINE = 0x20 CHANGE_FROM = CHGFROM = 0x40 SETSYMLIST = 0x100 class ProtocolFlags(BitField): """ Bit-field values for the `Negotiate.protocol_flags` field of the `Negotiate` message The values correspond to the `SMFIP_*` codes described in https://pythonhosted.org/pymilter/milter_api/xxfi_negotiate.html """ NO_CONNECT = 0x1 NO_HELO = 0x2 NO_SENDER = NO_MAIL = 0x4 NO_RECIPIENT = NO_RCPT = 0x8 NO_BODY = 0x10 NO_HEADERS = NO_HDRS = 0x20 NO_END_OF_HEADERS = NO_EOH = 0x40 NO_UNKNOWN = 0x100 NO_DATA = 0x200 SKIP = 0x400 REJECTEDT_RECIPIENT = RCPT_REJ = 0x800 NR_CONNECT = NR_CONN = 0x1000 NR_HELO = 0x2000 NR_SENDER = NR_MAIL = 0x4000 NR_RECIPIENT = NR_RCPT = 0x8000 NR_DATA = 0x10000 NR_UNKNOWN = NR_UNKN = 0x20000 NR_END_OF_HEADERS = NR_EOH = 0x40000 NR_BODY = 0x80000 NR_HEADER = NR_HDR = 0x80 HEADER_LEADING_SPACE = HDR_LEADSPC = 0x100000 MAX_DATA_SIZE_256K = MDS_256K = 0x10000000 MAX_DATA_SIZE_1M = MDS_1M = 0x20000000 class Message(metaclass=ABCMeta): r""" A base class for all messages which also handles packing and unpacking messages Loading tests/test_bitfields.py 0 → 100644 +36 −0 Original line number Diff line number Diff line """ Tests for BitField classes in the kilter.protocol.messages module """ from __future__ import annotations from unittest import TestCase from kilter.protocol.messages import ActionFlags class ActionFlagsTests(TestCase): """ Tests for packing and unpacking ActionFlags """ def test_pack(self) -> None: """ Check that packing a set of ActionFlag values into an int works """ flags = {ActionFlags.ADD_HEADERS, ActionFlags.CHANGE_BODY, ActionFlags.QUARANTINE} bitfield = ActionFlags.pack(flags) assert bitfield == 0x23 def test_unpack(self) -> None: """ Check that unpacking a set of ActionFlags from an int works """ flags = ActionFlags.unpack(0x23) self.assertSetEqual( flags, {ActionFlags.ADD_HEADERS, ActionFlags.CHANGE_BODY, ActionFlags.QUARANTINE}, ) Loading
kilter/protocol/messages.py +78 −1 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ from __future__ import annotations from abc import ABCMeta from abc import abstractmethod from collections.abc import Collection from collections.abc import Iterable from collections.abc import Iterator from collections.abc import Mapping from dataclasses import dataclass Loading @@ -41,7 +42,7 @@ if TYPE_CHECKING: LONG = Struct("!L") __all__ = [ "Family", "Stage", "Family", "Stage", "BitField", "ActionFlags", "ProtocolFlags", "Message", "Negotiate", "Macro", "Connect", "Helo", "EnvelopeFrom", "EnvelopeRecipient", "Data", "Unknown", "Header", "EndOfHeaders", "Body", "EndOfMessage", "Abort", "Close", "Continue", "Reject", "Discard", "Accept", Loading Loading @@ -110,6 +111,82 @@ class Stage(int, Enum): END_HEADERS = 6 BFSelf = TypeVar("BFSelf", bound="BitField") class BitField(int, Enum): """ Base class for bit-field enums like ActionFlags and ProtocolFlags """ @classmethod def pack(cls: type[BFSelf], flags: Iterable[BFSelf]) -> int: rflag = 0 for flag in flags: rflag |= flag return rflag @classmethod def unpack(cls: type[BFSelf], bitfield: int) -> set[BFSelf]: return {flag for flag in cls if flag & bitfield} class ActionFlags(BitField): """ Bit-field values for the `Negotiate.action_flags` field of the `Negotiate` message The values correspond to the `SPFIF_*` codes as described in https://pythonhosted.org/pymilter/milter_api/smfi_register.html#flags """ ADD_HEADERS = ADDHDRS = 0x1 CHANGE_HEADERS = CHGHDRS = 0x10 CHANGE_BODY = CHGBODY = 0x2 ADD_RECIPIENT = ADDRCPT = 0x4 ADD_RECIPIENT_PAR = ADDRCPT_PAR = 0x80 DELETE_RECIPIENT = DELRCPT = 0x8 QUARANTINE = 0x20 CHANGE_FROM = CHGFROM = 0x40 SETSYMLIST = 0x100 class ProtocolFlags(BitField): """ Bit-field values for the `Negotiate.protocol_flags` field of the `Negotiate` message The values correspond to the `SMFIP_*` codes described in https://pythonhosted.org/pymilter/milter_api/xxfi_negotiate.html """ NO_CONNECT = 0x1 NO_HELO = 0x2 NO_SENDER = NO_MAIL = 0x4 NO_RECIPIENT = NO_RCPT = 0x8 NO_BODY = 0x10 NO_HEADERS = NO_HDRS = 0x20 NO_END_OF_HEADERS = NO_EOH = 0x40 NO_UNKNOWN = 0x100 NO_DATA = 0x200 SKIP = 0x400 REJECTEDT_RECIPIENT = RCPT_REJ = 0x800 NR_CONNECT = NR_CONN = 0x1000 NR_HELO = 0x2000 NR_SENDER = NR_MAIL = 0x4000 NR_RECIPIENT = NR_RCPT = 0x8000 NR_DATA = 0x10000 NR_UNKNOWN = NR_UNKN = 0x20000 NR_END_OF_HEADERS = NR_EOH = 0x40000 NR_BODY = 0x80000 NR_HEADER = NR_HDR = 0x80 HEADER_LEADING_SPACE = HDR_LEADSPC = 0x100000 MAX_DATA_SIZE_256K = MDS_256K = 0x10000000 MAX_DATA_SIZE_1M = MDS_1M = 0x20000000 class Message(metaclass=ABCMeta): r""" A base class for all messages which also handles packing and unpacking messages Loading
tests/test_bitfields.py 0 → 100644 +36 −0 Original line number Diff line number Diff line """ Tests for BitField classes in the kilter.protocol.messages module """ from __future__ import annotations from unittest import TestCase from kilter.protocol.messages import ActionFlags class ActionFlagsTests(TestCase): """ Tests for packing and unpacking ActionFlags """ def test_pack(self) -> None: """ Check that packing a set of ActionFlag values into an int works """ flags = {ActionFlags.ADD_HEADERS, ActionFlags.CHANGE_BODY, ActionFlags.QUARANTINE} bitfield = ActionFlags.pack(flags) assert bitfield == 0x23 def test_unpack(self) -> None: """ Check that unpacking a set of ActionFlags from an int works """ flags = ActionFlags.unpack(0x23) self.assertSetEqual( flags, {ActionFlags.ADD_HEADERS, ActionFlags.CHANGE_BODY, ActionFlags.QUARANTINE}, )