Commit d6cbb346 authored by Dom Sekotill's avatar Dom Sekotill
Browse files

Ensure Broadcast messages do not overlap

parent 668a504a
Loading
Loading
Loading
Loading
+14 −2
Original line number Diff line number Diff line
@@ -12,7 +12,9 @@ Common helper utilities

from __future__ import annotations

from collections.abc import AsyncIterator
from collections.abc import Callable
from contextlib import asynccontextmanager
from typing import Generic
from typing import Optional
from typing import TypeVar
@@ -60,7 +62,7 @@ class Broadcast(anyio.Condition, Generic[T]):
		"""
		Send a notification to all listeners to abort by raising an exception
		"""
		async with self:
		async with self._ready():
			assert self.exc is None and self.obj is None
			self.exc = exc
			self.notify_all()
@@ -70,12 +72,22 @@ class Broadcast(anyio.Condition, Generic[T]):
		"""
		Send a message object and block until all listeners have received it
		"""
		async with self:
		async with self._ready():
			assert self.exc is None and self.obj is None
			self.obj = obj
			self.notify_all()
		await self._post()

	@asynccontextmanager
	async def _ready(self) -> AsyncIterator[None]:
		while 1:
			await anyio.sleep(0.0)
			async with self:
				if self.obj is not None or self.exc is not None:
					continue
				yield
				return

	async def _post(self) -> None:
		await anyio.sleep(0.0)  # ensure listeners have opportunity to wait for locks
		await self.post_send_hook()