Coverage for aiocoap/protocol.py: 88%

481 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-16 16:09 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the classes that are responsible for keeping track of 

6messages: 

7 

8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP 

9 socket) -- something that can send requests and possibly can answer 

10 incoming requests. 

11 

12* a :class:`Request` gets generated whenever a request gets sent to keep 

13 track of the response 

14 

15* a :class:`Responder` keeps track of a single incoming request 

16 

17Logging 

18~~~~~~~ 

19 

20Several constructors of the Context accept a logger name; these names go into 

21the construction of a Python logger. 

22 

23Log events will be emitted to these on different levels, with "warning" and 

24above being a practical default for things that may warrant reviewing by 

25an operator: 

26 

27* DEBUG is used for things that occur even under perfect conditions. 

28* INFO is for things that are well expected, but might be interesting during 

29 testing a network of nodes and not just when debugging the library. (This 

30 includes timeouts, retransmissions, and pings.) 

31* WARNING is for everything that indicates a malbehaved peer. These don't 

32 *necessarily* indicate a client bug, though: Things like requesting a 

33 nonexistent block can just as well happen when a resource's content has 

34 changed between blocks. The library will not go out of its way to determine 

35 whether there is a plausible explanation for the odd behavior, and will 

36 report something as a warning in case of doubt. 

37* ERROR is used when something clearly went wrong. This includes irregular 

38 connection terminations and resource handler errors (which are demoted to 

39 error responses), and can often contain a backtrace. 

