Coverage for aiocoap/pipe.py: 85%

125 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-10 11:47 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5import asyncio 

6from collections import namedtuple 

7import functools 

8import sys 

9 

10from . import error 

11from .numbers import INTERNAL_SERVER_ERROR 

12 

13 

14class Pipe: 

15 """Low-level meeting point between a request and a any responses that come 

16 back on it. 

17 

18 A single request message is placed in the Pipe at creation time. 

19 Any responses, as well as any exception happening in the course of 

20 processing, are passed back to the requester along the Pipe. A 

21 response can carry an indication of whether it is final; an exception 

22 always is. 

23 

24 This object is used both on the client side (where the Context on behalf of 

25 the application creates a Pipe and passes it to the network 

26 transports that send the request and fill in any responses) and on the 

27 server side (where the Context creates one for an incoming request and 

28 eventually lets the server implementation populate it with responses). 

29 

30 This currently follows a callback dispatch style. (It may be developed into 

31 something where only awaiting a response drives the proces, though). 

32 

33 Currently, the requester sets up the object, connects callbacks, and then 

34 passes the Pipe on to whatever creates the response. 

35 

36 The creator of responses is notified by the Pipe of a loss of 

37 interest in a response when there are no more callback handlers registered 

38 by registering an on_interest_end callback. As the response callbacks need 

39 to be already in place when the Pipe is passed on to the 

40 responder, the absence event callbacks is signalled by callign the callback 

41 immediately on registration. 

42 

43 To accurately model "loss of interest", it is important to use the 

44 two-phase setup of first registering actual callbacks and then producing 

45 events and/or placing on_interest_end callbacks; this is not clearly 

46 expressed in type or state yet. (One possibility would be for the 

47 Pipe to carry a preparation boolean, and which prohibits event 

48 sending during preparation and is_interest=True callback creation 

49 afterwards). 

50 

51 This was previously named PlumbingRequest. 

52 

53 **Stability** 

54 

55 Sites and resources implemented by provinding a 

56 :meth:`~aiocoap.interfaces.Resource.render_to_pipe` method can stably use 

57 the :meth:`add_response` method of a Pipe (or something that quacks like 

58 it). 

59 

60 They should not rely on :meth:`add_exception` but rather just raise the 

61 exception, and neither register :meth:`on_event` handlers (being the sole 

62 producer of events) nor hook to :meth:`on_interest_end` (instead, they can 

63 use finally clauses or async context managers to handle any cleanup when 

64 the cancellation of the render task indicates the peer's loss of interest). 

65 """ 

66 

67 Event = namedtuple("Event", ("message", "exception", "is_last")) 

68 

69 # called by the initiator of the request 

70 

71 def __init__(self, request, log): 

72 self.request = request 

73 self.log = log 

74 

75 self._event_callbacks = [] 

76 """list[(callback, is_interest)], or None during event processing, or 

77 False when there were no more event callbacks and an the 

78 on_interest_end callbacks have already been called""" 

79 

80 def __repr__(self): 

81 return "<%s at %#x around %r with %r callbacks (thereof %r interests)>" % ( 

82 type(self).__name__, 

83 id(self), 

84 self.request, 

85 len(self._event_callbacks) 

86 if self._event_callbacks 

87 else self._event_callbacks, 

88 sum(1 for (e, is_interest) in self._event_callbacks if is_interest) 

89 if self._event_callbacks 

90 else self._event_callbacks, 

91 ) 

92 

93 def _any_interest(self): 

94 return any(is_interest for (cb, is_interest) in self._event_callbacks) 

95 

96 def poke(self): 

97 """Ask the responder for a life sign. It is up to the responder to 

98 ignore this (eg. because the responder is the library/application and 

99 can't be just gone), to issue a generic transport-dependent 'ping' to 

100 see whether the connection is still alive, or to retransmit the request 

101 if it is an observation over an unreliable channel. 

102 

103 In any case, no status is reported directly to the poke, but if 

104 whatever the responder does fails, it will send an appropriate error 

105 message as a response.""" 

106 raise NotImplementedError() 

107 

108 def on_event(self, callback, is_interest=True): 

