Coverage for aiocoap/protocol.py: 86%


592 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9"""This module contains the classes that are responsible for keeping track of 

10messages: 

11 

12* :class:`Context` roughly represents the CoAP endpoint (basically a UDP 

13 socket) -- something that can send requests and possibly can answer 

14 incoming requests. 

15 

16* a :class:`Request` gets generated whenever a request gets sent to keep 

17 track of the response 

18 

19* a :class:`Responder` keeps track of a single incoming request 
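
A minimal client sketch (illustrative only; the URI is a placeholder)::

    import asyncio
    from aiocoap import Context, Message, GET

    async def main():
        ctx = await Context.create_client_context()
        request = Message(code=GET, uri="coap://localhost/.well-known/core")
        response = await ctx.request(request).response
        print(response.code, response.payload)

    asyncio.run(main())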

20""" 

21 

22import asyncio 

23import functools 

24import weakref 

25import time 

26 

27from . import defaults 

28from .credentials import CredentialsMap 

29from .message import Message 

30from .optiontypes import BlockOption 

31from .messagemanager import MessageManager 

32from .tokenmanager import TokenManager 

33from .plumbingrequest import PlumbingRequest, run_driving_plumbing_request 

34from . import interfaces 

35from . import error 

36from .numbers import (INTERNAL_SERVER_ERROR, NOT_FOUND, 

37 CONTINUE, REQUEST_ENTITY_INCOMPLETE, 

38 OBSERVATION_RESET_TIME, MAX_TRANSMIT_WAIT) 

39from .numbers.optionnumbers import OptionNumber 

40from .util.asyncio.coro_or_contextmanager import AwaitOrAenter 

41from .util.asyncio import py38args 

42 

43import warnings 

44import logging 

45# log levels used: 

46# * debug is for things that occur even under perfect conditions. 

47# * info is for things that are well expected, but might be interesting when

48# testing a network of nodes rather than when debugging the library. (timeouts,

49# retransmissions, pings) 

50# * warning is for everything that indicates a misbehaving client. (these don't

51# necessarily indicate a client bug, though; things like requesting a 

52# nonexistent block can just as well happen when a resource's content has 

53# changed between blocks). 

54 

55def _extract_block_key(message): 

56 """Extract a key that hashes equally for all blocks of a blockwise 

57 operation from a request message. 

58 

59 See discussion at <https://mailarchive.ietf.org/arch/msg/core/I-6LzAL6lIUVDA6_g9YM3Zjhg8E>. 

60 """ 

61 

62 return (message.remote, message.get_cache_key([ 

63 OptionNumber.BLOCK1, 

64 OptionNumber.BLOCK2, 

65 OptionNumber.OBSERVE, 

66 ])) 

67 

68 

69class Context(interfaces.RequestProvider): 

70 """Applications' entry point to the network 

71 

72 A :class:`.Context` coordinates one or more network :mod:`.transports` 

73 implementations and dispatches data between them and the application. 

74 

75 The application can start requests using the message dispatch methods, and 

76 set a :class:`resources.Site` that will answer requests directed to the 

77 application as a server. 

78 

79 On the library-internals side, it is the prime implementation of the 

80 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and 

81 :class:`Response` classes on demand, and decides which transport 

82 implementations to start and which are to handle which messages. 

83 

84 **Context creation and destruction** 

85 

86 The following functions are provided for creating and stopping a context: 

87 

88 .. note:: 

89 

90 A typical application should only ever create one context, even (or

91 especially) when it acts both as a server and as a client (in which

92 case a server context should be created).

93 

94 A context that is not used any more must be shut down using 

95 :meth:`.shutdown()`, but typical applications will not need to do so, as

96 they use the context for the full process lifetime. 

97 

98 The context creation functions also work as `asynchronous context 

99 managers`__, shutting down the (aiocoap) context when the (Python) context 

100 ends. 

101 

102 .. __: https://docs.python.org/3/reference/datamodel.html#async-context-managers 
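
For instance (a sketch; error handling omitted)::

    async with Context.create_client_context() as ctx:
        ...  # send requests via ctx.request(...)
    # the aiocoap context is shut down when the ``async with`` block exits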

103 

104 .. automethod:: create_client_context 

105 .. automethod:: create_server_context 

106 

107 .. automethod:: shutdown 

108 

109 **Dispatching messages** 

110 

111 CoAP requests can be sent using the following functions: 

112 

113 .. automethod:: request 

114 

115 .. automethod:: multicast_request 

116 

117 If more control is needed, you can create a :class:`Request` yourself and 

118 pass the context to it. 

119 

120 

121 **Other methods and properties** 

122 

123 The remaining methods and properties are to be considered unstable even 

124 when the project reaches a stable version number; please file a feature 

125 request for stabilization if you want to reliably access any of them. 

126 

127 (Sorry for the duplicates, still looking for a way to make autodoc list 

128 everything not already mentioned). 

129 

130 """ 

131 def __init__(self, loop=None, serversite=None, loggername="coap", client_credentials=None, server_credentials=None): 

132 self.log = logging.getLogger(loggername) 

133 

134 self.loop = loop or asyncio.get_event_loop() 

135 

136 self.serversite = serversite 

137 

138 self.request_interfaces = [] 

139 

140 self.client_credentials = client_credentials or CredentialsMap() 

141 self.server_credentials = server_credentials or CredentialsMap() 

142 

143 # FIXME: consider introducing a TimeoutDict 

144 self._block1_assemblies = {} 

145 """mapping block-key to (partial request, timeout handle)""" 

146 self._block2_assemblies = {} 

147 """mapping block-key to (complete response, timeout handle) 

148 

149 For both, block-key is as extracted by _extract_block_key.""" 

150 

151 # 

152 # Asynchronous context manager 

153 # 

154 

155 async def __aenter__(self): 

156 # Note that this is usually not called that way; the more common idiom 

157 # is `async with Context.create_client_context()` which returns a 

158 # future that also has an __aenter__ method. 

159 

160 return self 

161 

162 async def __aexit__(self, exc_type, exc, tb): 

163 await self.shutdown() 

164 

165 # 

166 # convenience methods for class instantiation

167 # 

168 

169 async def _append_tokenmanaged_messagemanaged_transport(self, message_interface_constructor): 

170 tman = TokenManager(self) 

171 mman = MessageManager(tman) 

172 transport = await message_interface_constructor(mman) 

173 

174 mman.message_interface = transport 

175 tman.token_interface = mman 

176 

177 self.request_interfaces.append(tman) 

178 

179 async def _append_tokenmanaged_transport(self, token_interface_constructor): 

180 tman = TokenManager(self) 

181 transport = await token_interface_constructor(tman) 

182 

183 tman.token_interface = transport 

184 

185 self.request_interfaces.append(tman) 

186 

187 @classmethod 

188 @AwaitOrAenter.decorate 

189 async def create_client_context(cls, *, loggername="coap", loop=None): 

190 """Create a context bound to all addresses on a random listening port. 

191 

192 This is the easiest way to get a context suitable for sending client 

193 requests. 

194 

195 Note that while this looks in the documentation like a function rather 

196 than an asynchronous function, it does return an awaitable; the way it 

197 is set up allows using the result as an asynchronous context manager as 

198 well. 

199 """ 

200 

201 if loop is None: 

202 loop = asyncio.get_event_loop() 

203 

204 self = cls(loop=loop, serversite=None, loggername=loggername) 

205 

206 # FIXME make defaults overridable (postponed until they become configurable too) 

207 for transportname in defaults.get_default_clienttransports(loop=loop): 

208 if transportname == 'udp6': 

209 from .transports.udp6 import MessageInterfaceUDP6 

210 await self._append_tokenmanaged_messagemanaged_transport( 

211 lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint(mman, log=self.log, loop=loop)) 

212 elif transportname == 'simple6': 

213 from .transports.simple6 import MessageInterfaceSimple6 

214 await self._append_tokenmanaged_messagemanaged_transport( 

215 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(mman, log=self.log, loop=loop)) 

216 elif transportname == 'tinydtls': 

217 from .transports.tinydtls import MessageInterfaceTinyDTLS 

218 await self._append_tokenmanaged_messagemanaged_transport( 

219 

220 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(mman, log=self.log, loop=loop)) 

221 elif transportname == 'tcpclient': 

222 from .transports.tcp import TCPClient 

223 await self._append_tokenmanaged_transport( 

224 lambda tman: TCPClient.create_client_transport(tman, self.log, loop)) 

225 elif transportname == 'tlsclient': 

226 from .transports.tls import TLSClient 

227 await self._append_tokenmanaged_transport( 

228 lambda tman: TLSClient.create_client_transport(tman, self.log, loop, self.client_credentials)) 

229 elif transportname == 'ws': 

230 from .transports.ws import WSPool 

231 await self._append_tokenmanaged_transport( 

232 lambda tman: WSPool.create_transport(tman, self.log, loop, client_credentials=self.client_credentials)) 

233 elif transportname == 'oscore': 

234 from .transports.oscore import TransportOSCORE 

235 oscoretransport = TransportOSCORE(self, self) 

236 self.request_interfaces.append(oscoretransport) 

237 else: 

238 raise RuntimeError("Transport %r not known for client context creation" % transportname)

239 

240 return self 

241 

242 @classmethod 

243 @AwaitOrAenter.decorate 

244 async def create_server_context(cls, site, bind=None, *, loggername="coap-server", loop=None, _ssl_context=None, multicast=[], server_credentials=None): 

245 """Create a context, bound to all addresses on the CoAP port (unless 

246 otherwise specified in the ``bind`` argument). 

247 

248 This is the easiest way to get a context suitable both for sending 

249 client and accepting server requests. 

250 

251 The ``bind`` argument, if given, needs to be a 2-tuple of IP address 

252 string and port number, where the port number can be None to use the default port. 

253 

254 If ``multicast`` is given, it needs to be a list of (multicast address, 

255 interface name) tuples, which will all be joined. (The IPv4 style of 

256 selecting the interface by a local address is not supported; users may 

257 want to use the netifaces package to arrive at an interface name for an 

258 address). 

259 

260 As a shortcut, the list may also contain interface names alone. Those 

261 will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with 

262 scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6. 

263 

264 Under some circumstances you may already need a context to pass into 

265 the site for creation; this is typically the case for servers that 

266 trigger requests on their own. For those cases, it is usually easiest 

267 to pass None in as a site, and set the fully constructed site later by 

268 assigning to the ``serversite`` attribute. 
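
Example call (a sketch; ``my_site`` stands for an application-provided
:class:`resources.Site`)::

    context = await Context.create_server_context(
        my_site,
        bind=("::", None),    # all addresses, default CoAP port
        multicast=["eth0"],   # join the all-CoAP-nodes groups on this interface
    )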

269 """ 

270 

271 if loop is None: 

272 loop = asyncio.get_event_loop() 

273 

274 self = cls(loop=loop, serversite=site, loggername=loggername, server_credentials=server_credentials) 

275 

276 multicast_done = not multicast 

277 

278 for transportname in defaults.get_default_servertransports(loop=loop): 

279 if transportname == 'udp6': 

280 from .transports.udp6 import MessageInterfaceUDP6 

281 

282 await self._append_tokenmanaged_messagemanaged_transport( 

283 lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(mman, log=self.log, loop=loop, bind=bind, multicast=multicast)) 

284 multicast_done = True 

285 # FIXME this is duplicated from the client version, as those are client-only anyway 

286 elif transportname == 'simple6': 

287 from .transports.simple6 import MessageInterfaceSimple6 

288 await self._append_tokenmanaged_messagemanaged_transport( 

289 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(mman, log=self.log, loop=loop)) 

290 elif transportname == 'tinydtls': 

291 from .transports.tinydtls import MessageInterfaceTinyDTLS 

292 

293 await self._append_tokenmanaged_messagemanaged_transport( 

294 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(mman, log=self.log, loop=loop)) 

295 # FIXME end duplication 

296 elif transportname == 'tinydtls_server': 

297 from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer 

298 

299 await self._append_tokenmanaged_messagemanaged_transport( 

300 lambda mman: MessageInterfaceTinyDTLSServer.create_server(bind, mman, log=self.log, loop=loop, server_credentials=self.server_credentials)) 

301 elif transportname == 'simplesocketserver': 

302 from .transports.simplesocketserver import MessageInterfaceSimpleServer 

303 await self._append_tokenmanaged_messagemanaged_transport( 

304 lambda mman: MessageInterfaceSimpleServer.create_server(bind, mman, log=self.log, loop=loop)) 

305 elif transportname == 'tcpserver': 

306 from .transports.tcp import TCPServer 

307 await self._append_tokenmanaged_transport( 

308 lambda tman: TCPServer.create_server(bind, tman, self.log, loop)) 

309 elif transportname == 'tcpclient': 

310 from .transports.tcp import TCPClient 

311 await self._append_tokenmanaged_transport( 

312 lambda tman: TCPClient.create_client_transport(tman, self.log, loop)) 

313 elif transportname == 'tlsserver': 

314 if _ssl_context is not None: 

315 from .transports.tls import TLSServer 

316 await self._append_tokenmanaged_transport( 

317 lambda tman: TLSServer.create_server(bind, tman, self.log, loop, _ssl_context)) 

318 elif transportname == 'tlsclient': 

319 from .transports.tls import TLSClient 

320 await self._append_tokenmanaged_transport( 

321 lambda tman: TLSClient.create_client_transport(tman, self.log, loop, self.client_credentials)) 

322 elif transportname == 'ws': 

323 from .transports.ws import WSPool 

324 await self._append_tokenmanaged_transport( 

325 # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind 

326 lambda tman: WSPool.create_transport(tman, self.log, loop, client_credentials=self.client_credentials, server_bind=bind or (None, None), server_context=_ssl_context)) 

327 elif transportname == 'oscore': 

328 from .transports.oscore import TransportOSCORE 

329 oscoretransport = TransportOSCORE(self, self) 

330 self.request_interfaces.append(oscoretransport) 

331 else: 

332 raise RuntimeError("Transport %r not known for server context creation" % transportname)

333 

334 if not multicast_done: 

335 self.log.warning("Multicast was requested, but no multicast capable transport was selected.") 

336 

337 return self 

338 

339 async def shutdown(self): 

340 """Take down any listening sockets and stop all related timers. 

341 

342 After this coroutine terminates, and once all external references to 

343 the object are dropped, it should be garbage-collectable. 

344 

345 This method may take the time to inform communications partners of 

346 stopped observations (but currently does not).""" 

347 

348 self.log.debug("Shutting down context") 

349 for _, canceler in self._block1_assemblies.values(): 

350 canceler() 

351 for _, canceler in self._block2_assemblies.values(): 

352 canceler() 

353 

354 done, pending = await asyncio.wait([ 

355 asyncio.create_task( 

356 ri.shutdown(), 

357 **py38args(name="Shutdown of %r" % ri) 

358 ) 

359 for ri 

360 in self.request_interfaces], 

361 timeout=3) 

362 for item in done: 

363 await item 

364 

365 # FIXME: determine how official this should be, or which part of it is 

366 # public -- now that BlockwiseRequest uses it. (And formalize what can 

367 # change about messages and what can't after the remote has been thusly 

368 # populated). 

369 async def find_remote_and_interface(self, message): 

370 for ri in self.request_interfaces: 

371 if await ri.fill_or_recognize_remote(message): 

372 return ri 

373 raise RuntimeError("No request interface could route message") 

374 

375 def request(self, request_message, handle_blockwise=True): 

376 if handle_blockwise: 

377 return BlockwiseRequest(self, request_message) 

378 

379 plumbing_request = PlumbingRequest(request_message, self.log) 

380 # Request sets up callbacks at creation 

381 result = Request(plumbing_request, self.loop, self.log) 

382 

383 async def send(): 

384 try: 

385 request_interface = await self.find_remote_and_interface(request_message) 

386 request_interface.request(plumbing_request) 

387 except Exception as e: 

388 plumbing_request.add_exception(e) 

389 return 

390 self.loop.create_task( 

391 send(), 

392 **py38args(name="Request processing of %r" % result) 

393 ) 

394 return result 

395 

396 # the following are under consideration for moving into Site or something 

397 # mixed into it 

398 

399 def render_to_plumbing_request(self, plumbing_request): 

400 """Satisfy a plumbing request from the full :meth:`render` / 

401 :meth:`needs_blockwise_assembly` / :meth:`add_observation` interfaces 

402 provided by the site.""" 

403 

404 run_driving_plumbing_request( 

405 plumbing_request, 

406 self._render_to_plumbing_request(plumbing_request), 

407 self.log, 

408 name="Rendering for %r" % plumbing_request.request, 

409 ) 

410 

411 async def _render_to_plumbing_request(self, plumbing_request): 

412 # will receive a result in the finally, so the observation's 

413 # cancellation callback can just be hooked into that rather than 

414 # catching CancellationError here 

415 cancellation_future = self.loop.create_future() 

416 

417 def cleanup(cancellation_future=cancellation_future): 

418 if not cancellation_future.done(): 

419 cancellation_future.set_result(None) 

420 

421 # not trying to cancel the whole rendering right now, as that would 

422 # mean that we'll need to cancel the task in a way that won't cause a 

423 # message sent back -- but reacting to an end of interest is very 

424 # relevant when network errors arrive from observers. 

425 plumbing_request.on_interest_end(cleanup) 

426 

427 try: 

428 await self._render_to_plumbing_request_inner(plumbing_request, 

429 cancellation_future) 

430 finally: 

431 cleanup() 

432 

433 

434 async def _render_to_plumbing_request_inner(self, plumbing_request, cancellation_future): 

435 request = plumbing_request.request 

436 

437 if self.serversite is None: 

438 plumbing_request.add_response(Message(code=NOT_FOUND, payload=b"not a server"), is_last=True) 

439 return 

440 

441 if await self.serversite.can_render_to_plumbingrequest(request): 

442 # On the long run, everything else might become deprecated 

443 

444 # As this is called exclusively through render_to_plumbingrequest, 

445 # we can be sure not to kill anything unrelated when cancelling 

446 # this task. (The alternative would be to pass the task along 

447 # explicitly, but that's cyclic and unneccesary). 

448 our_task = asyncio.current_task() 

449 # Not the cleanest way to do all this, but compatible with th rest 

450 # of _render_to_plumbing_request. This ensures that even if the 

451 # render_to_plumbingrequest coroutine does *not* on its own wait 

452 # for the cancellation (by registering .on_interest_end()), it 

453 # *still* gets cancelled. (FIXME: On a documentation level, should 

454 # it even (and risk different cancellation paths being taken), or 

455 # should it just await cancellation?) 

456 cancellation_future.add_done_callback(lambda _, f=our_task.cancel: f()) 

457 

458 return await self.serversite.render_to_plumbingrequest(plumbing_request) 

459 

460 needs_blockwise = await self.serversite.needs_blockwise_assembly(request) 

461 if needs_blockwise: 

462 block_key = _extract_block_key(request) 

463 

464 if needs_blockwise and request.opt.block2 and \ 

465 request.opt.block2.block_number != 0: 

466 if request.opt.block1 is not None: 

467 raise error.BadOption("Block1 conflicts with non-initial Block2") 

468 

469 try: 

470 response, _ = self._block2_assemblies[block_key] 

471 except KeyError: 

472 plumbing_request.add_response(Message( 

473 code=REQUEST_ENTITY_INCOMPLETE), 

474 is_last=True) 

475 self.log.info("Received unmatched blockwise response" 

476 " operation message") 

477 return 

478 

479 # FIXME: update the timeout? maybe remove item when last is 

480 # requested in a confirmable message? 

481 

482 response = response._extract_block( 

483 request.opt.block2.block_number, 

484 request.opt.block2.size_exponent, 

485 request.remote.maximum_payload_size 

486 ) 

487 plumbing_request.add_response( 

488 response, 

489 is_last=True) 

490 return 

491 

492 if needs_blockwise and request.opt.block1: 

493 if request.opt.block1.block_number == 0: 

494 if block_key in self._block1_assemblies: 

495 _, canceler = self._block1_assemblies.pop(block_key) 

496 canceler() 

497 self.log.info("Aborting incomplete Block1 operation at" 

498 " arrival of new start block") 

499 new_aggregate = request 

500 else: 

501 try: 

502 previous, canceler = self._block1_assemblies.pop(block_key) 

503 except KeyError: 

504 plumbing_request.add_response(Message( 

505 code=REQUEST_ENTITY_INCOMPLETE), 

506 is_last=True) 

507 self.log.info("Received unmatched blockwise request" 

508 " operation message") 

509 return 

510 canceler() 

511 

512 try: 

513 previous._append_request_block(request) 

514 except ValueError: 

515 plumbing_request.add_response(Message( 

516 code=REQUEST_ENTITY_INCOMPLETE), 

517 is_last=True) 

518 self.log.info("Failed to assemble blockwise request (gaps or overlaps)") 

519 return 

520 new_aggregate = previous 

521 

522 if request.opt.block1.more: 

523 canceler = self.loop.call_later( 

524 MAX_TRANSMIT_WAIT, # FIXME: introduce an actual parameter here 

525 functools.partial(self._block1_assemblies.pop, block_key) 

526 ).cancel 

527 

528 self._block1_assemblies[block_key] = (new_aggregate, canceler) 

529 

530 plumbing_request.add_response(Message( 

531 code=CONTINUE, 

532 block1=BlockOption.BlockwiseTuple( 

533 request.opt.block1.block_number, 

534 True, 

535 request.opt.block1.size_exponent), 

536 ), 

537 is_last=True) 

538 return 

539 else: 

540 immediate_response_block1 = request.opt.block1 

541 request = new_aggregate 

542 else: 

543 immediate_response_block1 = None 

544 
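# Note: per RFC 7641, an Observe option value of 0 in a request registers the
# client for notifications; a value of 1 would request deregistration.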

545 observe_requested = request.opt.observe == 0 

546 if observe_requested: 

547 servobs = ServerObservation() 

548 await self.serversite.add_observation(request, servobs) 

549 

550 if servobs._accepted: 

551 cancellation_future.add_done_callback( 

552 lambda f, cb=servobs._cancellation_callback: cb()) 

553 

554 response = await self.serversite.render(request) 

555 

556 if response.code is None or not response.code.is_response(): 

557 self.log.warning("Response does not carry response code (%r)," 

558 " application probably violates protocol.", 

559 response.code) 

560 

561 if needs_blockwise and ( 

562 len(response.payload) > ( 

563 request.remote.maximum_payload_size 

564 if request.opt.block2 is None 

565 else request.opt.block2.size)): 

566 

567 if block_key in self._block2_assemblies: 

568 _, canceler = self._block2_assemblies.pop(block_key) 

569 canceler() 

570 

571 canceler = self.loop.call_later( 

572 MAX_TRANSMIT_WAIT, # FIXME: introduce an actual parameter here 

573 functools.partial(self._block2_assemblies.pop, block_key) 

574 ).cancel 

575 

576 self._block2_assemblies[block_key] = (response, canceler) 

577 

578 szx = request.opt.block2.size_exponent if request.opt.block2 is not None \ 

579 else request.remote.maximum_block_size_exp 

580 # if a requested block2 number were not 0, the code would have 

581 # diverted earlier to serve from active operations 

582 response = response._extract_block(0, szx, request.remote.maximum_payload_size) 

583 

584 if needs_blockwise: 

585 response.opt.block1 = immediate_response_block1 

586 

587 can_continue = observe_requested and servobs._accepted and \ 

588 response.code.is_successful() 

589 if observe_requested: 

590 # see comment on _early_deregister in ServerObservation 

591 if servobs._early_deregister: 

592 can_continue = False 

593 servobs._early_deregister = None 

594 if can_continue: 

595 # FIXME: observation numbers should actually not be per 

596 # asyncio.task, but per (remote, token). if a client renews an 

597 # observation (possibly with a new ETag or whatever is deemed 

598 # legal), the new observation events should still carry larger 

599 # numbers. (if they did not, the client might be tempted to discard 

600 # them). 

601 response.opt.observe = next_observation_number = 0 

602 plumbing_request.add_response(response, is_last=not can_continue) 

603 

604 while can_continue: 

605 await servobs._trigger 

606 # if you wonder why the lines around this are not just `response = 

607 # await servobs._trigger`, have a look at the 'double' tests in 

608 # test_observe.py: A later triggering could have replaced 

609 # servobs._trigger in the meantime. 

610 response = servobs._trigger.result() 

611 servobs._trigger = self.loop.create_future() 

612 

613 if response is None: 

614 response = await self.serversite.render(request) 

615 if response.code is None or not response.code.is_response(): 

616 self.log.warning("Response does not carry response code (%r)," 

617 " application probably violates protocol.", 

618 response.code) 

619 

620 # FIXME: this whole block is copy-pasted from the first response 

621 if needs_blockwise and ( 

622 len(response.payload) > ( 

623 request.remote.maximum_payload_size 

624 if request.opt.block2 is None 

625 else request.opt.block2.size)): 

626 if block_key in self._block2_assemblies: 

627 _, canceler = self._block2_assemblies.pop(block_key) 

628 canceler() 

629 

630 canceler = self.loop.call_later( 

631 MAX_TRANSMIT_WAIT, # FIXME: introduce an actual parameter here 

632 functools.partial(self._block2_assemblies.pop, block_key) 

633 ).cancel 

634 

635 self._block2_assemblies[block_key] = (response, canceler) 

636 

637 szx = request.opt.block2.size_exponent if request.opt.block2 is not None \ 

638 else request.remote.maximum_block_size_exp 

639 # if a requested block2 number were not 0, the code would have 

640 # diverted earlier to serve from active operations 

641 response = response._extract_block(0, szx, request.remote.maximum_payload_size) 

642 

643 can_continue = response.code.is_successful() and \ 

644 not servobs._late_deregister 

645 

646 if can_continue: 

647 # TODO handle situations in which this gets called more often than 

648 # 2^32 times in 256 seconds (or document why we can be sure that 

649 # that will not happen) 

650 next_observation_number = next_observation_number + 1 

651 response.opt.observe = next_observation_number 

652 

653 plumbing_request.add_response(response, is_last=not can_continue) 

654 

655class BaseRequest(object): 

656 """Common mechanisms of :class:`Request` and :class:`MulticastRequest`""" 

657 

658class BaseUnicastRequest(BaseRequest): 

659 """A utility class that offers the :attr:`response_raising` and 

660 :attr:`response_nonraising` alternatives to waiting for the 

661 :attr:`response` future whose error states can be presented either as an 

662 unsuccessful response (eg. 4.04) or an exception. 

663 

664 It also provides some internal tools for handling anything that has a 

665 :attr:`response` future and an :attr:`observation`""" 

666 

667 @property 

668 async def response_raising(self): 

669 """An awaitable that returns if a response comes in and is successful, 

670 otherwise raises a generic network exception or a

671 :class:`.error.ResponseWrappingError` for unsuccessful responses. 

672 

673 Experimental Interface.""" 

674 

675 response = await self.response 

676 if not response.code.is_successful(): 

677 raise error.ResponseWrappingError(response) 

678 

679 return response 
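
# Usage sketch for response_raising/response_nonraising (names illustrative):
#
#     req = context.request(Message(code=GET, uri="coap://example/resource"))
#     try:
#         response = await req.response_raising
#     except error.ResponseWrappingError as e:
#         ...  # unsuccessful CoAP response, surfaced as an exception
#
# whereas ``await req.response_nonraising`` always yields a Message, possibly
# a fabricated error response.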

680 

681 @property 

682 async def response_nonraising(self): 

683 """An awaitable that rather returns a 500ish fabricated message (as a 

684 proxy would return) instead of raising an exception. 

685 

686 Experimental Interface.""" 

687 

688 try: 

689 return await self.response 

690 except error.RenderableError as e: 

691 return e.to_message() 

692 except Exception: 

693 return Message(code=INTERNAL_SERVER_ERROR) 

694 

695class Request(interfaces.Request, BaseUnicastRequest): 

696 

697 # FIXME: Implement timing out with REQUEST_TIMEOUT here 

698 

699 def __init__(self, plumbing_request, loop, log): 

700 self._plumbing_request = plumbing_request 

701 

702 self.response = loop.create_future() 

703 

704 if plumbing_request.request.opt.observe == 0: 

705 self.observation = ClientObservation() 

706 else: 

707 self.observation = None 

708 

709 self._runner = self._run() 

710 self._runner.send(None) 

711 def process(event): 

712 try: 

713 self._runner.send(event) 

714 return True 

715 except StopIteration: 

716 return False 

717 self._stop_interest = self._plumbing_request.on_event(process) 

718 

719 self.log = log 

720 

721 self.response.add_done_callback(self._response_cancellation_handler) 

722 

723 def _response_cancellation_handler(self, response): 

724 # Propagate cancellation to the runner (if interest in the first 

725 # response is lost, there won't be observation items to pull out), but 

726 # not general completion (because if it's completed and not cancelled, 

727 # eg. when an observation is active) 

728 if self.response.cancelled() and self._runner is not None: 

729 # Dropping the only reference makes it stop with GeneratorExit, 

730 # similar to a cancelled task 

731 self._runner = None 

732 self._stop_interest() 

733 # But either way we won't be calling _stop_interest any more, so let's 

734 # not keep anything referenced 

735 self._stop_interest = None 

736 

737 @staticmethod 

738 def _add_response_properties(response, request): 

739 response.request = request 

740 

741 def _run(self): 

742 # FIXME: This is in iterator form because it used to be a task that 

743 # awaited futures, and that code could be easily converted to an 

744 # iterator. I'm not sure that's a bad state here, but at least it 

745 # should be a more conscious decision to make this an iterator rather 

746 # than just having it happen to be one. 

747 # 

748 # FIXME: check that responses come from the same remote as long as we're assuming unicast

749 

750 first_event = yield None 

751 

752 if first_event.message is not None: 

753 self._add_response_properties(first_event.message, self._plumbing_request.request) 

754 self.response.set_result(first_event.message) 

755 else: 

756 self.response.set_exception(first_event.exception) 

757 if not isinstance(first_event.exception, error.Error): 

758 self.log.warning( 

759 "An exception that is not an aiocoap Error was raised " 

760 "from a transport; please report this as a bug in " 

761 "aiocoap: %r", first_event.exception) 

762 

763 if self.observation is None: 

764 if not first_event.is_last: 

765 self.log.error("PlumbingRequest indicated more possible responses" 

766 " while the Request handler would not know what to" 

767 " do with them, stopping any further request.") 

768 self._plumbing_request.stop_interest() 

769 return 

770 

771 if first_event.is_last: 

772 self.observation.error(error.NotObservable()) 

773 return 

774 

775 if first_event.message.opt.observe is None: 

776 self.log.error("PlumbingRequest indicated more possible responses" 

777 " while the Request handler would not know what to" 

778 " do with them, stopping any further request.") 

779 self._plumbing_request.stop_interest() 

780 return 

781 

782 # variable names from RFC7641 Section 3.4 

783 v1 = first_event.message.opt.observe 

784 t1 = time.time() 

785 

786 while True: 

787 # We don't really support cancellation of observations yet (see 

788 # https://github.com/chrysn/aiocoap/issues/92), but at least 

789 # stopping the interest is a way to free the local resources after 

790 # the first observation update, and to make the MID handler RST the 

791 # observation on the next. 

792 # FIXME: there *is* now a .on_cancel callback, we should at least 

793 # hook into that, and possibly even send a proper cancellation 

794 # then. 

795 next_event = yield True 

796 if self.observation.cancelled: 

797 self._plumbing_request.stop_interest() 

798 return 

799 

800 if next_event.exception is not None: 

801 self.observation.error(next_event.exception) 

802 if not next_event.is_last: 

803 self._plumbing_request.stop_interest() 

804 if not isinstance(next_event.exception, error.Error): 

805 self.log.warning( 

806 "An exception that is not an aiocoap Error was " 

807 "raised from a transport during an observation; " 

808 "please report this as a bug in aiocoap: %r", 

809 next_event.exception) 

810 return 

811 

812 self._add_response_properties(next_event.message, self._plumbing_request.request) 

813 

814 if next_event.message.opt.observe is not None: 

815 # check for reordering 

816 v2 = next_event.message.opt.observe 

817 t2 = time.time() 

818 
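# Freshness check from RFC 7641 Section 3.4: V2 counts as newer than V1 if it
# is ahead by less than 2**23 in the 24-bit sequence number space, or if more
# than OBSERVATION_RESET_TIME (128 seconds in the RFC) has passed since the
# last accepted notification.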

819 is_recent = (v1 < v2 and v2 - v1 < 2**23) or \ 

820 (v1 > v2 and v1 - v2 > 2**23) or \ 

821 (t2 > t1 + OBSERVATION_RESET_TIME) 

822 if is_recent: 

823 t1 = t2 

824 v1 = v2 

825 else: 

826 # the terminal message is always the last 

827 is_recent = True 

828 

829 if is_recent: 

830 self.observation.callback(next_event.message) 

831 

832 if next_event.is_last: 

833 self.observation.error(error.ObservationCancelled()) 

834 return 

835 

836 if next_event.message.opt.observe is None: 

837 self.observation.error(error.ObservationCancelled()) 

838 self.log.error("PlumbingRequest indicated more possible responses" 

839 " while the Request handler would not know what to" 

840 " do with them, stopping any further request.") 

841 self._plumbing_request.stop_interest() 

842 return 

843 

844 

845class BlockwiseRequest(BaseUnicastRequest, interfaces.Request): 

846 def __init__(self, protocol, app_request): 

847 self.protocol = protocol 

848 self.log = self.protocol.log.getChild("blockwise-requester") 

849 

850 self.response = protocol.loop.create_future() 

851 

852 if app_request.opt.observe is not None: 

853 self.observation = ClientObservation() 

854 else: 

855 self.observation = None 

856 

857 self._runner = protocol.loop.create_task(self._run_outer( 

858 app_request, 

859 self.response, 

860 weakref.ref(self.observation) if self.observation is not None else lambda: None, 

861 self.protocol, 

862 self.log, 

863 ), 

864 **py38args(name="Blockwise runner for %r" % app_request)) 

865 self.response.add_done_callback(self._response_cancellation_handler) 

866 

867 def _response_cancellation_handler(self, response_future): 

868 # see Request._response_cancellation_handler 

869 if self.response.cancelled(): 

870 self._runner.cancel() 

871 

872 @classmethod 

873 async def _run_outer(cls, app_request, response, weak_observation, protocol, log): 

874 try: 

875 await cls._run(app_request, response, weak_observation, protocol, log) 

876 except asyncio.CancelledError: 

877 pass # results already set 

878 except Exception as e: 

879 logged = False 

880 if not response.done(): 

881 logged = True 

882 response.set_exception(e) 

883 obs = weak_observation() 

884 if app_request.opt.observe is not None and obs is not None: 

885 logged = True 

886 obs.error(e) 

887 if not logged: 

888 # should be unreachable 

889 log.error("Exception in BlockwiseRequest runner neither went to response nor to observation: %s", e, exc_info=e) 

890 

891 # This is a class method because that allows self and self.observation to 

892 # be freed even when this task is running, and the task to stop itself -- 

893 # otherwise we couldn't know when users just "forget" about a request 

894 # object after using its response (esp. in observe cases) and leave this 

895 # task running. 

896 @classmethod 

897 async def _run(cls, app_request, response, weak_observation, protocol, log): 

898 # we need to populate the remote right away, because the choice of 

899 # blocks depends on it. 

900 await protocol.find_remote_and_interface(app_request) 

901 

902 size_exp = app_request.remote.maximum_block_size_exp 

903 

904 if app_request.opt.block1 is not None: 

905 assert app_request.opt.block1.block_number == 0, "Unexpected block number in app_request" 

906 assert not app_request.opt.block1.more, "Unexpected more-flag in app_request" 

907 # this is where the library user can traditionally pass in size 

908 # exponent hints into the library. 

909 size_exp = app_request.opt.block1.size_exponent 

910 

911 # Offset in the message in blocks of size_exp. Whoever changes size_exp 

912 # is responsible for updating this number. 

913 block_cursor = 0 

914 

915 while True: 

916 # ... send a chunk 

917 

918 if len(app_request.payload) > (2 ** (size_exp + 4)): 

919 current_block1 = app_request._extract_block( 

920 block_cursor, 

921 size_exp, 

922 app_request.remote.maximum_payload_size) 

923 else: 

924 current_block1 = app_request 

925 

926 blockrequest = protocol.request(current_block1, handle_blockwise=False) 

927 blockresponse = await blockrequest.response 

928 

929 # store for future blocks to ensure that the next blocks will be 

930 # sent from the same source address (in the UDP case; for many 

931 # other transports it won't matter). 

932 app_request.remote = blockresponse.remote 

933 

934 if blockresponse.opt.block1 is None: 

935 if blockresponse.code.is_successful() and current_block1.opt.block1: 

936 log.warning("Block1 option completely ignored by server, assuming it knows what it is doing.") 

937 # FIXME: handle 4.13 and retry with the indicated size option 

938 break 

939 

940 block1 = blockresponse.opt.block1 

941 log.debug("Response with Block1 option received, number = %d, more = %d, size_exp = %d.", block1.block_number, block1.more, block1.size_exponent) 

942 

943 if block1.block_number != current_block1.opt.block1.block_number: 

944 raise error.UnexpectedBlock1Option("Block number mismatch") 

945 
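# Cursor bookkeeping: size exponent 7 (BERT, RFC 8323) carries payloads in
# multiples of 1024 bytes, so advance by payload length / 1024; otherwise
# exactly one block was sent. If the server switched to a smaller block size,
# re-express the cursor in those smaller units.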

946 if size_exp == 7: 

947 block_cursor += len(current_block1.payload) // 1024 

948 else: 

949 block_cursor += 1 

950 

951 while block1.size_exponent < size_exp: 

952 block_cursor *= 2 

953 size_exp -= 1 

954 

955 if not current_block1.opt.block1.more: 

956 if block1.more or blockresponse.code == CONTINUE: 

957 # treating this as a protocol error -- letting it slip 

958 # through would misrepresent the whole operation as an 

959 # over-all 2.xx (successful) one. 

960 raise error.UnexpectedBlock1Option("Server asked for more data at end of body") 

961 break 

962 

963 # checks before preparing the next round: 

964 

965 if blockresponse.opt.observe: 

966 # we're not *really* interested in that block, we just sent an 

967 # observe option to indicate that we'll want to observe the 

968 # resulting representation as a whole 

969 log.warning("Server answered Observe in early Block1 phase, cancelling the erroneous observation.") 

970 blockrequest.observe.cancel() 

971 

972 if block1.more: 

973 # FIXME I think my own server is doing this wrong

974 #if response.code != CONTINUE: 

975 # raise error.UnexpectedBlock1Option("more-flag set but no Continue") 

976 pass 

977 else: 

978 if not blockresponse.code.is_successful(): 

979 break 

980 else: 

981 # ignoring (discarding) the successful intermediate result, waiting for a final one

982 continue 

983 

984 lower_observation = None 

985 if app_request.opt.observe is not None: 

986 if blockresponse.opt.observe is not None: 

987 lower_observation = blockrequest.observation 

988 else: 

989 obs = weak_observation() 

990 if obs: 

991 obs.error(error.NotObservable()) 

992 del obs 

993 

994 assert blockresponse is not None, "Block1 loop broke without setting a response" 

995 blockresponse.opt.block1 = None 

996 

997 # FIXME check with RFC7959: it just says "send requests similar to the 

998 # requests in the Block1 phase", what does that mean? using the last 

999 # block1 as a reference for now, especially because in the 

1000 # only-one-request-block case, that's the original request we must send 

1001 # again and again anyway 

1002 assembled_response = await cls._complete_by_requesting_block2(protocol, current_block1, blockresponse, log) 

1003 

1004 response.set_result(assembled_response) 

1005 # finally set the result 

1006 

1007 if lower_observation is not None: 

1008 # FIXME this can all be simplified a lot since it is no longer

1009 # expected that observations shut themselves down when GC'd. 

1010 obs = weak_observation() 

1011 del weak_observation 

1012 if obs is None: 

1013 lower_observation.cancel() 

1014 return 

1015 future_weak_observation = protocol.loop.create_future() # packing this up because its destroy callback needs to reference the subtask 

1016 subtask = asyncio.create_task( 

1017 cls._run_observation( 

1018 app_request, 

1019 lower_observation, 

1020 future_weak_observation, 

1021 protocol, 

1022 log), 

1023 **py38args(name="Blockwise observation for %r" % app_request) 

1024 ) 

1025 future_weak_observation.set_result(weakref.ref(obs, lambda obs: subtask.cancel())) 

1026 obs.on_cancel(subtask.cancel) 

1027 del obs 

1028 await subtask 

1029 

1030 @classmethod 

1031 async def _run_observation(cls, original_request, lower_observation, future_weak_observation, protocol, log): 

1032 weak_observation = await future_weak_observation 

1033 # we can use weak_observation() here at any time, because whenever that 

1034 # becomes None, this task gets cancelled 

1035 try: 

1036 async for block1_notification in lower_observation: 

1037 log.debug("Notification received") 

1038 full_notification = await cls._complete_by_requesting_block2(protocol, original_request, block1_notification, log) 

1039 log.debug("Reporting completed notification") 

1040 weak_observation().callback(full_notification) 

1041 # FIXME verify that this loop actually ends iff the observation 

1042 # was cancelled -- otherwise find out the cause(s) or make it not 

1043 # cancel under indistinguishable circumstances 

1044 weak_observation().error(error.ObservationCancelled()) 

1045 except asyncio.CancelledError: 

1046 return 

1047 except Exception as e: 

1048 weak_observation().error(e) 

1049 

1050 @classmethod 

1051 async def _complete_by_requesting_block2(cls, protocol, request_to_repeat, initial_response, log): 

1052 # FIXME this can probably be deduplicated against BlockwiseRequest 

1053 

1054 if initial_response.opt.block2 is None or initial_response.opt.block2.more is False: 

1055 initial_response.opt.block2 = None 

1056 return initial_response 

1057 

1058 if initial_response.opt.block2.block_number != 0: 

1059 log.error("Error assembling blockwise response (expected first block)") 

1060 raise error.UnexpectedBlock2() 

1061 

1062 assembled_response = initial_response 

1063 last_response = initial_response 

1064 while True: 

1065 current_block2 = request_to_repeat._generate_next_block2_request(assembled_response) 

1066 

1067 current_block2 = current_block2.copy(remote=initial_response.remote) 

1068 

1069 blockrequest = protocol.request(current_block2, handle_blockwise=False) 

1070 last_response = await blockrequest.response 

1071 

1072 if last_response.opt.block2 is None: 

1073 log.warning("Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response.") 

1074 return last_response 

1075 

1076 block2 = last_response.opt.block2 

1077 log.debug("Response with Block2 option received, number = %d, more = %d, size_exp = %d.", block2.block_number, block2.more, block2.size_exponent) 

1078 try: 

1079 assembled_response._append_response_block(last_response) 

1080 except error.Error as e: 

1081 log.error("Error assembling blockwise response, passing on error %r", e) 

1082 raise 

1083 

1084 if block2.more is False: 

1085 return assembled_response 

1086 

1087class ClientObservation: 

1088 """An interface to observe notification updates arriving on a request. 

1089 

1090 This class does not actually provide any of the observe functionality, it 

1091 is purely a container for dispatching the messages via callbacks or 

1092 asynchronous iteration. It gets driven (ie. populated with responses or 

1093 errors including observation termination) by a Request object. 
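
A typical consumption sketch (assuming the request was sent with an Observe
option set)::

    async for notification in request.observation:
        print(notification.payload)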

1094 """ 

1095 def __init__(self): 

1096 self.callbacks = [] 

1097 self.errbacks = [] 

1098 

1099 self.cancelled = False 

1100 self._on_cancel = [] 

1101 

1102 self._latest_response = None 

1103 # the analogous error is stored in _cancellation_reason when cancelled. 

1104 

1105 def __aiter__(self): 

1106 """`async for` interface to observations. Currently, this still loses 

1107 information to the application (the reason for the termination is 

1108 unclear). 

1109 

1110 Experimental Interface.""" 

1111 it = self._Iterator() 

1112 self.register_callback(it.push) 

1113 self.register_errback(it.push_err) 

1114 return it 

1115 

1116 class _Iterator: 

1117 def __init__(self): 

1118 self._future = asyncio.get_event_loop().create_future() 

1119 

1120 def push(self, item): 

1121 if self._future.done(): 

1122 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1123 self._future = asyncio.get_event_loop().create_future() 

1124 self._future.set_result(item) 

1125 

1126 def push_err(self, e): 

1127 if self._future.done(): 

1128 self._future = asyncio.get_event_loop().create_future() 

1129 self._future.set_exception(e) 

1130 

1131 async def __anext__(self): 

1132 f = self._future 

1133 try: 

1134 result = await self._future 

1135 # FIXME see `await servobs._trigger` comment: might waiting for 

1136 # the original future not yield the first future's result when 

1137 # a quick second future comes in in a push? 

1138 if f is self._future: 

1139 self._future = asyncio.get_event_loop().create_future() 

1140 return result 

1141 except (error.NotObservable, error.ObservationCancelled): 

1142 # only exit cleanly when the server -- right away or later -- 

1143 # states that the resource is not observable any more 

1144 # FIXME: check whether an unsuccessful message is still passed 

1145 # as an observation result (or whether it should be) 

1146 raise StopAsyncIteration 

1147 

1148 def __del__(self): 

1149 if self._future.done(): 

1150 try: 

1151 # Fetch the result so any errors show up at least in the 

1152 # finalizer output 

1153 self._future.result() 

1154 except (error.ObservationCancelled, error.NotObservable): 

1155 # This is the case at the end of an observation cancelled 

1156 # by the server. 

1157 pass 

1158 except (error.LibraryShutdown, asyncio.exceptions.CancelledError): 

1159 pass 

1160 

1161 def register_callback(self, callback): 

1162 """Call the callback whenever a response to the message comes in, and 

1163 pass the response to it.""" 

1164 if self.cancelled: 

1165 return 

1166 

1167 self.callbacks.append(callback) 

1168 if self._latest_response is not None: 

1169 callback(self._latest_response) 

1170 

1171 def register_errback(self, callback): 

1172 """Call the callback whenever something goes wrong with the 

1173 observation, and pass an exception to the callback. After such a 

1174 callback is called, no more callbacks will be issued.""" 

1175 if self.cancelled: 

1176 callback(self._cancellation_reason) 

1177 return 

1178 self.errbacks.append(callback) 

1179 

1180 def callback(self, response): 

1181 """Notify all listeners of an incoming response""" 

1182 

1183 self._latest_response = response 

1184 

1185 for c in self.callbacks: 

1186 c(response) 

1187 

1188 def error(self, exception): 

1189 """Notify registered listeners that the observation went wrong. This 

1190 can only be called once.""" 

1191 

1192 for c in self.errbacks: 

1193 c(exception) 

1194 

1195 self.cancel() 

1196 self._cancellation_reason = exception 

1197 

1198 def cancel(self): 

1199 # FIXME determine whether this is called by anything other than error, 

1200 # and make it private so there is always a _cancellation_reason 

1201 """Cease to generate observation or error events. This will not 

1202 generate an error by itself.""" 

1203 

1204 assert not self.cancelled 

1205 

1206 # make sure things go wrong when someone tries to continue this 

1207 self.errbacks = None 

1208 self.callbacks = None 

1209 

1210 self.cancelled = True 

1211 while self._on_cancel: 

1212 self._on_cancel.pop()() 

1213 

1214 self._cancellation_reason = None 

1215 

1216 def on_cancel(self, callback): 

1217 if self.cancelled: 

1218 callback() 

1219 self._on_cancel.append(callback) 

1220 

1221 def __repr__(self): 

1222 return '<%s %s at %#x>'%(type(self).__name__, "(cancelled)" if self.cancelled else "(%s call-, %s errback(s))"%(len(self.callbacks), len(self.errbacks)), id(self)) 

1223 

1224class ServerObservation: 

1225 def __init__(self): 

1226 self._accepted = False 

1227 self._trigger = asyncio.get_event_loop().create_future() 

1228 # A deregistration is "early" if it happens before the response message 

1229 # is actually sent; calling deregister() in that time (typically during 

1230 # `render()`) will not send an unsuccessful response message but just 

1231 # set this flag, which is set to None as soon as it is too late for an

1232 # early deregistration. 

1233 # This mechanism is temporary until more of aiocoap behaves like 

1234 # PlumbingRequest which does not suffer from this limitation. 

1235 self._early_deregister = False 

1236 self._late_deregister = False 

1237 

1238 def accept(self, cancellation_callback): 

1239 self._accepted = True 

1240 self._cancellation_callback = cancellation_callback 

1241 

1242 def deregister(self, reason=None): 

1243 if self._early_deregister is False: 

1244 self._early_deregister = True 

1245 return 

1246 

1247 warnings.warn("Late use of ServerObservation.deregister() is" 

1248 " deprecated, use .trigger with an unsuccessful value" 

1249 " instead", 

1250 DeprecationWarning) 

1251 self.trigger(Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable")) 

1252 

1253 def trigger(self, response=None, *, is_last=False): 

1254 """Send an updated response; if None is given, the observed resource's 

1255 rendering will be invoked to produce one. 

1256 

1257 `is_last` can be set to True to indicate that no more responses will be 

1258 sent. Note that an unsuccessful response will be the last no matter 

1259 what is_last says, as such a message always terminates a CoAP 

1260 observation.""" 
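# For example (a sketch; names illustrative), a handler that knows the
# resource went away could end the observation with:
#
#     servobs.trigger(Message(code=NOT_FOUND, payload=b"gone"), is_last=True)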

1261 if is_last: 

1262 self._late_deregister = True 

1263 if self._trigger.done(): 

1264 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1265 self._trigger = asyncio.get_event_loop().create_future() 

1266 self._trigger.set_result(response)