40""" 

41 

42import asyncio 

43import weakref 

44import time 

45from typing import Optional, List 

46 

47from . import defaults 

48from .credentials import CredentialsMap 

49from .message import Message 

50from .messagemanager import MessageManager 

51from .tokenmanager import TokenManager 

52from .pipe import Pipe, run_driving_pipe, error_to_message 

53from . import interfaces 

54from . import error 

55from .numbers import (INTERNAL_SERVER_ERROR, NOT_FOUND, 

56 CONTINUE, SHUTDOWN_TIMEOUT) 

57from .util.asyncio import py38args 

58 

59import warnings 

60import logging 

61 

62 

class Context(interfaces.RequestProvider):
    """Applications' entry point to the network

    A :class:`.Context` coordinates one or more network :mod:`.transports`
    implementations and dispatches data between them and the application.

    The application can start requests using the message dispatch methods, and
    set a :class:`resources.Site` that will answer requests directed to the
    application as a server.

    On the library-internals side, it is the prime implementation of the
    :class:`interfaces.RequestProvider` interface, creates :class:`Request` and
    :class:`Response` classes on demand, and decides which transport
    implementations to start and which are to handle which messages.

    **Context creation and destruction**

    The following functions are provided for creating and stopping a context:

    .. note::

        A typical application should only ever create one context, even (or
        especially when) it acts both as a server and as a client (in which
        case a server context should be created).

        A context that is not used any more must be shut down using
        :meth:`.shutdown()`, but typical applications will not need to because
        they use the context for the full process lifetime.

    .. automethod:: create_client_context
    .. automethod:: create_server_context

    .. automethod:: shutdown

    **Dispatching messages**

    CoAP requests can be sent using the following functions:

    .. automethod:: request

    If more control is needed, you can create a :class:`Request` yourself and
    pass the context to it.


    **Other methods and properties**

    The remaining methods and properties are to be considered unstable even
    when the project reaches a stable version number; please file a feature
    request for stabilization if you want to reliably access any of them.

    (Sorry for the duplicates, still looking for a way to make autodoc list
    everything not already mentioned).

    """
    def __init__(self, loop=None, serversite=None, loggername="coap", client_credentials=None, server_credentials=None):
        self.log = logging.getLogger(loggername)

        self.loop = loop or asyncio.get_event_loop()

        self.serversite = serversite

        # Transport stacks that can route outgoing requests and receive
        # incoming ones; populated by the create_*_context classmethods.
        self.request_interfaces = []

        self.client_credentials = client_credentials or CredentialsMap()
        self.server_credentials = server_credentials or CredentialsMap()

    #
    # convenience methods for class instantiation
    #

    async def _append_tokenmanaged_messagemanaged_transport(self, message_interface_constructor):
        """Build a TokenManager/MessageManager stack on top of the message
        interface produced by ``message_interface_constructor`` and register
        it as a request interface."""
        tman = TokenManager(self)
        mman = MessageManager(tman)
        transport = await message_interface_constructor(mman)

        mman.message_interface = transport
        tman.token_interface = mman

        self.request_interfaces.append(tman)

    async def _append_tokenmanaged_transport(self, token_interface_constructor):
        """Build a TokenManager on top of the token interface produced by
        ``token_interface_constructor`` and register it as a request
        interface."""
        tman = TokenManager(self)
        transport = await token_interface_constructor(tman)

        tman.token_interface = transport

        self.request_interfaces.append(tman)

    @classmethod
    async def create_client_context(cls, *, loggername="coap", loop=None, transports: Optional[List[str]] = None):
        """Create a context bound to all addresses on a random listening port.

        This is the easiest way to get a context suitable for sending client
        requests.

        :raises RuntimeError: if an unknown transport name is requested.
        """

        if loop is None:
            loop = asyncio.get_event_loop()

        self = cls(loop=loop, serversite=None, loggername=loggername)

        selected_transports = transports or defaults.get_default_clienttransports(loop=loop)

        # FIXME make defaults overridable (postponed until they become configurable too)
        for transportname in selected_transports:
            # Transport modules are imported lazily so that unused (and
            # possibly uninstallable) transports carry no import cost.
            if transportname == 'udp6':
                from .transports.udp6 import MessageInterfaceUDP6
                await self._append_tokenmanaged_messagemanaged_transport(
                    lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint(mman, log=self.log, loop=loop))
            elif transportname == 'simple6':
                from .transports.simple6 import MessageInterfaceSimple6
                await self._append_tokenmanaged_messagemanaged_transport(
                    lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(mman, log=self.log, loop=loop))
            elif transportname == 'tinydtls':
                from .transports.tinydtls import MessageInterfaceTinyDTLS
                await self._append_tokenmanaged_messagemanaged_transport(
                    lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(mman, log=self.log, loop=loop))
            elif transportname == 'tcpclient':
                from .transports.tcp import TCPClient
                await self._append_tokenmanaged_transport(
                    lambda tman: TCPClient.create_client_transport(tman, self.log, loop))
            elif transportname == 'tlsclient':
                from .transports.tls import TLSClient
                await self._append_tokenmanaged_transport(
                    lambda tman: TLSClient.create_client_transport(tman, self.log, loop, self.client_credentials))
            elif transportname == 'ws':
                from .transports.ws import WSPool
                await self._append_tokenmanaged_transport(
                    lambda tman: WSPool.create_transport(tman, self.log, loop, client_credentials=self.client_credentials))
            elif transportname == 'oscore':
                from .transports.oscore import TransportOSCORE
                oscoretransport = TransportOSCORE(self, self)
                self.request_interfaces.append(oscoretransport)
            else:
                # message typo fixed: "not know" -> "not known"
                raise RuntimeError("Transport %r not known for client context creation" % transportname)

        return self

    @classmethod
    async def create_server_context(cls, site, bind=None, *, loggername="coap-server", loop=None, _ssl_context=None, multicast=None, server_credentials=None, transports: Optional[List[str]] = None):
        """Create a context, bound to all addresses on the CoAP port (unless
        otherwise specified in the ``bind`` argument).

        This is the easiest way to get a context suitable both for sending
        client and accepting server requests.

        The ``bind`` argument, if given, needs to be a 2-tuple of IP address
        string and port number, where the port number can be None to use the default port.

        If ``multicast`` is given, it needs to be a list of (multicast address,
        interface name) tuples, which will all be joined. (The IPv4 style of
        selecting the interface by a local address is not supported; users may
        want to use the netifaces package to arrive at an interface name for an
        address).

        As a shortcut, the list may also contain interface names alone. Those
        will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with
        scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6.

        Under some circumstances you may already need a context to pass into
        the site for creation; this is typically the case for servers that
        trigger requests on their own. For those cases, it is usually easiest
        to pass None in as a site, and set the fully constructed site later by
        assigning to the ``serversite`` attribute.

        :raises RuntimeError: if an unknown transport name is requested.
        """

        # The multicast list used to be a mutable default argument ([]); it is
        # only ever read, so normalizing None to an empty list here preserves
        # behavior while avoiding the shared-mutable-default pitfall.
        if multicast is None:
            multicast = []

        if loop is None:
            loop = asyncio.get_event_loop()

        self = cls(loop=loop, serversite=site, loggername=loggername, server_credentials=server_credentials)

        multicast_done = not multicast

        selected_transports = transports or defaults.get_default_servertransports(loop=loop)

        for transportname in selected_transports:
            if transportname == 'udp6':
                from .transports.udp6 import MessageInterfaceUDP6

                await self._append_tokenmanaged_messagemanaged_transport(
                    lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(mman, log=self.log, loop=loop, bind=bind, multicast=multicast))
                multicast_done = True
            # FIXME this is duplicated from the client version, as those are client-only anyway
            elif transportname == 'simple6':
                from .transports.simple6 import MessageInterfaceSimple6
                await self._append_tokenmanaged_messagemanaged_transport(
                    lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(mman, log=self.log, loop=loop))
            elif transportname == 'tinydtls':
                from .transports.tinydtls import MessageInterfaceTinyDTLS

                await self._append_tokenmanaged_messagemanaged_transport(
                    lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(mman, log=self.log, loop=loop))
            # FIXME end duplication
            elif transportname == 'tinydtls_server':
                from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer

                await self._append_tokenmanaged_messagemanaged_transport(
                    lambda mman: MessageInterfaceTinyDTLSServer.create_server(bind, mman, log=self.log, loop=loop, server_credentials=self.server_credentials))
            elif transportname == 'simplesocketserver':
                from .transports.simplesocketserver import MessageInterfaceSimpleServer
                await self._append_tokenmanaged_messagemanaged_transport(
                    lambda mman: MessageInterfaceSimpleServer.create_server(bind, mman, log=self.log, loop=loop))
            elif transportname == 'tcpserver':
                from .transports.tcp import TCPServer
                await self._append_tokenmanaged_transport(
                    lambda tman: TCPServer.create_server(bind, tman, self.log, loop))
            elif transportname == 'tcpclient':
                from .transports.tcp import TCPClient
                await self._append_tokenmanaged_transport(
                    lambda tman: TCPClient.create_client_transport(tman, self.log, loop))
            elif transportname == 'tlsserver':
                # TLS server is only started when an SSL context was provided;
                # otherwise the transport name is silently skipped.
                if _ssl_context is not None:
                    from .transports.tls import TLSServer
                    await self._append_tokenmanaged_transport(
                        lambda tman: TLSServer.create_server(bind, tman, self.log, loop, _ssl_context))
            elif transportname == 'tlsclient':
                from .transports.tls import TLSClient
                await self._append_tokenmanaged_transport(
                    lambda tman: TLSClient.create_client_transport(tman, self.log, loop, self.client_credentials))
            elif transportname == 'ws':
                from .transports.ws import WSPool
                await self._append_tokenmanaged_transport(
                    # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind
                    lambda tman: WSPool.create_transport(tman, self.log, loop, client_credentials=self.client_credentials, server_bind=bind or (None, None), server_context=_ssl_context))
            elif transportname == 'oscore':
                from .transports.oscore import TransportOSCORE
                oscoretransport = TransportOSCORE(self, self)
                self.request_interfaces.append(oscoretransport)
            else:
                # message typo fixed: "not know" -> "not known"
                raise RuntimeError("Transport %r not known for server context creation" % transportname)

        if not multicast_done:
            self.log.warning("Multicast was requested, but no multicast capable transport was selected.")

        # This is used in tests to wait for externally launched servers to be ready
        self.log.debug("Server ready to receive requests")

        return self

    async def shutdown(self):
        """Take down any listening sockets and stop all related timers.

        After this coroutine terminates, and once all external references to
        the object are dropped, it should be garbage-collectable.

        This method takes up to
        :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing
        transports to perform any cleanup implemented in them (such as orderly
        connection shutdown and cancelling observations, where the latter is
        currently not implemented)."""

        self.log.debug("Shutting down context")

        if not self.request_interfaces:
            # asyncio.wait raises ValueError when handed an empty collection;
            # with no transports there is nothing to shut down anyway.
            return

        done, pending = await asyncio.wait([
            asyncio.create_task(
                ri.shutdown(),
                **py38args(name="Shutdown of %r" % ri)
            )
            for ri
            in self.request_interfaces],
            timeout=SHUTDOWN_TIMEOUT)
        # Re-await the finished tasks so exceptions raised during shutdown
        # propagate instead of being silently dropped.
        for item in done:
            await item

    # FIXME: determine how official this should be, or which part of it is
    # public -- now that BlockwiseRequest uses it. (And formalize what can
    # change about messages and what can't after the remote has been thusly
    # populated).
    async def find_remote_and_interface(self, message):
        """Ask each request interface in turn to resolve (or recognize) the
        message's remote; return the first interface that can route it.

        :raises RuntimeError: if no registered interface can route the message.
        """
        for ri in self.request_interfaces:
            if await ri.fill_or_recognize_remote(message):
                return ri
        raise RuntimeError("No request interface could route message")

    def request(self, request_message, handle_blockwise=True):
        """Create and return a request object tracking ``request_message``.

        With ``handle_blockwise`` (the default), block-wise transfers are
        handled transparently via :class:`BlockwiseRequest`; otherwise the
        message is sent as a single exchange through a plain
        :class:`Request`."""
        if handle_blockwise:
            return BlockwiseRequest(self, request_message)

        pipe = Pipe(request_message, self.log)
        # Request sets up callbacks at creation
        result = Request(pipe, self.loop, self.log)

        async def send():
            try:
                request_interface = await self.find_remote_and_interface(request_message)
                request_interface.request(pipe)
            except Exception as e:
                # Route errors into the pipe so the caller sees them on the
                # response future rather than in a lost background task.
                pipe.add_exception(e)
                return
        self.loop.create_task(
            send(),
            **py38args(name="Request processing of %r" % result)
            )
        return result

    # the following are under consideration for moving into Site or something
    # mixed into it

    def render_to_pipe(self, pipe):
        """Fill a pipe by running the site's render_to_pipe interface and
        handling errors."""

        pr_that_can_receive_errors = error_to_message(pipe, self.log)

        run_driving_pipe(
            pr_that_can_receive_errors,
            self._render_to_pipe(pipe),
            name="Rendering for %r" % pipe.request,
            )

    async def _render_to_pipe(self, pipe):
        """Delegate rendering to the configured server site, or answer 4.04
        when this context is not acting as a server."""
        if self.serversite is None:
            pipe.add_response(Message(code=NOT_FOUND, payload=b"not a server"), is_last=True)
            return

        return await self.serversite.render_to_pipe(pipe)

380 

class BaseRequest(object):
    """Common mechanisms of :class:`Request` and :class:`MulticastRequest`"""
    # Intentionally empty: serves as a shared base/marker for request types.

383 

class BaseUnicastRequest(BaseRequest):
    """A utility class that offers the :attr:`response_raising` and
    :attr:`response_nonraising` alternatives to waiting for the
    :attr:`response` future whose error states can be presented either as an
    unsuccessful response (eg. 4.04) or an exception.

    It also provides some internal tools for handling anything that has a
    :attr:`response` future and an :attr:`observation`"""

    @property
    async def response_raising(self):
        """An awaitable that returns if a response comes in and is successful,
        otherwise raises generic network exception or a
        :class:`.error.ResponseWrappingError` for unsuccessful responses.

        Experimental Interface."""

        result = await self.response
        if result.code.is_successful():
            return result
        # Unsuccessful response codes are surfaced as a wrapping exception.
        raise error.ResponseWrappingError(result)

    @property
    async def response_nonraising(self):
        """An awaitable that rather returns a 500ish fabricated message (as a
        proxy would return) instead of raising an exception.

        Experimental Interface."""

        # FIXME: Can we smuggle error_to_message into the underlying pipe?
        # That should make observe notifications into messages rather
        # than exceptions as well, plus it has fallbacks for `e.to_message()`
        # raising.

        try:
            return await self.response
        except error.RenderableError as renderable:
            # Errors that know their own CoAP representation render themselves.
            return renderable.to_message()
        except Exception:
            # Anything else is turned into a generic 5.00, as a proxy would.
            return Message(code=INTERNAL_SERVER_ERROR)

425 

class Request(interfaces.Request, BaseUnicastRequest):
    """Tracker of a single request sent through a :class:`Context`.

    It consumes events from the underlying :class:`.pipe.Pipe`: the first
    event resolves the :attr:`response` future, and -- when the request
    carried an Observe option -- subsequent events are fed into
    :attr:`observation`."""

    # FIXME: Implement timing out with REQUEST_TIMEOUT here

    def __init__(self, pipe, loop, log):
        # The pipe delivering transport events for this exchange.
        self._pipe = pipe

        # Future resolved with the first response message (or its exception).
        self.response = loop.create_future()

        # An Observe value of 0 means the request registers an observation.
        if pipe.request.opt.observe == 0:
            self.observation = ClientObservation()
        else:
            self.observation = None

        # The generator-based state machine below is primed with send(None);
        # later pipe events are pushed into it through process().
        self._runner = self._run()
        self._runner.send(None)
        def process(event):
            # Returns True while the runner wants more events, False once it
            # has terminated (StopIteration).
            try:
                self._runner.send(event)
                return True
            except StopIteration:
                return False
        self._stop_interest = self._pipe.on_event(process)

        self.log = log

        self.response.add_done_callback(self._response_cancellation_handler)

    def _response_cancellation_handler(self, response):
        # Propagate cancellation to the runner (if interest in the first
        # response is lost, there won't be observation items to pull out), but
        # not general completion (because if it's completed and not cancelled,
        # eg. when an observation is active)
        if self.response.cancelled() and self._runner is not None:
            # Dropping the only reference makes it stop with GeneratorExit,
            # similar to a cancelled task
            self._runner = None
            self._stop_interest()
        # But either way we won't be calling _stop_interest any more, so let's
        # not keep anything referenced
        self._stop_interest = None

    @staticmethod
    def _add_response_properties(response, request):
        # Attach the originating request to the response so users can relate
        # the two (eg. for URI reconstruction).
        response.request = request

    def _run(self):
        """Generator state machine fed with pipe events.

        Resolves :attr:`response` from the first event; for observations it
        then loops, filtering reordered notifications per RFC 7641 before
        delivering them to :attr:`observation`."""
        # FIXME: This is in iterator form because it used to be a task that
        # awaited futures, and that code could be easily converted to an
        # iterator. I'm not sure that's a bad state here, but at least it
        # should be a more conscious decision to make this an iterator rather
        # than just having it happen to be one.
        #
        # FIXME: check that responses come from the same remote as long as we're assuming unicast

        first_event = yield None

        if first_event.message is not None:
            self._add_response_properties(first_event.message, self._pipe.request)
            self.response.set_result(first_event.message)
        else:
            self.response.set_exception(first_event.exception)
            if not isinstance(first_event.exception, error.Error):
                self.log.warning(
                    "An exception that is not an aiocoap Error was raised "
                    "from a transport; please report this as a bug in "
                    "aiocoap: %r", first_event.exception)

        if self.observation is None:
            # Not an observation: a single response is all we can handle.
            if not first_event.is_last:
                self.log.error("Pipe indicated more possible responses"
                        " while the Request handler would not know what to"
                        " do with them, stopping any further request.")
                self._pipe.stop_interest()
            return

        if first_event.is_last:
            # Observation was requested but the server ended the exchange.
            self.observation.error(error.NotObservable())
            return

        if first_event.message.opt.observe is None:
            # More events announced, but without an Observe option we cannot
            # interpret them as notifications.
            self.log.error("Pipe indicated more possible responses"
                    " while the Request handler would not know what to"
                    " do with them, stopping any further request.")
            self._pipe.stop_interest()
            return

        # variable names from RFC7641 Section 3.4
        v1 = first_event.message.opt.observe
        t1 = time.time()

        while True:
            # We don't really support cancellation of observations yet (see
            # https://github.com/chrysn/aiocoap/issues/92), but at least
            # stopping the interest is a way to free the local resources after
            # the first observation update, and to make the MID handler RST the
            # observation on the next.
            # FIXME: there *is* now a .on_cancel callback, we should at least
            # hook into that, and possibly even send a proper cancellation
            # then.
            next_event = yield True
            if self.observation.cancelled:
                self._pipe.stop_interest()
                return

            if next_event.exception is not None:
                self.observation.error(next_event.exception)
                if not next_event.is_last:
                    self._pipe.stop_interest()
                if not isinstance(next_event.exception, error.Error):
                    self.log.warning(
                        "An exception that is not an aiocoap Error was "
                        "raised from a transport during an observation; "
                        "please report this as a bug in aiocoap: %r",
                        next_event.exception)
                return

            self._add_response_properties(next_event.message, self._pipe.request)

            if next_event.message.opt.observe is not None:
                # check for reordering
                v2 = next_event.message.opt.observe
                t2 = time.time()

                # RFC 7641 freshness rule: accept if the sequence number moved
                # forward (mod 2**24 with a 2**23 window), or if the last
                # accepted notification is older than the reset time.
                is_recent = (v1 < v2 and v2 - v1 < 2**23) or \
                        (v1 > v2 and v1 - v2 > 2**23) or \
                        (t2 > t1 + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME)
                if is_recent:
                    t1 = t2
                    v1 = v2
            else:
                # the terminal message is always the last
                is_recent = True

            if is_recent:
                self.observation.callback(next_event.message)

            if next_event.is_last:
                self.observation.error(error.ObservationCancelled())
                return

            if next_event.message.opt.observe is None:
                self.observation.error(error.ObservationCancelled())
                self.log.error("Pipe indicated more possible responses"
                        " while the Request handler would not know what to"
                        " do with them, stopping any further request.")
                self._pipe.stop_interest()
                return

574 

575 

576class BlockwiseRequest(BaseUnicastRequest, interfaces.Request): 

    def __init__(self, protocol, app_request):
        # The context through which the individual block exchanges are sent.
        self.protocol = protocol
        self.log = self.protocol.log.getChild("blockwise-requester")

        # Future resolved with the fully reassembled response.
        self.response = protocol.loop.create_future()

        if app_request.opt.observe is not None:
            self.observation = ClientObservation()
        else:
            self.observation = None

        # Only a weak reference to the observation is handed to the runner
        # task so that it does not keep self alive; `lambda: None` mimics a
        # dead weakref when no observation was requested.
        self._runner = protocol.loop.create_task(self._run_outer(
            app_request,
            self.response,
            weakref.ref(self.observation) if self.observation is not None else lambda: None,
            self.protocol,
            self.log,
            ),
            **py38args(name="Blockwise runner for %r" % app_request))
        self.response.add_done_callback(self._response_cancellation_handler)

597 

598 def _response_cancellation_handler(self, response_future): 

599 # see Request._response_cancellation_handler 

600 if self.response.cancelled(): 

601 self._runner.cancel() 

602 

    @classmethod
    async def _run_outer(cls, app_request, response, weak_observation, protocol, log):
        """Run :meth:`_run`, routing any unexpected exception into the
        response future and/or the observation so it is not silently lost."""
        try:
            await cls._run(app_request, response, weak_observation, protocol, log)
        except asyncio.CancelledError:
            pass # results already set
        except Exception as e:
            # Deliver the failure wherever someone is still listening.
            logged = False
            if not response.done():
                logged = True
                response.set_exception(e)
            obs = weak_observation()
            if app_request.opt.observe is not None and obs is not None:
                logged = True
                obs.error(e)
            if not logged:
                # should be unreachable
                log.error("Exception in BlockwiseRequest runner neither went to response nor to observation: %s", e, exc_info=e)

621 

    # This is a class method because that allows self and self.observation to
    # be freed even when this task is running, and the task to stop itself --
    # otherwise we couldn't know when users just "forget" about a request
    # object after using its response (esp. in observe cases) and leave this
    # task running.
    @classmethod
    async def _run(cls, app_request, response, weak_observation, protocol, log):
        """Send ``app_request``, fragmenting the request body per Block1
        (RFC 7959) as needed, set the reassembled result on ``response``, and
        hand any resulting observation to :meth:`_run_observation`."""
        # we need to populate the remote right away, because the choice of
        # blocks depends on it.
        await protocol.find_remote_and_interface(app_request)

        size_exp = app_request.remote.maximum_block_size_exp

        if app_request.opt.block1 is not None:
            assert app_request.opt.block1.block_number == 0, "Unexpected block number in app_request"
            assert not app_request.opt.block1.more, "Unexpected more-flag in app_request"
            # this is where the library user can traditionally pass in size
            # exponent hints into the library.
            size_exp = app_request.opt.block1.size_exponent

        # Offset in the message in blocks of size_exp. Whoever changes size_exp
        # is responsible for updating this number.
        block_cursor = 0

        while True:
            # ... send a chunk

            if size_exp >= 6:
                # FIXME from maximum_payload_size
                fragmentation_threshold = app_request.remote.maximum_payload_size
            else:
                fragmentation_threshold = (2 ** (size_exp + 4))

            if app_request.opt.block1 is not None or \
                    len(app_request.payload) > fragmentation_threshold:
                current_block1 = app_request._extract_block(
                        block_cursor,
                        size_exp,
                        app_request.remote.maximum_payload_size)
                if block_cursor == 0:
                    # Size1 announces the total body length on the first block.
                    current_block1.opt.size1 = len(app_request.payload)
            else:
                # Body fits in one message: send the request unmodified.
                current_block1 = app_request

            blockrequest = protocol.request(current_block1, handle_blockwise=False)
            blockresponse = await blockrequest.response

            # store for future blocks to ensure that the next blocks will be
            # sent from the same source address (in the UDP case; for many
            # other transports it won't matter).
            app_request.remote = blockresponse.remote

            if blockresponse.opt.block1 is None:
                if blockresponse.code.is_successful() and current_block1.opt.block1:
                    log.warning("Block1 option completely ignored by server, assuming it knows what it is doing.")
                # FIXME: handle 4.13 and retry with the indicated size option
                break

            block1 = blockresponse.opt.block1
            log.debug("Response with Block1 option received, number = %d, more = %d, size_exp = %d.", block1.block_number, block1.more, block1.size_exponent)

            if block1.block_number != current_block1.opt.block1.block_number:
                raise error.UnexpectedBlock1Option("Block number mismatch")

            if size_exp == 7:
                # BERT (size_exp 7): blocks are counted in 1024-byte units.
                block_cursor += len(current_block1.payload) // 1024
            else:
                block_cursor += 1

            # If the server negotiated a smaller block size, rescale the
            # cursor accordingly (it counts blocks of size_exp).
            while block1.size_exponent < size_exp:
                block_cursor *= 2
                size_exp -= 1

            if not current_block1.opt.block1.more:
                if block1.more or blockresponse.code == CONTINUE:
                    # treating this as a protocol error -- letting it slip
                    # through would misrepresent the whole operation as an
                    # over-all 2.xx (successful) one.
                    raise error.UnexpectedBlock1Option("Server asked for more data at end of body")
                break

            # checks before preparing the next round:

            if blockresponse.opt.observe:
                # we're not *really* interested in that block, we just sent an
                # observe option to indicate that we'll want to observe the
                # resulting representation as a whole
                log.warning("Server answered Observe in early Block1 phase, cancelling the erroneous observation.")
                blockrequest.observe.cancel()

            if block1.more:
                # FIXME i think my own server is doing this wrong
                #if response.code != CONTINUE:
                #    raise error.UnexpectedBlock1Option("more-flag set but no Continue")
                pass
            else:
                if not blockresponse.code.is_successful():
                    break
                else:
                    # ignoring (discarding) the successful intermediate result, waiting for a final one
                    continue

        lower_observation = None
        if app_request.opt.observe is not None:
            if blockresponse.opt.observe is not None:
                lower_observation = blockrequest.observation
            else:
                # Server did not confirm the observation; report that once.
                obs = weak_observation()
                if obs:
                    obs.error(error.NotObservable())
                del obs

        assert blockresponse is not None, "Block1 loop broke without setting a response"
        blockresponse.opt.block1 = None

        # FIXME check with RFC7959: it just says "send requests similar to the
        # requests in the Block1 phase", what does that mean? using the last
        # block1 as a reference for now, especially because in the
        # only-one-request-block case, that's the original request we must send
        # again and again anyway
        assembled_response = await cls._complete_by_requesting_block2(protocol, current_block1, blockresponse, log)

        response.set_result(assembled_response)
        # finally set the result

        if lower_observation is not None:
            # FIXME this can all be simplified a lot since it's no more
            # expected that observations shut themselves down when GC'd.
            obs = weak_observation()
            del weak_observation
            if obs is None:
                lower_observation.cancel()
                return
            future_weak_observation = protocol.loop.create_future() # packing this up because its destroy callback needs to reference the subtask
            subtask = asyncio.create_task(
                cls._run_observation(
                    app_request,
                    lower_observation,
                    future_weak_observation,
                    protocol,
                    log),
                **py38args(name="Blockwise observation for %r" % app_request)
                )
            # When the user-visible observation is garbage collected, cancel
            # the forwarding subtask; explicit cancellation does the same.
            future_weak_observation.set_result(weakref.ref(obs, lambda obs: subtask.cancel()))
            obs.on_cancel(subtask.cancel)
            del obs
            await subtask

769 

    @classmethod
    async def _run_observation(cls, original_request, lower_observation, future_weak_observation, protocol, log):
        """Forward notifications from the Block1-phase observation into the
        user-visible observation, completing each one by fetching its
        remaining Block2 parts first.

        The user-visible observation is only reached through a weak
        reference; when it is collected this task gets cancelled."""
        weak_observation = await future_weak_observation
        # we can use weak_observation() here at any time, because whenever that
        # becomes None, this task gets cancelled
        try:
            async for block1_notification in lower_observation:
                log.debug("Notification received")
                full_notification = await cls._complete_by_requesting_block2(protocol, original_request, block1_notification, log)
                log.debug("Reporting completed notification")
                weak_observation().callback(full_notification)
            # FIXME verify that this loop actually ends iff the observation
            # was cancelled -- otherwise find out the cause(s) or make it not
            # cancel under indistinguishable circumstances
            weak_observation().error(error.ObservationCancelled())
        except asyncio.CancelledError:
            return
        except Exception as e:
            # Propagate any completion failure into the user's observation.
            weak_observation().error(e)

789 

790 @classmethod 

791 async def _complete_by_requesting_block2(cls, protocol, request_to_repeat, initial_response, log): 

792 # FIXME this can probably be deduplicated against BlockwiseRequest 

793 

794 if initial_response.opt.block2 is None or initial_response.opt.block2.more is False: 

795 initial_response.opt.block2 = None 

796 return initial_response 

797 

798 if initial_response.opt.block2.block_number != 0: 

799 log.error("Error assembling blockwise response (expected first block)") 

800 raise error.UnexpectedBlock2() 

801 

802 assembled_response = initial_response 

803 last_response = initial_response 

804 while True: 

805 current_block2 = request_to_repeat._generate_next_block2_request(assembled_response) 

806 

807 current_block2 = current_block2.copy(remote=initial_response.remote) 

808 

809 blockrequest = protocol.request(current_block2, handle_blockwise=False) 

810 last_response = await blockrequest.response 

811 

812 if last_response.opt.block2 is None: 

813 log.warning("Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response.") 

814 return last_response 

815 

816 block2 = last_response.opt.block2 

817 log.debug("Response with Block2 option received, number = %d, more = %d, size_exp = %d.", block2.block_number, block2.more, block2.size_exponent) 

818 try: 

819 assembled_response._append_response_block(last_response) 

820 except error.Error as e: 

821 log.error("Error assembling blockwise response, passing on error %r", e) 

822 raise 

823 

824 if block2.more is False: 

825 return assembled_response 

826 

class ClientObservation:
    """An interface to observe notification updates arriving on a request.

    This class does not actually provide any of the observe functionality, it
    is purely a container for dispatching the messages via callbacks or
    asynchronous iteration. It gets driven (ie. populated with responses or
    errors including observation termination) by a Request object.
    """
    def __init__(self):
        # Listeners for notifications / termination; both are set to None on
        # cancellation so further misuse fails loudly.
        self.callbacks = []
        self.errbacks = []

        self.cancelled = False
        self._on_cancel = []

        # Most recent notification, replayed to late-registering callbacks.
        self._latest_response = None
        # the analogous error is stored in _cancellation_reason when cancelled.

    def __aiter__(self):
        """`async for` interface to observations. Currently, this still loses
        information to the application (the reason for the termination is
        unclear).

        Experimental Interface."""
        it = self._Iterator()
        self.register_callback(it.push)
        self.register_errback(it.push_err)
        return it

    class _Iterator:
        def __init__(self):
            self._future = asyncio.get_event_loop().create_future()

        def push(self, item):
            if self._future.done():
                # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
                self._future = asyncio.get_event_loop().create_future()
            self._future.set_result(item)

        def push_err(self, e):
            if self._future.done():
                self._future = asyncio.get_event_loop().create_future()
            self._future.set_exception(e)

        async def __anext__(self):
            f = self._future
            try:
                result = await self._future
                # FIXME see `await servobs._trigger` comment: might waiting for
                # the original future not yield the first future's result when
                # a quick second future comes in in a push?
                if f is self._future:
                    self._future = asyncio.get_event_loop().create_future()
                return result
            except (error.NotObservable, error.ObservationCancelled):
                # only exit cleanly when the server -- right away or later --
                # states that the resource is not observable any more
                # FIXME: check whether an unsuccessful message is still passed
                # as an observation result (or whether it should be)
                raise StopAsyncIteration

        def __del__(self):
            if self._future.done():
                try:
                    # Fetch the result so any errors show up at least in the
                    # finalizer output
                    self._future.result()
                except (error.ObservationCancelled, error.NotObservable):
                    # This is the case at the end of an observation cancelled
                    # by the server.
                    pass
                except (error.LibraryShutdown, asyncio.CancelledError):
                    pass

    def register_callback(self, callback):
        """Call the callback whenever a response to the message comes in, and
        pass the response to it."""
        if self.cancelled:
            return

        self.callbacks.append(callback)
        if self._latest_response is not None:
            # Late registrants immediately see the latest notification.
            callback(self._latest_response)

    def register_errback(self, callback):
        """Call the callback whenever something goes wrong with the
        observation, and pass an exception to the callback. After such a
        callback is called, no more callbacks will be issued."""
        if self.cancelled:
            callback(self._cancellation_reason)
            return
        self.errbacks.append(callback)

    def callback(self, response):
        """Notify all listeners of an incoming response"""

        self._latest_response = response

        for c in self.callbacks:
            c(response)

    def error(self, exception):
        """Notify registered listeners that the observation went wrong. This
        can only be called once."""

        for c in self.errbacks:
            c(exception)

        self.cancel()
        # Must come after cancel(), which resets _cancellation_reason to None.
        self._cancellation_reason = exception

    def cancel(self):
        # FIXME determine whether this is called by anything other than error,
        # and make it private so there is always a _cancellation_reason
        """Cease to generate observation or error events. This will not
        generate an error by itself."""

        assert not self.cancelled

        # make sure things go wrong when someone tries to continue this
        self.errbacks = None
        self.callbacks = None

        self.cancelled = True
        while self._on_cancel:
            self._on_cancel.pop()()

        self._cancellation_reason = None

    def on_cancel(self, callback):
        """Run ``callback`` when the observation is cancelled; if it already
        is, run it immediately."""
        if self.cancelled:
            callback()
            # Fixed: previously the callback was *also* appended below even
            # though cancel() asserts it can never run again, so the stored
            # reference could never be drained -- a small leak and a latent
            # double-invocation hazard.
            return
        self._on_cancel.append(callback)

    def __repr__(self):
        return '<%s %s at %#x>' % (type(self).__name__, "(cancelled)" if self.cancelled else "(%s call-, %s errback(s))" % (len(self.callbacks), len(self.errbacks)), id(self))

963 

964class ServerObservation: 

    def __init__(self):
        # Set to True once a resource handler claims this observation via
        # accept().
        self._accepted = False
        # Future through which the next notification (or None, meaning
        # "re-render the resource") is handed to the sending side; it is
        # replaced whenever overwritten or consumed.
        self._trigger = asyncio.get_event_loop().create_future()
        # A deregistration is "early" if it happens before the response message
        # is actually sent; calling deregister() in that time (typically during
        # `render()`) will not send an unsuccessful response message but just
        # set this flag, which is set to None as soon as it is too late for an
        # early deregistration.
        # This mechanism is temporary until more of aiocoap behaves like
        # Pipe which does not suffer from this limitation.
        self._early_deregister = False
        # Set by trigger(..., is_last=True) to mark the final notification.
        self._late_deregister = False

977 

978 def accept(self, cancellation_callback): 

979 self._accepted = True 

980 self._cancellation_callback = cancellation_callback 

981 

982 def deregister(self, reason=None): 

983 if self._early_deregister is False: 

984 self._early_deregister = True 

985 return 

986 

987 warnings.warn("Late use of ServerObservation.deregister() is" 

988 " deprecated, use .trigger with an unsuccessful value" 

989 " instead", 

990 DeprecationWarning) 

991 self.trigger(Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable")) 

992 

    def trigger(self, response=None, *, is_last=False):
        """Send an updated response; if None is given, the observed resource's
        rendering will be invoked to produce one.

        `is_last` can be set to True to indicate that no more responses will be
        sent. Note that an unsuccessful response will be the last no matter
        what is_last says, as such a message always terminates a CoAP
        observation."""
        if is_last:
            self._late_deregister = True
        if self._trigger.done():
            # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
            self._trigger = asyncio.get_event_loop().create_future()
        # Wakes whatever is awaiting self._trigger to send the notification.
        self._trigger.set_result(response)