109 """Call callback on any event. The callback must return True to be 

110 called again after an event. Callbacks must not produce new events or 

111 deregister unrelated event handlers. 

112 

113 If is_interest=False, the callback will not be counted toward the 

114 active callbacks, and will receive a (None, None, is_last=True) event 

115 eventually. 

116 

117 To unregister the handler, call the returned closure; this can trigger 

118 on_interest_end callbacks. 

119 """ 

120 self._event_callbacks.append((callback, is_interest)) 

121 return functools.partial(self._unregister_on_event, callback) 

122 

123 def _unregister_on_event(self, callback): 

124 if self._event_callbacks is False: 

125 # They wouldn't be called any more so they're already dropped.a 

126 # It's OK that the caller cleans up after itself: Sure it could 

127 # register an on_interest_end, but that's really not warranted if 

128 # all it wants to know is whether it'll have to execute cleanup 

129 # when it's shutting down or not. 

130 return 

131 

132 self._event_callbacks = [ 

133 (cb, i) for (cb, i) in self._event_callbacks if callback is not cb 

134 ] 

135 if not self._any_interest(): 

136 self._end() 

137 

138 def on_interest_end(self, callback): 

139 """Register a callback that will be called exactly once -- either right 

140 now if there is not even a current indicated interest, or at a last 

141 event, or when no more interests are present""" 

142 

143 if self._event_callbacks is False: 

144 # Happens, for example, when a proxy receives multiple requests on a single token 

145 self.log.warning( 

146 "on_interest_end callback %r added after %r has already ended", 

147 callback, 

148 self, 

149 ) 

150 callback() 

151 return 

152 

153 if self._any_interest(): 

154 self._event_callbacks.append( 

155 ( 

156 lambda e: ((callback(), False) if e.is_last else (None, True))[1], 

157 False, 

158 ) 

159 ) 

160 else: 

161 callback() 

162 

163 def _end(self): 

164 cbs = self._event_callbacks 

165 self._event_callbacks = False 

166 tombstone = self.Event(None, None, True) 

167 [cb(tombstone) for (cb, _) in cbs] 

168 

169 # called by the responding side 

170 

171 def _add_event(self, event): 

172 if self._event_callbacks is False: 

173 # Happens, for example, when a proxy receives multiple requests on a single token 

174 self.log.warning( 

175 "Response %r added after %r has already ended", event, self 

176 ) 

177 return 

178 

179 for cb, is_interest in self._event_callbacks[:]: 

180 keep_calling = cb(event) 

181 if not keep_calling: 

182 if self._event_callbacks is False: 

183 # All interest was just lost during the callback 

184 return 

185 

186 self._event_callbacks.remove((cb, is_interest)) 

187 

188 if not self._any_interest(): 

189 self._end() 

190 

191 def add_response(self, response, is_last=False): 

192 self._add_event(self.Event(response, None, is_last)) 

193 

194 def add_exception(self, exception): 

195 self._add_event(self.Event(None, exception, True)) 

196 

197 

198def run_driving_pipe(pipe, coroutine, name=None): 

199 """Create a task from a coroutine where the end of the coroutine produces a 

200 terminal event on the pipe, and lack of interest in the pipe cancels the 

201 task. 

202 

203 The coroutine will typically produce output into the pipe; that 

204 connection is set up by the caller like as in 

205 ``run_driving_pipe(pipe, render_to(pipe))``. 

206 

207 The create task is not returned, as the only sensible operation on it would 

208 be cancellation and that's already set up from the pipe. 

209 """ 

210 

211 async def wrapped(): 

212 try: 

213 await coroutine 

214 except Exception as e: 

215 pipe.add_exception(e) 

216 # Not doing anything special about cancellation: it indicates the 

217 # peer's loss of interest, so there's no use in sending anythign out to 

218 # someone not listening any more 

219 

220 task = asyncio.create_task( 

221 wrapped(), 

222 name=name, 

223 ) 

224 if sys.version_info < (3, 8): 

225 # These Python versions used to complain about cancelled tasks, where 

226 # really a cancelled task is perfectly natural (especially here where 

227 # it's just not needed any more because nobody is listening to what it 

