Verified Commit 533476be authored by Dom Sekotill's avatar Dom Sekotill
Browse files

Implement negotiation

parent 6e3eba11
Loading
Loading
Loading
Loading
+222 −0
Original line number Diff line number Diff line
# Copyright 2023 Dominik Sekotill <dom.sekotill@kodo.org.uk>
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Filter decorators for marking the requested protocol options and actions used
"""

from __future__ import annotations

from typing import Callable
from typing import Literal
from typing import NamedTuple

from kilter.protocol.messages import ActionFlags
from kilter.protocol.messages import ProtocolFlags

from .session import Filter

__all__ = [
	"responds_to_connect", "examine_helo",
	"examine_sender", "examine_recipients",
	"examine_headers", "examine_body",
	"get_flags", "modify_flags",
]

Decorator = Callable[[Filter], Filter]
SIZES = Literal[ProtocolFlags.NONE, ProtocolFlags.MDS_256K, ProtocolFlags.MDS_1M]

FLAGS_ATTRIBUTE = "filter_flags"

NR_FLAGS = \
	ProtocolFlags.NR_CONNECT | ProtocolFlags.NR_HELO | \
	ProtocolFlags.NR_SENDER | ProtocolFlags.NR_RECIPIENT | \
	ProtocolFlags.NR_DATA | ProtocolFlags.NR_BODY | \
	ProtocolFlags.NR_HEADER | ProtocolFlags.NR_END_OF_HEADERS | \
	ProtocolFlags.NR_UNKNOWN


class FlagsTuple(NamedTuple):

	unset_options: ProtocolFlags = ProtocolFlags.NONE
	set_options: ProtocolFlags = ProtocolFlags.NONE
	set_actions: ActionFlags = ActionFlags.NONE


def modify_flags(
	set_options: ProtocolFlags = ProtocolFlags.NONE,
	unset_options: ProtocolFlags = ProtocolFlags.NONE,
	set_actions: ActionFlags = ActionFlags.NONE,
) -> Decorator:
	"""
	Return a decorator that modifies the given flags on a decorated filter
	"""
	def decorator(filtr: Filter) -> Filter:
		flags = _get_flags(filtr, FlagsTuple())
		flags = FlagsTuple(
			flags.unset_options|unset_options,
			flags.set_options|set_options,
			flags.set_actions|set_actions,
		)
		setattr(filtr, FLAGS_ATTRIBUTE, flags)
		return filtr
	return decorator


def get_flags(filtr: Filter) -> FlagsTuple:
	"""
	Return the flags attached to a filter
	"""
	default = FlagsTuple(unset_options=NR_FLAGS, set_actions=ActionFlags(0x1ff))
	return _get_flags(filtr, default)


def _get_flags(filtr: Filter, default: FlagsTuple) -> FlagsTuple:
	assert isinstance(getattr(filtr, FLAGS_ATTRIBUTE, default), FlagsTuple)
	return getattr(filtr, FLAGS_ATTRIBUTE, default)


def responds_to_connect() -> Decorator:
	"""
	Mark a filter as possibly delivering a non-continue response to Connect events
	"""
	return modify_flags(unset_options=ProtocolFlags.NR_CONNECT)


def examine_helo(
	can_respond: bool = False,
) -> Decorator:
	"""
	Mark a filter as needing to examine the HELO command

	If `can_respond` is `False` the filter runner will attempt to negotiate faster event
	delivery by disabling the need to respond to this event.
	"""
	unset = ProtocolFlags.NO_HELO
	if can_respond:
		unset |= ProtocolFlags.NR_HELO
	return modify_flags(unset_options=unset)


def examine_sender(
	can_respond: bool = False,
	can_replace: bool = False,
) -> Decorator:
	"""
	Mark a filter as needing to examine and optionally replace the RCPT FROM sender

	If `can_respond` is `False` the filter runner will attempt to negotiate faster event
	delivery by disabling the need to respond to this event.

	If `can_replace` is `True` but is not offered by the MTA an exception will be raised
	during negotiation and the filter will be disabled.
	"""
	unset = ProtocolFlags.NO_SENDER
	if can_respond:
		unset |= ProtocolFlags.NR_SENDER
	return modify_flags(
		unset_options=unset,
		set_actions=ActionFlags.CHANGE_FROM if can_replace else ActionFlags.NONE,
	)


def examine_recipients(
	can_respond: bool = False,
	can_add: bool = False,
	can_remove: bool = False,
	include_rejected: bool= False,
	with_parameters: bool = False,
) -> Decorator:
	"""
	Mark a filter as needing to examine and optionally modify the RCPT TO recipients

	If `can_respond` is `False` the filter runner will attempt to negotiate faster event
	delivery by disabling the need to respond to this event.

	If `include_rejected` is `True` the recipients available to the filter will include any
	that the MTA or another filter has already rejected.

	The option `with_parameters` enables the use of RFC-1425 [section 6] extensions for
	"MAIL" commands (ratified by RFC-5321) when adding recipients.  The specific details of
	any extension parameters will be dependent on the MTA.

	If a requested option or update action is not offered by the MTA an exception will be
	raised during negotiation and the filter will be disabled.
	"""
	unset = ProtocolFlags.NO_RECIPIENT
	opts = ProtocolFlags.NONE
	acts = ActionFlags.NONE
	if can_respond:
		unset |= ProtocolFlags.NR_RECIPIENT
	if can_add:
		acts |= ActionFlags.ADD_RECIPIENT
	if can_add and with_parameters:
		acts |= ActionFlags.ADD_RECIPIENT_PAR
	if can_remove:
		acts |= ActionFlags.DELETE_RECIPIENT
	if include_rejected:
		opts |= ProtocolFlags.REJECTED_RECIPIENT
	return modify_flags(unset_options=unset, set_options=opts, set_actions=acts)


def examine_headers(
	can_respond: bool = False,
	can_add: bool = False,
	can_modify: bool = False,
	leading_space: bool = False,
) -> Decorator:
	"""
	Mark a filter as needing to examine and optionally add or modify message headers

	If `can_respond` is `False` the filter runner will attempt to negotiate faster event
	delivery by disabling the need to respond to this event.

	If `leading_space` is `True` the headers will be delivered without any whitespace
	removed from values (i.e. after the separating colon). This is for filters which need
	the exact bytes contained in message headers.

	If a requested option or update action is not offered by the MTA an exception will be
	raised during negotiation and the filter will be disabled.
	"""
	unset = ProtocolFlags.NO_HEADERS
	opts = ProtocolFlags.NONE
	acts = ActionFlags.NONE
	if can_respond:
		unset |= ProtocolFlags.NR_HEADER
	if can_add:
		acts |= ActionFlags.ADD_HEADERS
	if can_modify:
		acts |= ActionFlags.CHANGE_HEADERS
	if leading_space:
		opts |= ProtocolFlags.HEADER_LEADING_SPACE
	return modify_flags(unset_options=unset, set_options=opts, set_actions=acts)


def examine_body(
	can_respond: bool = False,
	can_replace: bool = False,
	data_size: SIZES = ProtocolFlags.NONE,
) -> Decorator:
	"""
	Mark a filter as needing to examine and optionally replace message bodies

	If `can_respond` is `False` the filter runner will attempt to negotiate faster event
	delivery by disabling the need to respond to this event.

	The `data_size` option is a hint, and does not guarantee that the message will be
	delivered in blocks of that size. If `ProtocolFlags.NONE` (the default) the MTA's
	default will be used.

	If `can_replace` is `True` but is not offered by the MTA an exception will be raised
	during negotiation and the filter will be disabled.
	"""
	unset = ProtocolFlags.NO_BODY
	if can_respond:
		unset |= ProtocolFlags.NR_BODY
	return modify_flags(
		unset_options=unset, set_options=data_size,
		set_actions=ActionFlags.CHANGE_BODY if can_replace else ActionFlags.NONE,
	)
+25 −15
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@ from kilter.protocol.core import FilterProtocol
from kilter.protocol.core import ResponseMessage
from kilter.protocol.messages import ProtocolFlags

from .options import get_flags
from .session import *
from .util import Broadcast
from .util import qualname
@@ -46,13 +47,6 @@ MiB: Final = 2**20
_VALID_FINAL_RESPONSES: Final = Reject, Discard, Accept, TemporaryFailure, ReplyCode
_VALID_EVENT_MESSAGE: TypeAlias = Helo | EnvelopeFrom | EnvelopeRecipient | Data | \
	Unknown | Header | EndOfHeaders | Body | EndOfMessage | Abort
_DISABLE_PROTOCOL_FLAGS: Final = ProtocolFlags.NO_CONNECT | ProtocolFlags.NO_HELO | \
	ProtocolFlags.NO_SENDER | ProtocolFlags.NO_RECIPIENT | ProtocolFlags.NO_BODY | \
	ProtocolFlags.NO_HEADERS | ProtocolFlags.NO_EOH | ProtocolFlags.NO_UNKNOWN | \
	ProtocolFlags.NO_DATA | ProtocolFlags.NR_CONNECT | ProtocolFlags.NR_HELO | \
	ProtocolFlags.NR_SENDER | ProtocolFlags.NR_RECIPIENT | ProtocolFlags.NR_DATA | \
	ProtocolFlags.NR_UNKNOWN | ProtocolFlags.NR_EOH | ProtocolFlags.NR_BODY | \
	ProtocolFlags.NR_HEADER

_logger = logging.getLogger(__package__)

@@ -149,17 +143,33 @@ class Runner:
	async def _negotiate(self, message: Negotiate) -> Negotiate:
		_logger.info("Negotiating with MTA")

		# TODO: actually negotiate what the filter wants, not just "everything"
		actions = sum(ActionFlags)  # All actions!
		if actions != message.action_flags:
			raise NegotiationError("MTA does not accept all actions required by the filter")
		optmask = ProtocolFlags.NONE
		options = \
			ProtocolFlags.SKIP | ProtocolFlags.NR_HELO | \
			ProtocolFlags.NR_SENDER | ProtocolFlags.NR_RECIPIENT | \
			ProtocolFlags.NR_DATA | ProtocolFlags.NR_BODY | \
			ProtocolFlags.NR_HEADER | ProtocolFlags.NR_END_OF_HEADERS
		actions = ActionFlags.NONE

		resp = Negotiate(6, message.action_flags, message.protocol_flags)
		resp.protocol_flags &= ~_DISABLE_PROTOCOL_FLAGS
		options &= message.protocol_flags  # Remove unoffered initial flags, they are not required

		self.use_skip = bool(resp.protocol_flags & ProtocolFlags.SKIP)
		for filtr in self.filters:
			flags = get_flags(filtr)
			optmask |= flags.unset_options
			options |= flags.set_options
			actions |= flags.set_actions

		return resp
		options &= ~optmask

		if (missing_actions := actions & ~message.action_flags):
			raise NegotiationError(f"MTA does not accept {missing_actions}")

		if (missing_options := options & ~message.protocol_flags):
			raise NegotiationError(f"MTA does not offer {missing_options}")

		self.use_skip = ProtocolFlags.SKIP in options

		return Negotiate(6, actions, options)

	async def _prepare_filters(
		self,

tests/test_options.py

0 → 100644
+330 −0
Original line number Diff line number Diff line
from unittest import TestCase

from kilter.protocol import Accept
from kilter.protocol import ActionFlags
from kilter.protocol import ProtocolFlags
from kilter.service import Session
from kilter.service import options

NO_CONNECT = ProtocolFlags.NO_CONNECT
NO_HELO = ProtocolFlags.NO_HELO
NO_SENDER = ProtocolFlags.NO_SENDER
NO_RECIPIENT = ProtocolFlags.NO_RECIPIENT
NO_BODY = ProtocolFlags.NO_BODY
NO_HEADERS = ProtocolFlags.NO_HEADERS
NO_END_OF_HEADERS = ProtocolFlags.NO_END_OF_HEADERS
NO_UNKNOWN = ProtocolFlags.NO_UNKNOWN
NO_DATA = ProtocolFlags.NO_DATA
SKIP = ProtocolFlags.SKIP
REJECTED_RECIPIENT = ProtocolFlags.REJECTED_RECIPIENT
NR_CONNECT = ProtocolFlags.NR_CONNECT
NR_HELO = ProtocolFlags.NR_HELO
NR_SENDER = ProtocolFlags.NR_SENDER
NR_RECIPIENT = ProtocolFlags.NR_RECIPIENT
NR_DATA = ProtocolFlags.NR_DATA
NR_UNKNOWN = ProtocolFlags.NR_UNKNOWN
NR_END_OF_HEADERS = ProtocolFlags.NR_END_OF_HEADERS
NR_BODY = ProtocolFlags.NR_BODY
NR_HEADER = ProtocolFlags.NR_HEADER
HEADER_LEADING_SPACE = ProtocolFlags.HEADER_LEADING_SPACE
MAX_DATA_SIZE_256K = ProtocolFlags.MAX_DATA_SIZE_256K
MAX_DATA_SIZE_1M = ProtocolFlags.MAX_DATA_SIZE_1M

ADD_HEADERS = ActionFlags.ADD_HEADERS
CHANGE_HEADERS = ActionFlags.CHANGE_HEADERS
CHANGE_BODY = ActionFlags.CHANGE_BODY
ADD_RECIPIENT = ActionFlags.ADD_RECIPIENT
ADD_RECIPIENT_PAR = ActionFlags.ADD_RECIPIENT_PAR
DELETE_RECIPIENT = ActionFlags.DELETE_RECIPIENT
QUARANTINE = ActionFlags.QUARANTINE
CHANGE_FROM = ActionFlags.CHANGE_FROM
SETSYMLIST = ActionFlags.SETSYMLIST


def resolve_opts(
	flags: options.FlagsTuple,
	starting: ProtocolFlags = ProtocolFlags.NONE,
) -> ProtocolFlags:
	"""
	Resolve the ProtocolFlags set & unset by a FlagsTuple object, from starting options
	"""
	return (starting | flags.set_options) & ~flags.unset_options


class Tests(TestCase):
	"""
	Tests for options decorators
	"""

	def test_get_flags(self) -> None:
		"""
		Check that get_flags works with both decorated and undecorated filters
		"""
		@options.modify_flags()
		async def decorated(session: Session) -> Accept:
			return Accept()

		async def undecorated(session: Session) -> Accept:
			return Accept()

		with self.subTest("decorated"):
			flags = options.get_flags(decorated)

			assert (flags.set_options & ~flags.unset_options) == ProtocolFlags.NONE
			assert flags.set_actions == ActionFlags.NONE

		with self.subTest("undecorated"):
			flags = options.get_flags(undecorated)

			assert (flags.set_options & ~flags.unset_options) == ProtocolFlags.NONE
			assert flags.set_actions == ActionFlags(0x1ff)

	def test_modify_flags(self) -> None:
		"""
		Check that modify_flags compounds flags
		"""
		@options.modify_flags(
			set_options=SKIP,
			unset_options=HEADER_LEADING_SPACE,
			set_actions=ADD_HEADERS,
		)
		@options.modify_flags(
			set_options=NR_CONNECT,
			unset_options=SKIP,
		)
		@options.modify_flags(
			set_options=NR_CONNECT,
		)
		@options.modify_flags(
			set_actions=CHANGE_HEADERS,
		)
		async def filtr(session: Session) -> Accept:
			return Accept()

		flags = options.get_flags(filtr)

		assert flags.set_options == SKIP|NR_CONNECT, \
				f"{flags.set_options!r} is not SKIP"
		assert flags.unset_options == HEADER_LEADING_SPACE|SKIP, \
				f"{flags.unset_options!r} is not HEADER_LEADING_SPACE"
		assert flags.set_actions == ADD_HEADERS|CHANGE_HEADERS, \
				f"{flags.set_actions!r} is not ADD_HEADERS"

	def test_responds_to_connect(self) -> None:
		"""
		Check that decorating a filter with responds_to_connect disables NR_CONNECT
		"""
		@options.responds_to_connect()
		async def filtr(session: Session) -> Accept:
			return Accept()

		flags = options.get_flags(filtr)

		assert NR_CONNECT not in resolve_opts(flags)

	def test_examine_helo(self) -> None:
		"""
		Check that examine_helo unsets NO_HELO and toggles NR_HELO appropriately
		"""
		with self.subTest("can_respond=False"):
			@options.examine_helo(can_respond=False)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_HELO not in resolve_opts(flags, NO_HELO)
			assert NO_HELO not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_HELO not in resolve_opts(flags, NO_HELO)
			assert NR_HELO in resolve_opts(flags, NR_HELO)

		with self.subTest("can_respond=True"):
			@options.examine_helo(can_respond=True)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_HELO not in resolve_opts(flags, NO_HELO)
			assert NO_HELO not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_HELO not in resolve_opts(flags, NO_HELO)
			assert NR_HELO not in resolve_opts(flags, NR_HELO)

	def test_examine_sender(self) -> None:
		"""
		Check that examine_sender sets NO_SENDER, NR_SENDER & CHANGE_FROM appropriately
		"""
		with self.subTest("can_respond=False, can_replace=False"):
			@options.examine_sender(can_respond=False, can_replace=False)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_SENDER not in resolve_opts(flags, NO_SENDER)
			assert NO_SENDER not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_SENDER not in resolve_opts(flags, NO_SENDER)
			assert NR_SENDER in resolve_opts(flags, NR_SENDER)
			assert CHANGE_FROM not in flags.set_actions

		with self.subTest("can_respond=True, can_replace=True"):
			@options.examine_sender(can_respond=True, can_replace=True)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_SENDER not in resolve_opts(flags, NO_SENDER)
			assert NO_SENDER not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_SENDER not in resolve_opts(flags, NO_SENDER)
			assert NR_SENDER not in resolve_opts(flags, NR_SENDER)
			assert CHANGE_FROM in flags.set_actions

	def test_examine_recipients(self) -> None:
		"""
		Check that examine_recipients sets protocol options and actions as appropriate
		"""
		with self.subTest("all False"):
			@options.examine_recipients()
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_RECIPIENT not in resolve_opts(flags, NO_RECIPIENT)
			assert NO_RECIPIENT not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_RECIPIENT not in resolve_opts(flags, NO_RECIPIENT)
			assert NR_RECIPIENT in resolve_opts(flags, NR_RECIPIENT)
			assert REJECTED_RECIPIENT not in resolve_opts(flags, SKIP)
			assert REJECTED_RECIPIENT in resolve_opts(flags, REJECTED_RECIPIENT)

			assert ADD_RECIPIENT not in flags.set_actions
			assert ADD_RECIPIENT_PAR not in flags.set_actions
			assert DELETE_RECIPIENT not in flags.set_actions

		with self.subTest("can_respond, can_add, include_rejected"):
			@options.examine_recipients(can_respond=True, can_add=True, include_rejected=True)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_RECIPIENT not in resolve_opts(flags, NO_RECIPIENT)
			assert NO_RECIPIENT not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_RECIPIENT not in resolve_opts(flags, NO_RECIPIENT)
			assert NR_RECIPIENT not in resolve_opts(flags, NR_RECIPIENT)
			assert REJECTED_RECIPIENT in resolve_opts(flags, SKIP)

			assert ADD_RECIPIENT in flags.set_actions
			assert ADD_RECIPIENT_PAR not in flags.set_actions
			assert DELETE_RECIPIENT not in flags.set_actions

		with self.subTest("can_add, with_parameters"):
			@options.examine_recipients(can_add=True, with_parameters=True)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			# Skip repetitious checks…
			assert ADD_RECIPIENT in flags.set_actions
			assert ADD_RECIPIENT_PAR in flags.set_actions
			assert DELETE_RECIPIENT not in flags.set_actions

		with self.subTest("can_add, can_remove"):
			@options.examine_recipients(can_add=True, can_remove=True)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			# Skip repetitious checks…
			assert ADD_RECIPIENT in flags.set_actions
			assert ADD_RECIPIENT_PAR not in flags.set_actions
			assert DELETE_RECIPIENT in flags.set_actions

	def test_examine_headers(self) -> None:
		"""
		Check that examine_headers sets protocol options and actions as appropriate
		"""
		with self.subTest("all False"):
			@options.examine_headers()
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_HEADERS not in resolve_opts(flags, NO_HEADERS)
			assert NO_HEADERS not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_HEADER not in resolve_opts(flags, NO_HEADERS)
			assert NR_HEADER in resolve_opts(flags, NR_HEADER)
			assert HEADER_LEADING_SPACE not in resolve_opts(flags, ProtocolFlags.NONE)
			assert HEADER_LEADING_SPACE in resolve_opts(flags, HEADER_LEADING_SPACE)

			assert ADD_HEADERS not in flags.set_actions
			assert CHANGE_HEADERS not in flags.set_actions

		with self.subTest("can_respond=True, leading_space=True"):
			@options.examine_headers(can_respond=True, leading_space=True)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_HEADERS not in resolve_opts(flags, NO_HEADERS)
			assert NO_HEADERS not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_HEADER not in resolve_opts(flags, NO_HEADERS)
			assert NR_HEADER not in resolve_opts(flags, NR_HEADER)
			assert HEADER_LEADING_SPACE in resolve_opts(flags, ProtocolFlags.NONE)
			assert HEADER_LEADING_SPACE in resolve_opts(flags, HEADER_LEADING_SPACE)

			assert ADD_HEADERS not in flags.set_actions
			assert CHANGE_HEADERS not in flags.set_actions

		with self.subTest("can_add=True"):
			@options.examine_headers(can_add=True)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			# Skip repetitious checks…
			assert ADD_HEADERS in flags.set_actions
			assert CHANGE_HEADERS not in flags.set_actions

		with self.subTest("can_add=True, can_modify=True"):
			@options.examine_headers(can_add=True, can_modify=True)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			# Skip repetitious checks…
			assert ADD_HEADERS in flags.set_actions
			assert CHANGE_HEADERS in flags.set_actions

	def test_examine_body(self) -> None:
		"""
		Check that examine_body sets protocol options and actions as appropriate
		"""
		with self.subTest("all False"):
			@options.examine_body()
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_BODY not in resolve_opts(flags, NO_BODY)
			assert NO_BODY not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_BODY not in resolve_opts(flags, NO_BODY)
			assert NR_BODY in resolve_opts(flags, NR_BODY)
			assert MAX_DATA_SIZE_256K not in resolve_opts(flags, ProtocolFlags.NONE)
			assert MAX_DATA_SIZE_256K in resolve_opts(flags, MAX_DATA_SIZE_256K)
			assert MAX_DATA_SIZE_1M not in resolve_opts(flags, ProtocolFlags.NONE)
			assert MAX_DATA_SIZE_1M in resolve_opts(flags, MAX_DATA_SIZE_1M)

			assert CHANGE_BODY not in flags.set_actions

		with self.subTest("can_respond=True, can_replace=True, data_size=MDS_256K"):
			@options.examine_body(can_respond=True, can_replace=True, data_size=ProtocolFlags.MDS_256K)
			async def filtr(session: Session) -> Accept:
				return Accept()

			flags = options.get_flags(filtr)
			assert NO_BODY not in resolve_opts(flags, NO_BODY)
			assert NO_BODY not in resolve_opts(flags, ProtocolFlags.NONE)
			assert NR_BODY not in resolve_opts(flags, NO_BODY)
			assert NR_BODY not in resolve_opts(flags, NR_BODY)
			assert MAX_DATA_SIZE_256K in resolve_opts(flags, ProtocolFlags.NONE)
			assert MAX_DATA_SIZE_256K in resolve_opts(flags, MAX_DATA_SIZE_256K)
			assert MAX_DATA_SIZE_1M not in resolve_opts(flags, ProtocolFlags.NONE)
			assert MAX_DATA_SIZE_1M in resolve_opts(flags, MAX_DATA_SIZE_1M)

			assert CHANGE_BODY in flags.set_actions