Coverage for kilter/service/util.py: 94.03%

59 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-07 13:26 +0000

1# Copyright 2022 Dominik Sekotill <dom.sekotill@kodo.org.uk> 

2# 

3# This Source Code Form is subject to the terms of the Mozilla Public 

4# License, v. 2.0. If a copy of the MPL was not distributed with this 

5# file, You can obtain one at http://mozilla.org/MPL/2.0/. 

6 

7""" 

8Common helper utilities 

9""" 

10 

11# mypy: disallow-any-explicit=False 

12 

13from __future__ import annotations 

14 

15from collections.abc import AsyncIterator 

16from collections.abc import Callable 

17from contextlib import asynccontextmanager 

18from typing import Generic 

19from typing import Optional 

20from typing import TypeVar 

21 

22import anyio 

23 

24T = TypeVar("T") 

25Fn = TypeVar("Fn", bound=Callable[..., object]) 

26 

27 

28class Broadcast(anyio.Condition, Generic[T]): 

29 """ 

30 A reliable, blocking message queue for delivering to multiple listening tasks 

31 

32 Listeners must acquire the lock (by using the `Broadcast` instance as a context manager) 

33 before calling `Broadcast.receive()` or it will fail. If a listener is repeatedly 

34 awaiting messages in a loop, the loop should be inside the locked context or messages 

35 may be lost to race conditions. 

36 """ 

37 

38 def __init__(self) -> None: 

39 super().__init__() 

40 self.obj: Optional[T] = None 

41 self.exc: Optional[BaseException|type[BaseException]] = None 

42 

43 async def pre_receive_hook(self) -> None: 

44 """ 

45 A hook for subclasses to inject synchronisation instructions before awaiting objects 

46 """ # noqa: D401 

47 

48 async def post_send_hook(self) -> None: 

49 """ 

50 A hook for subclasses to inject synchronisation instructions after sending objects 

51 """ # noqa: D401 

52 

53 async def shutdown_hook(self) -> None: 

54 """ 

55 A hook for subclasses to inject cleanup or synchronisation instructions on close 

56 

57 Users must ensure this method is called, especially if using a subclass which 

58 implements it. 

59 """ # noqa: D401 

60 

61 async def abort(self, exc: BaseException|type[BaseException]) -> None: 

62 """ 

63 Send a notification to all listeners to abort by raising an exception 

64 """ 

65 async with self._ready(): 

66 assert self.exc is None and self.obj is None 

67 self.exc = exc 

68 self.notify_all() 

69 await self._post() 

70 

71 async def send(self, obj: T) -> None: 

72 """ 

73 Send a message object and block until all listeners have received it 

74 """ 

75 async with self._ready(): 

76 assert self.exc is None and self.obj is None 

77 self.obj = obj 

78 self.notify_all() 

79 await self._post() 

80 

81 @asynccontextmanager 

82 async def _ready(self) -> AsyncIterator[None]: 

83 while 1: 

84 await anyio.sleep(0.0) 

85 async with self: 

86 if self.obj is not None or self.exc is not None: 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true

87 continue 

88 yield 

89 return 

90 

91 async def _post(self) -> None: 

92 await anyio.sleep(0.0) # ensure listeners have opportunity to wait for locks 

93 await self.post_send_hook() 

94 

95 # Ensure all listeners have had a chance to lock and process self.obj 

96 while 1: 

97 async with self: 

98 if self.statistics().lock_statistics.tasks_waiting: # pragma: no-branch 

99 continue 

100 self.obj = self.exc = None 

101 break 

102 

103 async def receive(self) -> T: 

104 """ 

105 Listen for a single message and return it once it arrives 

106 """ 

107 await self.pre_receive_hook() 

108 await self.wait() 

109 if self.exc is not None: 

110 raise self.exc 

111 assert self.obj is not None 

112 return self.obj 

113 

114 

115def qualname(func: Fn) -> str: 

116 """ 

117 Return a qualified name for a callable 

118 """ 

119 if func.__module__ == "__main__": 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true

120 return func.__qualname__ 

121 return f"{func.__module__}.{func.__qualname__}"