228 # produces). As catching CancellationError doesn't help silencing them, 

229 # this workaround ensures the cancellations don't raise. 

230 def silence_cancellation(task): 

231 try: 

232 task.result() 

233 except asyncio.CancelledError: 

234 pass 

235 

236 task.add_done_callback(silence_cancellation) 

237 pipe.on_interest_end(task.cancel) 

238 

239 

240def error_to_message(old_pr, log): 

241 """Given a pipe set up by the requester, create a new pipe to pass on to a 

242 responder. 

243 

244 Any exceptions produced by the responder will be turned into terminal 

245 responses on the original pipe, and loss of interest is forwarded.""" 

246 

247 from .message import Message 

248 

249 next_pr = Pipe(old_pr.request, log) 

250 

251 def on_event(event): 

252 if event.message is not None: 

253 old_pr.add_response(event.message, event.is_last) 

254 return not event.is_last 

255 

256 e = event.exception 

257 

258 if isinstance(e, error.RenderableError): 

259 # the repr() here is quite imporant for garbage collection 

260 log.info( 

261 "Render request raised a renderable error (%s), responding accordingly.", 

262 repr(e), 

263 ) 

264 try: 

265 msg = e.to_message() 

266 if msg is None: 

267 # This deserves a separate check because the ABC checks 

268 # that should ensure that the default to_message method is 

269 # never used in concrete classes fails due to the metaclass 

270 # conflict between ABC and Exceptions 

271 raise ValueError( 

272 "Exception to_message failed to produce a message on %r" % e 

273 ) 

274 except Exception as e2: 

275 log.error( 

276 "Rendering the renderable exception failed: %r", e2, exc_info=e2 

277 ) 

278 msg = Message(code=INTERNAL_SERVER_ERROR) 

279 old_pr.add_response(msg, is_last=True) 

280 else: 

281 log.error( 

282 "An exception occurred while rendering a resource: %r", e, exc_info=e 

283 ) 

284 old_pr.add_response(Message(code=INTERNAL_SERVER_ERROR), is_last=True) 

285 

286 return False 

287 

288 remove_interest = next_pr.on_event(on_event) 

289 old_pr.on_interest_end(remove_interest) 

290 return next_pr 

291 

292 

293class IterablePipe: 

294 """A stand-in for a Pipe that the requesting party can use 

295 instead. It should behave just like a Pipe to the responding 

296 party, but the caller does not register on_event handlers and instead 

297 iterates asynchronously over the events. 

298 

299 Note that the PR can be aitered over only once, and does not support any 

300 additional hook settings once asynchronous iteration is started; this is 

301 consistent with the usage pattern of pipes. 

302 """ 

303 

304 def __init__(self, request): 

305 self.request = request 

306 

307 self.__on_interest_end = [] 

308 

309 # FIXME: This is unbounded -- pipes should gain support for 

310 # backpressure. 

311 self.__queue = asyncio.Queue() 

312 

313 def on_interest_end(self, callback): 

314 try: 

315 self.__on_interest_end.append(callback) 

316 except AttributeError: 

317 raise RuntimeError( 

318 "Attempted to declare interest in the end of a IterablePipe on which iteration already started" 

319 ) from None 

320 

321 def __aiter__(self): 

322 i = self.Iterator(self.__queue, self.__on_interest_end) 

323 del self.__on_interest_end 

324 return i 

325 

326 def _add_event(self, e): 

327 self.__queue.put_nowait(e) 

328 

329 def add_response(self, response, is_last=False): 

330 self._add_event(Pipe.Event(response, None, is_last)) 

331 

332 def add_exception(self, exception): 

333 self._add_event(Pipe.Event(None, exception, True)) 

334 

335 class Iterator: 

336 def __init__(self, queue, on_interest_end): 

337 self.__queue = queue 

338 self.__on_interest_end = on_interest_end 

339 

340 async def __anext__(self): 

341 return await self.__queue.get() 

342 

343 def __del__(self): 

344 # This is pretty reliable as the iterator is only created and 

345 # referenced in the desugaring of the `async for`. 

346 for c in self.__on_interest_end: 

347 c()