Coverage for kilter/service/util.py: 94.03%
59 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-07 13:26 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-07 13:26 +0000
1# Copyright 2022 Dominik Sekotill <dom.sekotill@kodo.org.uk>
2#
3# This Source Code Form is subject to the terms of the Mozilla Public
4# License, v. 2.0. If a copy of the MPL was not distributed with this
5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
7"""
8Common helper utilities
9"""
11# mypy: disallow-any-explicit=False
13from __future__ import annotations
15from collections.abc import AsyncIterator
16from collections.abc import Callable
17from contextlib import asynccontextmanager
18from typing import Generic
19from typing import Optional
20from typing import TypeVar
22import anyio
24T = TypeVar("T")
25Fn = TypeVar("Fn", bound=Callable[..., object])
28class Broadcast(anyio.Condition, Generic[T]):
29 """
30 A reliable, blocking message queue for delivering to multiple listening tasks
32 Listeners must acquire the lock (by using the `Broadcast` instance as a context manager)
33 before calling `Broadcast.receive()` or it will fail. If a listener is repeatedly
34 awaiting messages in a loop, the loop should be inside the locked context or messages
35 may be lost to race conditions.
36 """
38 def __init__(self) -> None:
39 super().__init__()
40 self.obj: Optional[T] = None
41 self.exc: Optional[BaseException|type[BaseException]] = None
43 async def pre_receive_hook(self) -> None:
44 """
45 A hook for subclasses to inject synchronisation instructions before awaiting objects
46 """ # noqa: D401
48 async def post_send_hook(self) -> None:
49 """
50 A hook for subclasses to inject synchronisation instructions after sending objects
51 """ # noqa: D401
53 async def shutdown_hook(self) -> None:
54 """
55 A hook for subclasses to inject cleanup or synchronisation instructions on close
57 Users must ensure this method is called, especially if using a subclass which
58 implements it.
59 """ # noqa: D401
61 async def abort(self, exc: BaseException|type[BaseException]) -> None:
62 """
63 Send a notification to all listeners to abort by raising an exception
64 """
65 async with self._ready():
66 assert self.exc is None and self.obj is None
67 self.exc = exc
68 self.notify_all()
69 await self._post()
71 async def send(self, obj: T) -> None:
72 """
73 Send a message object and block until all listeners have received it
74 """
75 async with self._ready():
76 assert self.exc is None and self.obj is None
77 self.obj = obj
78 self.notify_all()
79 await self._post()
81 @asynccontextmanager
82 async def _ready(self) -> AsyncIterator[None]:
83 while 1:
84 await anyio.sleep(0.0)
85 async with self:
86 if self.obj is not None or self.exc is not None: 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true
87 continue
88 yield
89 return
91 async def _post(self) -> None:
92 await anyio.sleep(0.0) # ensure listeners have opportunity to wait for locks
93 await self.post_send_hook()
95 # Ensure all listeners have had a chance to lock and process self.obj
96 while 1:
97 async with self:
98 if self.statistics().lock_statistics.tasks_waiting: # pragma: no-branch
99 continue
100 self.obj = self.exc = None
101 break
103 async def receive(self) -> T:
104 """
105 Listen for a single message and return it once it arrives
106 """
107 await self.pre_receive_hook()
108 await self.wait()
109 if self.exc is not None:
110 raise self.exc
111 assert self.obj is not None
112 return self.obj
115def qualname(func: Fn) -> str:
116 """
117 Return a qualified name for a callable
118 """
119 if func.__module__ == "__main__": 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true
120 return func.__qualname__
121 return f"{func.__module__}.{func.__qualname__}"