Coverage for aiocoap/protocol.py: 86% (490 statements)

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the classes that are responsible for keeping track of 

6messages: 

7 

8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP 

9 socket) -- something that can send requests and possibly can answer 

10 incoming requests. 

11 

12 Incoming requests are processed in tasks created by the context. 

13 

14* a :class:`Request` is generated whenever a request is sent, to keep

15 track of the response

16 

17Logging 

18~~~~~~~ 

19 

20Several constructors of the Context accept a logger name; these names go into 

21the construction of a Python logger. 

22 

23Log events will be emitted to these on different levels, with "warning" and

24above being a practical default for things that may warrant review by

25an operator:

26 

27* DEBUG is used for things that occur even under perfect conditions. 

28* INFO is for things that are well expected, but might be interesting during 

29 testing a network of nodes and not just when debugging the library. (This 

30 includes timeouts, retransmissions, and pings.) 

31* WARNING is for everything that indicates a misbehaving peer. These don't

32 *necessarily* indicate a client bug, though: Things like requesting a 

33 nonexistent block can just as well happen when a resource's content has 

34 changed between blocks. The library will not go out of its way to determine 

35 whether there is a plausible explanation for the odd behavior, and will 

36 report something as a warning in case of doubt. 

37* ERROR is used when something clearly went wrong. This includes irregular 

38 connection terminations and resource handler errors (which are demoted to 

39 error responses), and can often contain a backtrace. 

40""" 

41 

42import asyncio 

43import weakref 

44import time 

45from typing import Optional, List 

46 

47from . import defaults 

48from .credentials import CredentialsMap 

49from .message import Message 

50from .messagemanager import MessageManager 

51from .tokenmanager import TokenManager 

52from .pipe import Pipe, run_driving_pipe, error_to_message 

53from . import interfaces 

54from . import error 

55from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT 

56 

57import warnings 

58import logging 

59 

60 

61class Context(interfaces.RequestProvider): 

62 """Applications' entry point to the network 

63 

64 A :class:`.Context` coordinates one or more network :mod:`.transports` 

65 implementations and dispatches data between them and the application. 

66 

67 The application can start requests using the message dispatch methods, and 

68 set a :class:`resources.Site` that will answer requests directed to the 

69 application as a server. 

70 

71 On the library-internals side, it is the prime implementation of the 

72 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and 

73 :class:`Response` classes on demand, and decides which transport 

74 implementations to start and which are to handle which messages. 

75 

76 **Context creation and destruction** 

77 

78 The following functions are provided for creating and stopping a context: 

79 

80 .. note:: 

81 

82 A typical application should only ever create one context, even (or

83 especially) when it acts both as a server and as a client (in which

84 case a server context should be created).

85 

86 A context that is not used any more must be shut down using

87 :meth:`.shutdown()`, but typical applications will not need to do so, as

88 they use the context for the full process lifetime.

89 

90 .. automethod:: create_client_context 

91 .. automethod:: create_server_context 

92 

93 .. automethod:: shutdown 

94 

95 **Dispatching messages** 

96 

97 CoAP requests can be sent using the following functions: 

98 

99 .. automethod:: request 

100 

101 If more control is needed, you can create a :class:`Request` yourself and 

102 pass the context to it. 

103 

104 

105 **Other methods and properties** 

106 

107 The remaining methods and properties are to be considered unstable even 

108 when the project reaches a stable version number; please file a feature 

109 request for stabilization if you want to reliably access any of them. 

110 """ 

111 

112 def __init__( 

113 self, 

114 loop=None, 

115 serversite=None, 

116 loggername="coap", 

117 client_credentials=None, 

118 server_credentials=None, 

119 ): 

120 self.log = logging.getLogger(loggername) 

121 

122 self.loop = loop or asyncio.get_event_loop() 

123 

124 self.serversite = serversite 

125 

126 self.request_interfaces = [] 

127 

128 self.client_credentials = client_credentials or CredentialsMap() 

129 self.server_credentials = server_credentials or CredentialsMap() 

130 

131 # 

132 # convenience methods for class instantiation

133 # 

134 

135 async def _append_tokenmanaged_messagemanaged_transport( 

136 self, message_interface_constructor 

137 ): 

138 tman = TokenManager(self) 

139 mman = MessageManager(tman) 

140 transport = await message_interface_constructor(mman) 

141 

142 mman.message_interface = transport 

143 tman.token_interface = mman 

144 

145 self.request_interfaces.append(tman) 

146 

147 async def _append_tokenmanaged_transport(self, token_interface_constructor): 

148 tman = TokenManager(self) 

149 transport = await token_interface_constructor(tman) 

150 

151 tman.token_interface = transport 

152 

153 self.request_interfaces.append(tman) 

154 

155 @classmethod 

156 async def create_client_context( 

157 cls, *, loggername="coap", loop=None, transports: Optional[List[str]] = None 

158 ): 

159 """Create a context bound to all addresses on a random listening port. 

160 

161 This is the easiest way to get a context suitable for sending client 

162 requests. 

163 

164 :meta private: 

165 (not actually private, just hiding from automodule due to being 

166 grouped with the important functions) 

167 """ 

168 

169 if loop is None: 

170 loop = asyncio.get_event_loop() 

171 

172 self = cls(loop=loop, serversite=None, loggername=loggername) 

173 

174 selected_transports = transports or defaults.get_default_clienttransports( 

175 loop=loop 

176 ) 

177 

178 # FIXME make defaults overridable (postponed until they become configurable too) 

179 for transportname in selected_transports: 

180 if transportname == "udp6": 

181 from .transports.udp6 import MessageInterfaceUDP6 

182 

183 await self._append_tokenmanaged_messagemanaged_transport( 

184 lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint( 

185 mman, log=self.log, loop=loop 

186 ) 

187 ) 

188 elif transportname == "simple6": 

189 from .transports.simple6 import MessageInterfaceSimple6 

190 

191 await self._append_tokenmanaged_messagemanaged_transport( 

192 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint( 

193 mman, log=self.log, loop=loop 

194 ) 

195 ) 

196 elif transportname == "tinydtls": 

197 from .transports.tinydtls import MessageInterfaceTinyDTLS 

198 

199 await self._append_tokenmanaged_messagemanaged_transport( 

200 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint( 

201 mman, log=self.log, loop=loop 

202 ) 

203 ) 

204 elif transportname == "tcpclient": 

205 from .transports.tcp import TCPClient 

206 

207 await self._append_tokenmanaged_transport( 

208 lambda tman: TCPClient.create_client_transport(tman, self.log, loop) 

209 ) 

210 elif transportname == "tlsclient": 

211 from .transports.tls import TLSClient 

212 

213 await self._append_tokenmanaged_transport( 

214 lambda tman: TLSClient.create_client_transport( 

215 tman, self.log, loop, self.client_credentials 

216 ) 

217 ) 

218 elif transportname == "ws": 

219 from .transports.ws import WSPool 

220 

221 await self._append_tokenmanaged_transport( 

222 lambda tman: WSPool.create_transport( 

223 tman, self.log, loop, client_credentials=self.client_credentials 

224 ) 

225 ) 

226 elif transportname == "oscore": 

227 from .transports.oscore import TransportOSCORE 

228 

229 oscoretransport = TransportOSCORE(self, self) 

230 self.request_interfaces.append(oscoretransport) 

231 else: 

232 raise RuntimeError( 

233 "Transport %r not know for client context creation" % transportname 

234 ) 

235 

236 return self 

237 

238 @classmethod 

239 async def create_server_context( 

240 cls, 

241 site, 

242 bind=None, 

243 *, 

244 loggername="coap-server", 

245 loop=None, 

246 _ssl_context=None, 

247 multicast=[], 

248 server_credentials=None, 

249 transports: Optional[List[str]] = None, 

250 ): 

251 """Create a context, bound to all addresses on the CoAP port (unless 

252 otherwise specified in the ``bind`` argument). 

253 

254 This is the easiest way to get a context suitable both for sending 

255 client and accepting server requests. 

256 

257 The ``bind`` argument, if given, needs to be a 2-tuple of IP address 

258 string and port number, where the port number can be None to use the default port. 

259 

260 If ``multicast`` is given, it needs to be a list of (multicast address, 

261 interface name) tuples, which will all be joined. (The IPv4 style of 

262 selecting the interface by a local address is not supported; users may 

263 want to use the netifaces package to arrive at an interface name for an 

264 address). 

265 

266 As a shortcut, the list may also contain interface names alone. Those 

267 will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with 

268 scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6. 

269 

270 Under some circumstances you may already need a context to pass into 

271 the site for creation; this is typically the case for servers that 

272 trigger requests on their own. For those cases, it is usually easiest 

273 to pass None in as a site, and set the fully constructed site later by 

274 assigning to the ``serversite`` attribute. 

275 

276 :meta private: 

277 (not actually private, just hiding from automodule due to being 

278 grouped with the important functions) 

279 """ 

280 

281 if loop is None: 

282 loop = asyncio.get_event_loop() 

283 

284 self = cls( 

285 loop=loop, 

286 serversite=site, 

287 loggername=loggername, 

288 server_credentials=server_credentials, 

289 ) 

290 

291 multicast_done = not multicast 

292 

293 selected_transports = transports or defaults.get_default_servertransports( 

294 loop=loop 

295 ) 

296 

297 for transportname in selected_transports: 

298 if transportname == "udp6": 

299 from .transports.udp6 import MessageInterfaceUDP6 

300 

301 await self._append_tokenmanaged_messagemanaged_transport( 

302 lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint( 

303 mman, log=self.log, loop=loop, bind=bind, multicast=multicast 

304 ) 

305 ) 

306 multicast_done = True 

307 # FIXME this is duplicated from the client version, as those are client-only anyway 

308 elif transportname == "simple6": 

309 from .transports.simple6 import MessageInterfaceSimple6 

310 

311 await self._append_tokenmanaged_messagemanaged_transport( 

312 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint( 

313 mman, log=self.log, loop=loop 

314 ) 

315 ) 

316 elif transportname == "tinydtls": 

317 from .transports.tinydtls import MessageInterfaceTinyDTLS 

318 

319 await self._append_tokenmanaged_messagemanaged_transport( 

320 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint( 

321 mman, log=self.log, loop=loop 

322 ) 

323 ) 

324 # FIXME end duplication 

325 elif transportname == "tinydtls_server": 

326 from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer 

327 

328 await self._append_tokenmanaged_messagemanaged_transport( 

329 lambda mman: MessageInterfaceTinyDTLSServer.create_server( 

330 bind, 

331 mman, 

332 log=self.log, 

333 loop=loop, 

334 server_credentials=self.server_credentials, 

335 ) 

336 ) 

337 elif transportname == "simplesocketserver": 

338 from .transports.simplesocketserver import MessageInterfaceSimpleServer 

339 

340 await self._append_tokenmanaged_messagemanaged_transport( 

341 lambda mman: MessageInterfaceSimpleServer.create_server( 

342 bind, mman, log=self.log, loop=loop 

343 ) 

344 ) 

345 elif transportname == "tcpserver": 

346 from .transports.tcp import TCPServer 

347 

348 await self._append_tokenmanaged_transport( 

349 lambda tman: TCPServer.create_server(bind, tman, self.log, loop) 

350 ) 

351 elif transportname == "tcpclient": 

352 from .transports.tcp import TCPClient 

353 

354 await self._append_tokenmanaged_transport( 

355 lambda tman: TCPClient.create_client_transport(tman, self.log, loop) 

356 ) 

357 elif transportname == "tlsserver": 

358 if _ssl_context is not None: 

359 from .transports.tls import TLSServer 

360 

361 await self._append_tokenmanaged_transport( 

362 lambda tman: TLSServer.create_server( 

363 bind, tman, self.log, loop, _ssl_context 

364 ) 

365 ) 

366 elif transportname == "tlsclient": 

367 from .transports.tls import TLSClient 

368 

369 await self._append_tokenmanaged_transport( 

370 lambda tman: TLSClient.create_client_transport( 

371 tman, self.log, loop, self.client_credentials 

372 ) 

373 ) 

374 elif transportname == "ws": 

375 from .transports.ws import WSPool 

376 

377 await self._append_tokenmanaged_transport( 

378 # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind 

379 lambda tman: WSPool.create_transport( 

380 tman, 

381 self.log, 

382 loop, 

383 client_credentials=self.client_credentials, 

384 server_bind=bind or (None, None), 

385 server_context=_ssl_context, 

386 ) 

387 ) 

388 elif transportname == "oscore": 

389 from .transports.oscore import TransportOSCORE 

390 

391 oscoretransport = TransportOSCORE(self, self) 

392 self.request_interfaces.append(oscoretransport) 

393 else: 

394 raise RuntimeError( 

395 "Transport %r not know for server context creation" % transportname 

396 ) 

397 

398 if not multicast_done: 

399 self.log.warning( 

400 "Multicast was requested, but no multicast capable transport was selected." 

401 ) 

402 

403 # This is used in tests to wait for externally launched servers to be ready 

404 self.log.debug("Server ready to receive requests") 

405 

406 return self 

407 

408 async def shutdown(self): 

409 """Take down any listening sockets and stop all related timers. 

410 

411 After this coroutine terminates, and once all external references to 

412 the object are dropped, it should be garbage-collectable. 

413 

414 This method takes up to 

415 :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing 

416 transports to perform any cleanup implemented in them (such as orderly 

417 connection shutdown and cancelling observations, where the latter is 

418 currently not implemented). 

419 

420 :meta private: 

421 (not actually private, just hiding from automodule due to being 

422 grouped with the important functions) 

423 """ 

424 

425 self.log.debug("Shutting down context") 

426 

427 done, pending = await asyncio.wait( 

428 [ 

429 asyncio.create_task( 

430 ri.shutdown(), 

431 name="Shutdown of %r" % ri, 

432 ) 

433 for ri in self.request_interfaces 

434 ], 

435 timeout=SHUTDOWN_TIMEOUT, 

436 ) 

437 for item in done: 

438 await item 

439 if pending: 

440 # Apart from being useful to see, this also ensures that developers 

441 # see the error in the logs during test suite runs -- and the error 

442 # should be easier to follow than the "we didn't garbage collect 

443 # everything" errors we see anyway (or otherwise, if the error is 

444 # escalated into a test failure) 

445 self.log.error( 

446 "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s", 

447 pending, 

448 ) 

449 

450 # FIXME: determine how official this should be, or which part of it is 

451 # public -- now that BlockwiseRequest uses it. (And formalize what can 

452 # change about messages and what can't after the remote has been thusly 

453 # populated). 

454 async def find_remote_and_interface(self, message): 

455 for ri in self.request_interfaces: 

456 if await ri.fill_or_recognize_remote(message): 

457 return ri 

458 raise RuntimeError("No request interface could route message") 

459 

460 def request(self, request_message, handle_blockwise=True): 

461 if handle_blockwise: 

462 return BlockwiseRequest(self, request_message) 

463 

464 pipe = Pipe(request_message, self.log) 

465 # Request sets up callbacks at creation 

466 result = Request(pipe, self.loop, self.log) 

467 

468 async def send(): 

469 try: 

470 request_interface = await self.find_remote_and_interface( 

471 request_message 

472 ) 

473 request_interface.request(pipe) 

474 except Exception as e: 

475 pipe.add_exception(e) 

476 return 

477 

478 self.loop.create_task( 

479 send(), 

480 name="Request processing of %r" % result, 

481 ) 

482 return result 

483 

484 # the following are under consideration for moving into Site or something 

485 # mixed into it 

486 

487 def render_to_pipe(self, pipe): 

488 """Fill a pipe by running the site's render_to_pipe interface and 

489 handling errors.""" 

490 

491 pr_that_can_receive_errors = error_to_message(pipe, self.log) 

492 

493 run_driving_pipe( 

494 pr_that_can_receive_errors, 

495 self._render_to_pipe(pipe), 

496 name="Rendering for %r" % pipe.request, 

497 ) 

498 

499 async def _render_to_pipe(self, pipe): 

500 if self.serversite is None: 

501 pipe.add_response( 

502 Message(code=NOT_FOUND, payload=b"not a server"), is_last=True 

503 ) 

504 return 

505 

506 return await self.serversite.render_to_pipe(pipe) 

507 

508 
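# --- Illustrative server sketch (hypothetical, not part of the original module) ---
# The Context docstring above recommends a single (server) context even for
# applications that also issue requests. This sketch assumes the documented
# aiocoap.resource API (Site, Resource, add_resource) and the default CoAP port.

async def _example_server():
    import asyncio
    import aiocoap
    import aiocoap.resource as resource

    class Hello(resource.Resource):
        async def render_get(self, request):
            return aiocoap.Message(payload=b"hello")

    site = resource.Site()
    site.add_resource(["hello"], Hello())

    await aiocoap.Context.create_server_context(site)
    # Keep serving; typical applications keep the context for the full
    # process lifetime rather than shutting it down explicitly.
    await asyncio.get_running_loop().create_future()
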

509class BaseRequest(object): 

510 """Common mechanisms of :class:`Request` and :class:`MulticastRequest`""" 

511 

512 

513class BaseUnicastRequest(BaseRequest): 

514 """A utility class that offers the :attr:`response_raising` and 

515 :attr:`response_nonraising` alternatives to waiting for the 

516 :attr:`response` future, whose error states can be presented either as an

517 unsuccessful response (e.g. 4.04) or as an exception.

518 

519 It also provides some internal tools for handling anything that has a 

520 :attr:`response` future and an :attr:`observation`""" 

521 

522 @property 

523 async def response_raising(self): 

524 """An awaitable that returns if a response comes in and is successful, 

525 otherwise raises generic network exception or a 

526 :class:`.error.ResponseWrappingError` for unsuccessful responses. 

527 

528 Experimental Interface.""" 

529 

530 response = await self.response 

531 if not response.code.is_successful(): 

532 raise error.ResponseWrappingError(response) 

533 

534 return response 

535 

536 @property 

537 async def response_nonraising(self): 

538 """An awaitable that rather returns a 500ish fabricated message (as a 

539 proxy would return) instead of raising an exception. 

540 

541 Experimental Interface.""" 

542 

543 # FIXME: Can we smuggle error_to_message into the underlying pipe? 

544 # That should make observe notifications into messages rather 

545 # than exceptions as well, plus it has fallbacks for `e.to_message()` 

546 # raising. 

547 

548 try: 

549 return await self.response 

550 except error.RenderableError as e: 

551 return e.to_message() 

552 except Exception: 

553 return Message(code=INTERNAL_SERVER_ERROR) 

554 

555 
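# --- Illustrative sketch (hypothetical) of the two response accessors above ---
# `response_raising` turns unsuccessful responses into exceptions, while
# `response_nonraising` always yields a message (possibly a fabricated 500-ish
# one). It assumes a context and a request message built elsewhere.

async def _example_response_accessors(context, message):
    import aiocoap.error as error

    request = context.request(message)
    try:
        response = await request.response_raising
        print("Success:", response.code)
    except error.ResponseWrappingError as e:
        print("Unsuccessful response, wrapped in an exception:", e)
    # Alternatively, `await request.response_nonraising` never raises and
    # substitutes a fabricated 5.00-ish message for exceptions instead.
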

556class Request(interfaces.Request, BaseUnicastRequest): 

557 # FIXME: Implement timing out with REQUEST_TIMEOUT here 

558 

559 def __init__(self, pipe, loop, log): 

560 self._pipe = pipe 

561 

562 self.response = loop.create_future() 

563 

564 if pipe.request.opt.observe == 0: 

565 self.observation = ClientObservation() 

566 else: 

567 self.observation = None 

568 

569 self._runner = self._run() 

570 self._runner.send(None) 

571 

572 def process(event): 

573 try: 

574 # would be great to have self or the runner as weak ref, but 

575 # see ClientObservation.register_callback comments -- while 

576 # that is around, we can't weakref here. 

577 self._runner.send(event) 

578 return True 

579 except StopIteration: 

580 return False 

581 

582 self._stop_interest = self._pipe.on_event(process) 

583 

584 self.log = log 

585 

586 self.response.add_done_callback(self._response_cancellation_handler) 

587 

588 def _response_cancellation_handler(self, response): 

589 # Propagate cancellation to the runner (if interest in the first 

590 # response is lost, there won't be observation items to pull out), but 

591 # not general completion (because if it's completed and not cancelled, 

592 # eg. when an observation is active) 

593 if self.response.cancelled() and self._runner is not None: 

594 # Dropping the only reference makes it stop with GeneratorExit, 

595 # similar to a cancelled task 

596 self._runner = None 

597 self._stop_interest() 

598 # Otherwise, there will be a runner still around, and it's its task to 

599 # call _stop_interest. 

600 

601 @staticmethod 

602 def _add_response_properties(response, request): 

603 response.request = request 

604 

605 def _run(self): 

606 # FIXME: This is in iterator form because it used to be a task that 

607 # awaited futures, and that code could be easily converted to an 

608 # iterator. I'm not sure that's a bad state here, but at least it 

609 # should be a more conscious decision to make this an iterator rather 

610 # than just having it happen to be one. 

611 # 

612 # FIXME: check that responses come from the same remote as long as we're assuming unicast

613 

614 first_event = yield None 

615 

616 if first_event.message is not None: 

617 self._add_response_properties(first_event.message, self._pipe.request) 

618 self.response.set_result(first_event.message) 

619 else: 

620 self.response.set_exception(first_event.exception) 

621 if not isinstance(first_event.exception, error.Error): 

622 self.log.warning( 

623 "An exception that is not an aiocoap Error was raised " 

624 "from a transport; please report this as a bug in " 

625 "aiocoap: %r", 

626 first_event.exception, 

627 ) 

628 

629 if self.observation is None: 

630 if not first_event.is_last: 

631 self.log.error( 

632 "Pipe indicated more possible responses" 

633 " while the Request handler would not know what to" 

634 " do with them, stopping any further request." 

635 ) 

636 self._stop_interest() 

637 return 

638 

639 if first_event.is_last: 

640 self.observation.error(error.NotObservable()) 

641 return 

642 

643 if first_event.message.opt.observe is None: 

644 self.log.error( 

645 "Pipe indicated more possible responses" 

646 " while the Request handler would not know what to" 

647 " do with them, stopping any further request." 

648 ) 

649 self._stop_interest() 

650 return 

651 

652 # variable names from RFC7641 Section 3.4 

653 v1 = first_event.message.opt.observe 

654 t1 = time.time() 

655 

656 while True: 

657 # We don't really support cancellation of observations yet (see 

658 # https://github.com/chrysn/aiocoap/issues/92), but at least 

659 # stopping the interest is a way to free the local resources after 

660 # the first observation update, and to make the MID handler RST the 

661 # observation on the next. 

662 # FIXME: there *is* now a .on_cancel callback, we should at least 

663 # hook into that, and possibly even send a proper cancellation 

664 # then. 

665 next_event = yield True 

666 if self.observation.cancelled: 

667 self._stop_interest() 

668 return 

669 

670 if next_event.exception is not None: 

671 self.observation.error(next_event.exception) 

672 if not next_event.is_last: 

673 self._stop_interest() 

674 if not isinstance(next_event.exception, error.Error): 

675 self.log.warning( 

676 "An exception that is not an aiocoap Error was " 

677 "raised from a transport during an observation; " 

678 "please report this as a bug in aiocoap: %r", 

679 next_event.exception, 

680 ) 

681 return 

682 

683 self._add_response_properties(next_event.message, self._pipe.request) 

684 

685 if next_event.message.opt.observe is not None: 

686 # check for reordering 

687 v2 = next_event.message.opt.observe 

688 t2 = time.time() 

689 

690 is_recent = ( 

691 (v1 < v2 and v2 - v1 < 2**23) 

692 or (v1 > v2 and v1 - v2 > 2**23) 

693 or ( 

694 t2 

695 > t1 

696 + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME 

697 ) 

698 ) 

699 if is_recent: 

700 t1 = t2 

701 v1 = v2 

702 else: 

703 # the terminal message is always the last 

704 is_recent = True 

705 

706 if is_recent: 

707 self.observation.callback(next_event.message) 

708 

709 if next_event.is_last: 

710 self.observation.error(error.ObservationCancelled()) 

711 return 

712 

713 if next_event.message.opt.observe is None: 

714 self.observation.error(error.ObservationCancelled()) 

715 self.log.error( 

716 "Pipe indicated more possible responses" 

717 " while the Request handler would not know what to" 

718 " do with them, stopping any further request." 

719 ) 

720 self._stop_interest() 

721 return 

722 

723 
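# --- Worked example (hypothetical helper, not part of the original module) ---
# Request._run above decides whether a notification is fresher than the
# previous one using the Observe values (a 24-bit sequence number, hence the
# 2**23 window) and the reception times, per RFC 7641 Section 3.4. This
# standalone helper restates that check; in the real code the reset time comes
# from the request's transport_tuning.OBSERVATION_RESET_TIME.

def _example_is_fresher(v1, t1, v2, t2, reset_time=128.0):
    """Return True if notification (v2, t2) is fresher than (v1, t1)."""
    return (
        (v1 < v2 and v2 - v1 < 2**23)
        or (v1 > v2 and v1 - v2 > 2**23)
        or (t2 > t1 + reset_time)
    )

# For instance, Observe value 2**24 - 1 followed by 0 still counts as fresh
# because the sequence number wrapped: _example_is_fresher(2**24 - 1, 0.0, 0, 1.0)
# is True via the second clause.
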

724class BlockwiseRequest(BaseUnicastRequest, interfaces.Request): 

725 def __init__(self, protocol, app_request): 

726 self.protocol = protocol 

727 self.log = self.protocol.log.getChild("blockwise-requester") 

728 

729 self.response = protocol.loop.create_future() 

730 

731 if app_request.opt.observe is not None: 

732 self.observation = ClientObservation() 

733 else: 

734 self.observation = None 

735 

736 self._runner = protocol.loop.create_task( 

737 self._run_outer( 

738 app_request, 

739 self.response, 

740 weakref.ref(self.observation) 

741 if self.observation is not None 

742 else lambda: None, 

743 self.protocol, 

744 self.log, 

745 ), 

746 name="Blockwise runner for %r" % app_request, 

747 ) 

748 self.response.add_done_callback(self._response_cancellation_handler) 

749 

750 def _response_cancellation_handler(self, response_future): 

751 # see Request._response_cancellation_handler 

752 if self.response.cancelled(): 

753 self._runner.cancel() 

754 

755 @classmethod 

756 async def _run_outer(cls, app_request, response, weak_observation, protocol, log): 

757 try: 

758 await cls._run(app_request, response, weak_observation, protocol, log) 

759 except asyncio.CancelledError: 

760 pass # results already set 

761 except Exception as e: 

762 logged = False 

763 if not response.done(): 

764 logged = True 

765 response.set_exception(e) 

766 obs = weak_observation() 

767 if app_request.opt.observe is not None and obs is not None: 

768 logged = True 

769 obs.error(e) 

770 if not logged: 

771 # should be unreachable 

772 log.error( 

773 "Exception in BlockwiseRequest runner neither went to response nor to observation: %s", 

774 e, 

775 exc_info=e, 

776 ) 

777 

778 # This is a class method because that allows self and self.observation to 

779 # be freed even when this task is running, and the task to stop itself -- 

780 # otherwise we couldn't know when users just "forget" about a request 

781 # object after using its response (esp. in observe cases) and leave this 

782 # task running. 

783 @classmethod 

784 async def _run(cls, app_request, response, weak_observation, protocol, log): 

785 # we need to populate the remote right away, because the choice of 

786 # blocks depends on it. 

787 await protocol.find_remote_and_interface(app_request) 

788 

789 size_exp = app_request.remote.maximum_block_size_exp 

790 

791 if app_request.opt.block1 is not None: 

792 assert ( 

793 app_request.opt.block1.block_number == 0 

794 ), "Unexpected block number in app_request" 

795 assert ( 

796 not app_request.opt.block1.more 

797 ), "Unexpected more-flag in app_request" 

798 # this is where the library user can traditionally pass in size 

799 # exponent hints into the library. 

800 size_exp = app_request.opt.block1.size_exponent 

801 

802 # Offset in the message in blocks of size_exp. Whoever changes size_exp 

803 # is responsible for updating this number. 

804 block_cursor = 0 

805 

806 while True: 

807 # ... send a chunk 

808 

809 if size_exp >= 6: 

810 # FIXME from maximum_payload_size 

811 fragmentation_threshold = app_request.remote.maximum_payload_size 

812 else: 

813 fragmentation_threshold = 2 ** (size_exp + 4) 

814 

815 if ( 

816 app_request.opt.block1 is not None 

817 or len(app_request.payload) > fragmentation_threshold 

818 ): 

819 current_block1 = app_request._extract_block( 

820 block_cursor, size_exp, app_request.remote.maximum_payload_size 

821 ) 

822 if block_cursor == 0: 

823 current_block1.opt.size1 = len(app_request.payload) 

824 else: 

825 current_block1 = app_request 

826 

827 blockrequest = protocol.request(current_block1, handle_blockwise=False) 

828 blockresponse = await blockrequest.response 

829 

830 # store for future blocks to ensure that the next blocks will be 

831 # sent from the same source address (in the UDP case; for many 

832 # other transports it won't matter). 

833 app_request.remote = blockresponse.remote 

834 

835 if blockresponse.opt.block1 is None: 

836 if blockresponse.code.is_successful() and current_block1.opt.block1: 

837 log.warning( 

838 "Block1 option completely ignored by server, assuming it knows what it is doing." 

839 ) 

840 # FIXME: handle 4.13 and retry with the indicated size option 

841 break 

842 

843 block1 = blockresponse.opt.block1 

844 log.debug( 

845 "Response with Block1 option received, number = %d, more = %d, size_exp = %d.", 

846 block1.block_number, 

847 block1.more, 

848 block1.size_exponent, 

849 ) 

850 

851 if block1.block_number != current_block1.opt.block1.block_number: 

852 raise error.UnexpectedBlock1Option("Block number mismatch") 

853 

854 if size_exp == 7: 

855 block_cursor += len(current_block1.payload) // 1024 

856 else: 

857 block_cursor += 1 

858 

859 while block1.size_exponent < size_exp: 

860 block_cursor *= 2 

861 size_exp -= 1 

862 

863 if not current_block1.opt.block1.more: 

864 if block1.more or blockresponse.code == CONTINUE: 

865 # treating this as a protocol error -- letting it slip 

866 # through would misrepresent the whole operation as an 

867 # over-all 2.xx (successful) one. 

868 raise error.UnexpectedBlock1Option( 

869 "Server asked for more data at end of body" 

870 ) 

871 break 

872 

873 # checks before preparing the next round: 

874 

875 if blockresponse.opt.observe: 

876 # we're not *really* interested in that block, we just sent an 

877 # observe option to indicate that we'll want to observe the 

878 # resulting representation as a whole 

879 log.warning( 

880 "Server answered Observe in early Block1 phase, cancelling the erroneous observation." 

881 ) 

882 blockrequest.observe.cancel() 

883 

884 if block1.more: 

885 # FIXME: I think my own server is doing this wrong

886 # if response.code != CONTINUE: 

887 # raise error.UnexpectedBlock1Option("more-flag set but no Continue") 

888 pass 

889 else: 

890 if not blockresponse.code.is_successful(): 

891 break 

892 else: 

893 # ignoring (discarding) the successful intermediate result, waiting for a final one

894 continue 

895 

896 lower_observation = None 

897 if app_request.opt.observe is not None: 

898 if blockresponse.opt.observe is not None: 

899 lower_observation = blockrequest.observation 

900 else: 

901 obs = weak_observation() 

902 if obs: 

903 obs.error(error.NotObservable()) 

904 del obs 

905 

906 assert blockresponse is not None, "Block1 loop broke without setting a response" 

907 blockresponse.opt.block1 = None 

908 

909 # FIXME check with RFC7959: it just says "send requests similar to the 

910 # requests in the Block1 phase", what does that mean? using the last 

911 # block1 as a reference for now, especially because in the 

912 # only-one-request-block case, that's the original request we must send 

913 # again and again anyway 

914 assembled_response = await cls._complete_by_requesting_block2( 

915 protocol, current_block1, blockresponse, log 

916 ) 

917 

918 response.set_result(assembled_response) 

919 # finally set the result 

920 

921 if lower_observation is not None: 

922 # FIXME this can all be simplified a lot since it's no more 

923 # expected that observations shut themselves down when GC'd. 

924 obs = weak_observation() 

925 del weak_observation 

926 if obs is None: 

927 lower_observation.cancel() 

928 return 

929 future_weak_observation = protocol.loop.create_future() # packing this up because its destroy callback needs to reference the subtask 

930 subtask = asyncio.create_task( 

931 cls._run_observation( 

932 app_request, 

933 lower_observation, 

934 future_weak_observation, 

935 protocol, 

936 log, 

937 ), 

938 name="Blockwise observation for %r" % app_request, 

939 ) 

940 future_weak_observation.set_result( 

941 weakref.ref(obs, lambda obs: subtask.cancel()) 

942 ) 

943 obs.on_cancel(subtask.cancel) 

944 del obs 

945 await subtask 

946 

947 @classmethod 

948 async def _run_observation( 

949 cls, original_request, lower_observation, future_weak_observation, protocol, log 

950 ): 

951 weak_observation = await future_weak_observation 

952 # we can use weak_observation() here at any time, because whenever that 

953 # becomes None, this task gets cancelled 

954 try: 

955 async for block1_notification in lower_observation: 

956 log.debug("Notification received") 

957 full_notification = await cls._complete_by_requesting_block2( 

958 protocol, original_request, block1_notification, log 

959 ) 

960 log.debug("Reporting completed notification") 

961 weak_observation().callback(full_notification) 

962 # FIXME verify that this loop actually ends iff the observation 

963 # was cancelled -- otherwise find out the cause(s) or make it not 

964 # cancel under indistinguishable circumstances 

965 weak_observation().error(error.ObservationCancelled()) 

966 except asyncio.CancelledError: 

967 return 

968 except Exception as e: 

969 weak_observation().error(e) 

970 finally: 

971 # We generally avoid idempotent cancellation, but we may have 

972 # reached this point either due to an earlier cancellation or 

973 # without one 

974 if not lower_observation.cancelled: 

975 lower_observation.cancel() 

976 

977 @classmethod 

978 async def _complete_by_requesting_block2( 

979 cls, protocol, request_to_repeat, initial_response, log 

980 ): 

981 # FIXME this can probably be deduplicated against BlockwiseRequest 

982 

983 if ( 

984 initial_response.opt.block2 is None 

985 or initial_response.opt.block2.more is False 

986 ): 

987 initial_response.opt.block2 = None 

988 return initial_response 

989 

990 if initial_response.opt.block2.block_number != 0: 

991 log.error("Error assembling blockwise response (expected first block)") 

992 raise error.UnexpectedBlock2() 

993 

994 assembled_response = initial_response 

995 last_response = initial_response 

996 while True: 

997 current_block2 = request_to_repeat._generate_next_block2_request( 

998 assembled_response 

999 ) 

1000 

1001 current_block2 = current_block2.copy(remote=initial_response.remote) 

1002 

1003 blockrequest = protocol.request(current_block2, handle_blockwise=False) 

1004 last_response = await blockrequest.response 

1005 

1006 if last_response.opt.block2 is None: 

1007 log.warning( 

1008 "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response." 

1009 ) 

1010 return last_response 

1011 

1012 block2 = last_response.opt.block2 

1013 log.debug( 

1014 "Response with Block2 option received, number = %d, more = %d, size_exp = %d.", 

1015 block2.block_number, 

1016 block2.more, 

1017 block2.size_exponent, 

1018 ) 

1019 try: 

1020 assembled_response._append_response_block(last_response) 

1021 except error.Error as e: 

1022 log.error("Error assembling blockwise response, passing on error %r", e) 

1023 raise 

1024 

1025 if block2.more is False: 

1026 return assembled_response 

1027 

1028 
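# --- Illustrative sketch (hypothetical, not part of the original module) ---
# Per RFC 7959, a Block option with size exponent szx carries 2 ** (szx + 4)
# bytes per block (16 to 1024 bytes); the loop above additionally lets
# exponents of 6 and above fall back to the remote's maximum_payload_size
# (exponent 7 corresponds to BERT, RFC 8323). A small arithmetic helper for
# plain RFC 7959 blocks:

def _example_block_payload_size(size_exp):
    """Bytes carried per block for a plain RFC 7959 size exponent (0..6)."""
    if not 0 <= size_exp <= 6:
        raise ValueError("Plain RFC 7959 size exponents range from 0 to 6")
    return 2 ** (size_exp + 4)

# _example_block_payload_size(0) == 16 and _example_block_payload_size(6) == 1024,
# so a 4000 byte payload at size exponent 6 is sent in four Block1 requests.
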

1029class ClientObservation: 

1030 """An interface to observe notification updates arriving on a request. 

1031 

1032 This class does not actually provide any of the observe functionality, it 

1033 is purely a container for dispatching the messages via asynchronous 

1034 iteration. It gets driven (ie. populated with responses or errors including 

1035 observation termination) by a Request object. 

1036 """ 

1037 

1038 def __init__(self): 

1039 self.callbacks = [] 

1040 self.errbacks = [] 

1041 

1042 self.cancelled = False 

1043 self._on_cancel = [] 

1044 

1045 self._latest_response = None 

1046 # the analogous error is stored in _cancellation_reason when cancelled. 

1047 

1048 def __aiter__(self): 

1049 """`async for` interface to observations. 

1050 

1051 This is the preferred interface to obtaining observations.""" 

1052 it = self._Iterator() 

1053 self.register_callback(it.push, _suppress_deprecation=True) 

1054 self.register_errback(it.push_err, _suppress_deprecation=True) 

1055 return it 

1056 

1057 class _Iterator: 

1058 def __init__(self): 

1059 self._future = asyncio.get_event_loop().create_future() 

1060 

1061 def push(self, item): 

1062 if self._future.done(): 

1063 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1064 self._future = asyncio.get_event_loop().create_future() 

1065 self._future.set_result(item) 

1066 

1067 def push_err(self, e): 

1068 if self._future.done(): 

1069 self._future = asyncio.get_event_loop().create_future() 

1070 self._future.set_exception(e) 

1071 

1072 async def __anext__(self): 

1073 f = self._future 

1074 try: 

1075 result = await self._future 

1076 # FIXME see `await servobs._trigger` comment: might waiting for 

1077 # the original future not yield the first future's result when 

1078 # a quick second future comes in in a push? 

1079 if f is self._future: 

1080 self._future = asyncio.get_event_loop().create_future() 

1081 return result 

1082 except (error.NotObservable, error.ObservationCancelled): 

1083 # only exit cleanly when the server -- right away or later -- 

1084 # states that the resource is not observable any more 

1085 # FIXME: check whether an unsuccessful message is still passed 

1086 # as an observation result (or whether it should be) 

1087 raise StopAsyncIteration 

1088 

1089 def __del__(self): 

1090 if self._future.done(): 

1091 try: 

1092 # Fetch the result so any errors show up at least in the 

1093 # finalizer output 

1094 self._future.result() 

1095 except (error.ObservationCancelled, error.NotObservable): 

1096 # This is the case at the end of an observation cancelled 

1097 # by the server. 

1098 pass 

1099 except error.NetworkError: 

1100 # This will already have shown up in the main result too. 

1101 pass 

1102 except (error.LibraryShutdown, asyncio.CancelledError): 

1103 pass 

1104 # Anything else flying out of this is unexpected and probably a 

1105 # library error 

1106 

1107 # When this function is removed, we can finally do cleanup better. Right 

1108 # now, someone could register a callback that doesn't hold any references, 

1109 # so we can't just stop the request when nobody holds a reference to this 

1110 # any more. Once we're all in pull mode, we can make the `process` function 

1111 # that sends data in here use a weak reference (because any possible 

1112 # recipient would need to hold a reference to self or the iterator, and 

1113 # thus _run). 

1114 def register_callback(self, callback, _suppress_deprecation=False): 

1115 """Call the callback whenever a response to the message comes in, and 

1116 pass the response to it. 

1117 

1118 The use of this function is deprecated: Use the asynchronous iteration 

1119 interface instead.""" 

1120 if not _suppress_deprecation: 

1121 warnings.warn( 

1122 "register_callback on observe results is deprected: Use `async for notify in request.observation` instead.", 

1123 DeprecationWarning, 

1124 stacklevel=2, 

1125 ) 

1126 if self.cancelled: 

1127 return 

1128 

1129 self.callbacks.append(callback) 

1130 if self._latest_response is not None: 

1131 callback(self._latest_response) 

1132 

1133 def register_errback(self, callback, _suppress_deprecation=False): 

1134 """Call the callback whenever something goes wrong with the 

1135 observation, and pass an exception to the callback. After such a 

1136 callback is called, no more callbacks will be issued. 

1137 

1138 The use of this function is deprecated: Use the asynchronous iteration 

1139 interface instead.""" 

1140 if not _suppress_deprecation: 

1141 warnings.warn( 

1142 "register_errback on observe results is deprected: Use `async for notify in request.observation` instead.", 

1143 DeprecationWarning, 

1144 stacklevel=2, 

1145 ) 

1146 if self.cancelled: 

1147 callback(self._cancellation_reason) 

1148 return 

1149 self.errbacks.append(callback) 

1150 

1151 def callback(self, response): 

1152 """Notify all listeners of an incoming response""" 

1153 

1154 self._latest_response = response 

1155 

1156 for c in self.callbacks: 

1157 c(response) 

1158 

1159 def error(self, exception): 

1160 """Notify registered listeners that the observation went wrong. This 

1161 can only be called once.""" 

1162 

1163 if self.errbacks is None: 

1164 raise RuntimeError( 

1165 "Error raised in an already cancelled ClientObservation" 

1166 ) from exception 

1167 for c in self.errbacks: 

1168 c(exception) 

1169 

1170 self.cancel() 

1171 self._cancellation_reason = exception 

1172 

1173 def cancel(self): 

1174 # FIXME determine whether this is called by anything other than error, 

1175 # and make it private so there is always a _cancellation_reason 

1176 """Cease to generate observation or error events. This will not 

1177 generate an error by itself. 

1178 

1179 This function is only needed while register_callback and 

1180 register_errback are around; once their deprecations are acted on, 

1181 dropping the asynchronous iterator will automatically cancel the 

1182 observation. 

1183 """ 

1184 

1185 assert not self.cancelled, "ClientObservation cancelled twice" 

1186 

1187 # make sure things go wrong when someone tries to continue this 

1188 self.errbacks = None 

1189 self.callbacks = None 

1190 

1191 self.cancelled = True 

1192 while self._on_cancel: 

1193 self._on_cancel.pop()() 

1194 

1195 self._cancellation_reason = None 

1196 

1197 def on_cancel(self, callback): 

1198 if self.cancelled: 

1199 callback() 

1200 self._on_cancel.append(callback) 

1201 

1202 def __repr__(self): 

1203 return "<%s %s at %#x>" % ( 

1204 type(self).__name__, 

1205 "(cancelled)" 

1206 if self.cancelled 

1207 else "(%s call-, %s errback(s))" 

1208 % (len(self.callbacks), len(self.errbacks)), 

1209 id(self), 

1210 ) 

1211 

1212 
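# --- Illustrative sketch (hypothetical) of consuming a ClientObservation ---
# As the class docstring and the deprecation warnings above indicate, the
# asynchronous iteration interface is the preferred way to receive
# notifications. The URI is made up; the rest assumes the documented aiocoap
# client API.

async def _example_observe(context):
    import aiocoap

    msg = aiocoap.Message(
        code=aiocoap.GET, uri="coap://localhost/time", observe=0
    )
    request = context.request(msg)

    first = await request.response
    print("First representation:", first.payload)

    async for notification in request.observation:
        print("Notification:", notification.payload)
        # Iteration ends (StopAsyncIteration) once the observation terminates
        # with NotObservable or ObservationCancelled.
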

1213class ServerObservation: 

1214 def __init__(self): 

1215 self._accepted = False 

1216 self._trigger = asyncio.get_event_loop().create_future() 

1217 # A deregistration is "early" if it happens before the response message 

1218 # is actually sent; calling deregister() in that time (typically during 

1219 # `render()`) will not send an unsuccessful response message but just 

1220 # set this flag, which becomes None as soon as it is too late for an

1221 # early deregistration. 

1222 # This mechanism is temporary until more of aiocoap behaves like 

1223 # Pipe, which does not suffer from this limitation.

1224 self._early_deregister = False 

1225 self._late_deregister = False 

1226 

1227 def accept(self, cancellation_callback): 

1228 self._accepted = True 

1229 self._cancellation_callback = cancellation_callback 

1230 

1231 def deregister(self, reason=None): 

1232 if self._early_deregister is False: 

1233 self._early_deregister = True 

1234 return 

1235 

1236 warnings.warn( 

1237 "Late use of ServerObservation.deregister() is" 

1238 " deprecated, use .trigger with an unsuccessful value" 

1239 " instead", 

1240 DeprecationWarning, 

1241 ) 

1242 self.trigger( 

1243 Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable") 

1244 ) 

1245 

1246 def trigger(self, response=None, *, is_last=False): 

1247 """Send an updated response; if None is given, the observed resource's 

1248 rendering will be invoked to produce one. 

1249 

1250 `is_last` can be set to True to indicate that no more responses will be 

1251 sent. Note that an unsuccessful response will be the last no matter 

1252 what is_last says, as such a message always terminates a CoAP 

1253 observation.""" 

1254 if is_last: 

1255 self._late_deregister = True 

1256 if self._trigger.done(): 

1257 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1258 self._trigger = asyncio.get_event_loop().create_future() 

1259 self._trigger.set_result(response)
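
# --- Illustrative server-side sketch (hypothetical, not part of the original module) ---
# Applications rarely touch ServerObservation directly: they typically subclass
# aiocoap.resource.ObservableResource and call updated_state(), which makes the
# library re-render the resource and trigger() the registered observations as
# described above. This assumes the documented aiocoap.resource API.

def _example_observable_time_resource():
    """Build an observable resource that pushes a fresh representation every 5 s."""
    import asyncio
    import time

    import aiocoap
    import aiocoap.resource as resource

    class Time(resource.ObservableResource):
        def __init__(self):
            super().__init__()
            self._schedule()

        def _schedule(self):
            # updated_state() drives the ServerObservation.trigger() machinery.
            asyncio.get_event_loop().call_later(5, self._notify)

        def _notify(self):
            self.updated_state()
            self._schedule()

        async def render_get(self, request):
            return aiocoap.Message(payload=str(time.time()).encode("ascii"))

    return Time()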