Coverage for aiocoap/protocol.py: 88%
481 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains the classes that are responsible for keeping track of
6messages:
8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP
9 socket) -- something that can send requests and possibly can answer
10 incoming requests.
12* a :class:`Request` gets generated whenever a request gets sent to keep
13 track of the response
15* a :class:`Responder` keeps track of a single incoming request
17Logging
18~~~~~~~
20Several constructors of the Context accept a logger name; these names go into
21the construction of a Python logger.
23Log events will be emitted to these on different levels, with "warning" and
24above being a practical default for things that should may warrant reviewing by
25an operator:
27* DEBUG is used for things that occur even under perfect conditions.
28* INFO is for things that are well expected, but might be interesting during
29 testing a network of nodes and not just when debugging the library. (This
30 includes timeouts, retransmissions, and pings.)
31* WARNING is for everything that indicates a malbehaved peer. These don't
32 *necessarily* indicate a client bug, though: Things like requesting a
33 nonexistent block can just as well happen when a resource's content has
34 changed between blocks. The library will not go out of its way to determine
35 whether there is a plausible explanation for the odd behavior, and will
36 report something as a warning in case of doubt.
37* ERROR is used when something clearly went wrong. This includes irregular
38 connection terminations and resource handler errors (which are demoted to
39 error responses), and can often contain a backtrace.
40"""
42import asyncio
43import weakref
44import time
45from typing import Optional, List
47from . import defaults
48from .credentials import CredentialsMap
49from .message import Message
50from .messagemanager import MessageManager
51from .tokenmanager import TokenManager
52from .pipe import Pipe, run_driving_pipe, error_to_message
53from . import interfaces
54from . import error
55from .numbers import (INTERNAL_SERVER_ERROR, NOT_FOUND,
56 CONTINUE, SHUTDOWN_TIMEOUT)
57from .util.asyncio import py38args
59import warnings
60import logging
63class Context(interfaces.RequestProvider):
64 """Applications' entry point to the network
66 A :class:`.Context` coordinates one or more network :mod:`.transports`
67 implementations and dispatches data between them and the application.
69 The application can start requests using the message dispatch methods, and
70 set a :class:`resources.Site` that will answer requests directed to the
71 application as a server.
73 On the library-internals side, it is the prime implementation of the
74 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and
75 :class:`Response` classes on demand, and decides which transport
76 implementations to start and which are to handle which messages.
78 **Context creation and destruction**
80 The following functions are provided for creating and stopping a context:
82 .. note::
84 A typical application should only ever create one context, even (or
85 especially when) it acts both as a server and as a client (in which
86 case a server context should be created).
88 A context that is not used any more must be shut down using
89 :meth:`.shutdown()`, but typical applications will not need to because
90 they use the context for the full process lifetime.
92 .. automethod:: create_client_context
93 .. automethod:: create_server_context
95 .. automethod:: shutdown
97 **Dispatching messages**
99 CoAP requests can be sent using the following functions:
101 .. automethod:: request
103 If more control is needed, you can create a :class:`Request` yourself and
104 pass the context to it.
107 **Other methods and properties**
109 The remaining methods and properties are to be considered unstable even
110 when the project reaches a stable version number; please file a feature
111 request for stabilization if you want to reliably access any of them.
113 (Sorry for the duplicates, still looking for a way to make autodoc list
114 everything not already mentioned).
116 """
117 def __init__(self, loop=None, serversite=None, loggername="coap", client_credentials=None, server_credentials=None):
118 self.log = logging.getLogger(loggername)
120 self.loop = loop or asyncio.get_event_loop()
122 self.serversite = serversite
124 self.request_interfaces = []
126 self.client_credentials = client_credentials or CredentialsMap()
127 self.server_credentials = server_credentials or CredentialsMap()
129 #
130 # convenience methods for class instanciation
131 #
133 async def _append_tokenmanaged_messagemanaged_transport(self, message_interface_constructor):
134 tman = TokenManager(self)
135 mman = MessageManager(tman)
136 transport = await message_interface_constructor(mman)
138 mman.message_interface = transport
139 tman.token_interface = mman
141 self.request_interfaces.append(tman)
143 async def _append_tokenmanaged_transport(self, token_interface_constructor):
144 tman = TokenManager(self)
145 transport = await token_interface_constructor(tman)
147 tman.token_interface = transport
149 self.request_interfaces.append(tman)
151 @classmethod
152 async def create_client_context(cls, *, loggername="coap", loop=None, transports: Optional[List[str]] = None):
153 """Create a context bound to all addresses on a random listening port.
155 This is the easiest way to get a context suitable for sending client
156 requests.
157 """
159 if loop is None:
160 loop = asyncio.get_event_loop()
162 self = cls(loop=loop, serversite=None, loggername=loggername)
164 selected_transports = transports or defaults.get_default_clienttransports(loop=loop)
166 # FIXME make defaults overridable (postponed until they become configurable too)
167 for transportname in selected_transports:
168 if transportname == 'udp6':
169 from .transports.udp6 import MessageInterfaceUDP6
170 await self._append_tokenmanaged_messagemanaged_transport(
171 lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint(mman, log=self.log, loop=loop))
172 elif transportname == 'simple6':
173 from .transports.simple6 import MessageInterfaceSimple6
174 await self._append_tokenmanaged_messagemanaged_transport(
175 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(mman, log=self.log, loop=loop))
176 elif transportname == 'tinydtls':
177 from .transports.tinydtls import MessageInterfaceTinyDTLS
178 await self._append_tokenmanaged_messagemanaged_transport(
180 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(mman, log=self.log, loop=loop))
181 elif transportname == 'tcpclient':
182 from .transports.tcp import TCPClient
183 await self._append_tokenmanaged_transport(
184 lambda tman: TCPClient.create_client_transport(tman, self.log, loop))
185 elif transportname == 'tlsclient':
186 from .transports.tls import TLSClient
187 await self._append_tokenmanaged_transport(
188 lambda tman: TLSClient.create_client_transport(tman, self.log, loop, self.client_credentials))
189 elif transportname == 'ws':
190 from .transports.ws import WSPool
191 await self._append_tokenmanaged_transport(
192 lambda tman: WSPool.create_transport(tman, self.log, loop, client_credentials=self.client_credentials))
193 elif transportname == 'oscore':
194 from .transports.oscore import TransportOSCORE
195 oscoretransport = TransportOSCORE(self, self)
196 self.request_interfaces.append(oscoretransport)
197 else:
198 raise RuntimeError("Transport %r not know for client context creation" % transportname)
200 return self
202 @classmethod
203 async def create_server_context(cls, site, bind=None, *, loggername="coap-server", loop=None, _ssl_context=None, multicast=[], server_credentials=None, transports: Optional[List[str]] = None):
204 """Create a context, bound to all addresses on the CoAP port (unless
205 otherwise specified in the ``bind`` argument).
207 This is the easiest way to get a context suitable both for sending
208 client and accepting server requests.
210 The ``bind`` argument, if given, needs to be a 2-tuple of IP address
211 string and port number, where the port number can be None to use the default port.
213 If ``multicast`` is given, it needs to be a list of (multicast address,
214 interface name) tuples, which will all be joined. (The IPv4 style of
215 selecting the interface by a local address is not supported; users may
216 want to use the netifaces package to arrive at an interface name for an
217 address).
219 As a shortcut, the list may also contain interface names alone. Those
220 will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with
221 scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6.
223 Under some circumstances you may already need a context to pass into
224 the site for creation; this is typically the case for servers that
225 trigger requests on their own. For those cases, it is usually easiest
226 to pass None in as a site, and set the fully constructed site later by
227 assigning to the ``serversite`` attribute.
228 """
230 if loop is None:
231 loop = asyncio.get_event_loop()
233 self = cls(loop=loop, serversite=site, loggername=loggername, server_credentials=server_credentials)
235 multicast_done = not multicast
237 selected_transports = transports or defaults.get_default_servertransports(loop=loop)
239 for transportname in selected_transports:
240 if transportname == 'udp6':
241 from .transports.udp6 import MessageInterfaceUDP6
243 await self._append_tokenmanaged_messagemanaged_transport(
244 lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(mman, log=self.log, loop=loop, bind=bind, multicast=multicast))
245 multicast_done = True
246 # FIXME this is duplicated from the client version, as those are client-only anyway
247 elif transportname == 'simple6':
248 from .transports.simple6 import MessageInterfaceSimple6
249 await self._append_tokenmanaged_messagemanaged_transport(
250 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(mman, log=self.log, loop=loop))
251 elif transportname == 'tinydtls':
252 from .transports.tinydtls import MessageInterfaceTinyDTLS
254 await self._append_tokenmanaged_messagemanaged_transport(
255 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(mman, log=self.log, loop=loop))
256 # FIXME end duplication
257 elif transportname == 'tinydtls_server':
258 from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer
260 await self._append_tokenmanaged_messagemanaged_transport(
261 lambda mman: MessageInterfaceTinyDTLSServer.create_server(bind, mman, log=self.log, loop=loop, server_credentials=self.server_credentials))
262 elif transportname == 'simplesocketserver':
263 from .transports.simplesocketserver import MessageInterfaceSimpleServer
264 await self._append_tokenmanaged_messagemanaged_transport(
265 lambda mman: MessageInterfaceSimpleServer.create_server(bind, mman, log=self.log, loop=loop))
266 elif transportname == 'tcpserver':
267 from .transports.tcp import TCPServer
268 await self._append_tokenmanaged_transport(
269 lambda tman: TCPServer.create_server(bind, tman, self.log, loop))
270 elif transportname == 'tcpclient':
271 from .transports.tcp import TCPClient
272 await self._append_tokenmanaged_transport(
273 lambda tman: TCPClient.create_client_transport(tman, self.log, loop))
274 elif transportname == 'tlsserver':
275 if _ssl_context is not None:
276 from .transports.tls import TLSServer
277 await self._append_tokenmanaged_transport(
278 lambda tman: TLSServer.create_server(bind, tman, self.log, loop, _ssl_context))
279 elif transportname == 'tlsclient':
280 from .transports.tls import TLSClient
281 await self._append_tokenmanaged_transport(
282 lambda tman: TLSClient.create_client_transport(tman, self.log, loop, self.client_credentials))
283 elif transportname == 'ws':
284 from .transports.ws import WSPool
285 await self._append_tokenmanaged_transport(
286 # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind
287 lambda tman: WSPool.create_transport(tman, self.log, loop, client_credentials=self.client_credentials, server_bind=bind or (None, None), server_context=_ssl_context))
288 elif transportname == 'oscore':
289 from .transports.oscore import TransportOSCORE
290 oscoretransport = TransportOSCORE(self, self)
291 self.request_interfaces.append(oscoretransport)
292 else:
293 raise RuntimeError("Transport %r not know for server context creation" % transportname)
295 if not multicast_done:
296 self.log.warning("Multicast was requested, but no multicast capable transport was selected.")
298 # This is used in tests to wait for externally launched servers to be ready
299 self.log.debug("Server ready to receive requests")
301 return self
303 async def shutdown(self):
304 """Take down any listening sockets and stop all related timers.
306 After this coroutine terminates, and once all external references to
307 the object are dropped, it should be garbage-collectable.
309 This method takes up to
310 :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing
311 transports to perform any cleanup implemented in them (such as orderly
312 connection shutdown and cancelling observations, where the latter is
313 currently not implemented)."""
315 self.log.debug("Shutting down context")
317 done, pending = await asyncio.wait([
318 asyncio.create_task(
319 ri.shutdown(),
320 **py38args(name="Shutdown of %r" % ri)
321 )
322 for ri
323 in self.request_interfaces],
324 timeout=SHUTDOWN_TIMEOUT)
325 for item in done:
326 await item
328 # FIXME: determine how official this should be, or which part of it is
329 # public -- now that BlockwiseRequest uses it. (And formalize what can
330 # change about messages and what can't after the remote has been thusly
331 # populated).
332 async def find_remote_and_interface(self, message):
333 for ri in self.request_interfaces:
334 if await ri.fill_or_recognize_remote(message):
335 return ri
336 raise RuntimeError("No request interface could route message")
338 def request(self, request_message, handle_blockwise=True):
339 if handle_blockwise:
340 return BlockwiseRequest(self, request_message)
342 pipe = Pipe(request_message, self.log)
343 # Request sets up callbacks at creation
344 result = Request(pipe, self.loop, self.log)
346 async def send():
347 try:
348 request_interface = await self.find_remote_and_interface(request_message)
349 request_interface.request(pipe)
350 except Exception as e:
351 pipe.add_exception(e)
352 return
353 self.loop.create_task(
354 send(),
355 **py38args(name="Request processing of %r" % result)
356 )
357 return result
359 # the following are under consideration for moving into Site or something
360 # mixed into it
362 def render_to_pipe(self, pipe):
363 """Fill a pipe by running the site's render_to_pipe interface and
364 handling errors."""
366 pr_that_can_receive_errors = error_to_message(pipe, self.log)
368 run_driving_pipe(
369 pr_that_can_receive_errors,
370 self._render_to_pipe(pipe),
371 name="Rendering for %r" % pipe.request,
372 )
374 async def _render_to_pipe(self, pipe):
375 if self.serversite is None:
376 pipe.add_response(Message(code=NOT_FOUND, payload=b"not a server"), is_last=True)
377 return
379 return await self.serversite.render_to_pipe(pipe)
381class BaseRequest(object):
382 """Common mechanisms of :class:`Request` and :class:`MulticastRequest`"""
384class BaseUnicastRequest(BaseRequest):
385 """A utility class that offers the :attr:`response_raising` and
386 :attr:`response_nonraising` alternatives to waiting for the
387 :attr:`response` future whose error states can be presented either as an
388 unsuccessful response (eg. 4.04) or an exception.
390 It also provides some internal tools for handling anything that has a
391 :attr:`response` future and an :attr:`observation`"""
393 @property
394 async def response_raising(self):
395 """An awaitable that returns if a response comes in and is successful,
396 otherwise raises generic network exception or a
397 :class:`.error.ResponseWrappingError` for unsuccessful responses.
399 Experimental Interface."""
401 response = await self.response
402 if not response.code.is_successful():
403 raise error.ResponseWrappingError(response)
405 return response
407 @property
408 async def response_nonraising(self):
409 """An awaitable that rather returns a 500ish fabricated message (as a
410 proxy would return) instead of raising an exception.
412 Experimental Interface."""
414 # FIXME: Can we smuggle error_to_message into the underlying pipe?
415 # That should make observe notifications into messages rather
416 # than exceptions as well, plus it has fallbacks for `e.to_message()`
417 # raising.
419 try:
420 return await self.response
421 except error.RenderableError as e:
422 return e.to_message()
423 except Exception:
424 return Message(code=INTERNAL_SERVER_ERROR)
426class Request(interfaces.Request, BaseUnicastRequest):
428 # FIXME: Implement timing out with REQUEST_TIMEOUT here
430 def __init__(self, pipe, loop, log):
431 self._pipe = pipe
433 self.response = loop.create_future()
435 if pipe.request.opt.observe == 0:
436 self.observation = ClientObservation()
437 else:
438 self.observation = None
440 self._runner = self._run()
441 self._runner.send(None)
442 def process(event):
443 try:
444 self._runner.send(event)
445 return True
446 except StopIteration:
447 return False
448 self._stop_interest = self._pipe.on_event(process)
450 self.log = log
452 self.response.add_done_callback(self._response_cancellation_handler)
454 def _response_cancellation_handler(self, response):
455 # Propagate cancellation to the runner (if interest in the first
456 # response is lost, there won't be observation items to pull out), but
457 # not general completion (because if it's completed and not cancelled,
458 # eg. when an observation is active)
459 if self.response.cancelled() and self._runner is not None:
460 # Dropping the only reference makes it stop with GeneratorExit,
461 # similar to a cancelled task
462 self._runner = None
463 self._stop_interest()
464 # But either way we won't be calling _stop_interest any more, so let's
465 # not keep anything referenced
466 self._stop_interest = None
468 @staticmethod
469 def _add_response_properties(response, request):
470 response.request = request
472 def _run(self):
473 # FIXME: This is in iterator form because it used to be a task that
474 # awaited futures, and that code could be easily converted to an
475 # iterator. I'm not sure that's a bad state here, but at least it
476 # should be a more conscious decision to make this an iterator rather
477 # than just having it happen to be one.
478 #
479 # FIXME: check that responses come from the same remmote as long as we're assuming unicast
481 first_event = yield None
483 if first_event.message is not None:
484 self._add_response_properties(first_event.message, self._pipe.request)
485 self.response.set_result(first_event.message)
486 else:
487 self.response.set_exception(first_event.exception)
488 if not isinstance(first_event.exception, error.Error):
489 self.log.warning(
490 "An exception that is not an aiocoap Error was raised "
491 "from a transport; please report this as a bug in "
492 "aiocoap: %r", first_event.exception)
494 if self.observation is None:
495 if not first_event.is_last:
496 self.log.error("Pipe indicated more possible responses"
497 " while the Request handler would not know what to"
498 " do with them, stopping any further request.")
499 self._pipe.stop_interest()
500 return
502 if first_event.is_last:
503 self.observation.error(error.NotObservable())
504 return
506 if first_event.message.opt.observe is None:
507 self.log.error("Pipe indicated more possible responses"
508 " while the Request handler would not know what to"
509 " do with them, stopping any further request.")
510 self._pipe.stop_interest()
511 return
513 # variable names from RFC7641 Section 3.4
514 v1 = first_event.message.opt.observe
515 t1 = time.time()
517 while True:
518 # We don't really support cancellation of observations yet (see
519 # https://github.com/chrysn/aiocoap/issues/92), but at least
520 # stopping the interest is a way to free the local resources after
521 # the first observation update, and to make the MID handler RST the
522 # observation on the next.
523 # FIXME: there *is* now a .on_cancel callback, we should at least
524 # hook into that, and possibly even send a proper cancellation
525 # then.
526 next_event = yield True
527 if self.observation.cancelled:
528 self._pipe.stop_interest()
529 return
531 if next_event.exception is not None:
532 self.observation.error(next_event.exception)
533 if not next_event.is_last:
534 self._pipe.stop_interest()
535 if not isinstance(next_event.exception, error.Error):
536 self.log.warning(
537 "An exception that is not an aiocoap Error was "
538 "raised from a transport during an observation; "
539 "please report this as a bug in aiocoap: %r",
540 next_event.exception)
541 return
543 self._add_response_properties(next_event.message, self._pipe.request)
545 if next_event.message.opt.observe is not None:
546 # check for reordering
547 v2 = next_event.message.opt.observe
548 t2 = time.time()
550 is_recent = (v1 < v2 and v2 - v1 < 2**23) or \
551 (v1 > v2 and v1 - v2 > 2**23) or \
552 (t2 > t1 + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME)
553 if is_recent:
554 t1 = t2
555 v1 = v2
556 else:
557 # the terminal message is always the last
558 is_recent = True
560 if is_recent:
561 self.observation.callback(next_event.message)
563 if next_event.is_last:
564 self.observation.error(error.ObservationCancelled())
565 return
567 if next_event.message.opt.observe is None:
568 self.observation.error(error.ObservationCancelled())
569 self.log.error("Pipe indicated more possible responses"
570 " while the Request handler would not know what to"
571 " do with them, stopping any further request.")
572 self._pipe.stop_interest()
573 return
576class BlockwiseRequest(BaseUnicastRequest, interfaces.Request):
577 def __init__(self, protocol, app_request):
578 self.protocol = protocol
579 self.log = self.protocol.log.getChild("blockwise-requester")
581 self.response = protocol.loop.create_future()
583 if app_request.opt.observe is not None:
584 self.observation = ClientObservation()
585 else:
586 self.observation = None
588 self._runner = protocol.loop.create_task(self._run_outer(
589 app_request,
590 self.response,
591 weakref.ref(self.observation) if self.observation is not None else lambda: None,
592 self.protocol,
593 self.log,
594 ),
595 **py38args(name="Blockwise runner for %r" % app_request))
596 self.response.add_done_callback(self._response_cancellation_handler)
598 def _response_cancellation_handler(self, response_future):
599 # see Request._response_cancellation_handler
600 if self.response.cancelled():
601 self._runner.cancel()
603 @classmethod
604 async def _run_outer(cls, app_request, response, weak_observation, protocol, log):
605 try:
606 await cls._run(app_request, response, weak_observation, protocol, log)
607 except asyncio.CancelledError:
608 pass # results already set
609 except Exception as e:
610 logged = False
611 if not response.done():
612 logged = True
613 response.set_exception(e)
614 obs = weak_observation()
615 if app_request.opt.observe is not None and obs is not None:
616 logged = True
617 obs.error(e)
618 if not logged:
619 # should be unreachable
620 log.error("Exception in BlockwiseRequest runner neither went to response nor to observation: %s", e, exc_info=e)
622 # This is a class method because that allows self and self.observation to
623 # be freed even when this task is running, and the task to stop itself --
624 # otherwise we couldn't know when users just "forget" about a request
625 # object after using its response (esp. in observe cases) and leave this
626 # task running.
627 @classmethod
628 async def _run(cls, app_request, response, weak_observation, protocol, log):
629 # we need to populate the remote right away, because the choice of
630 # blocks depends on it.
631 await protocol.find_remote_and_interface(app_request)
633 size_exp = app_request.remote.maximum_block_size_exp
635 if app_request.opt.block1 is not None:
636 assert app_request.opt.block1.block_number == 0, "Unexpected block number in app_request"
637 assert not app_request.opt.block1.more, "Unexpected more-flag in app_request"
638 # this is where the library user can traditionally pass in size
639 # exponent hints into the library.
640 size_exp = app_request.opt.block1.size_exponent
642 # Offset in the message in blocks of size_exp. Whoever changes size_exp
643 # is responsible for updating this number.
644 block_cursor = 0
646 while True:
647 # ... send a chunk
649 if size_exp >= 6:
650 # FIXME from maximum_payload_size
651 fragmentation_threshold = app_request.remote.maximum_payload_size
652 else:
653 fragmentation_threshold = (2 ** (size_exp + 4))
655 if app_request.opt.block1 is not None or \
656 len(app_request.payload) > fragmentation_threshold:
657 current_block1 = app_request._extract_block(
658 block_cursor,
659 size_exp,
660 app_request.remote.maximum_payload_size)
661 if block_cursor == 0:
662 current_block1.opt.size1 = len(app_request.payload)
663 else:
664 current_block1 = app_request
666 blockrequest = protocol.request(current_block1, handle_blockwise=False)
667 blockresponse = await blockrequest.response
669 # store for future blocks to ensure that the next blocks will be
670 # sent from the same source address (in the UDP case; for many
671 # other transports it won't matter).
672 app_request.remote = blockresponse.remote
674 if blockresponse.opt.block1 is None:
675 if blockresponse.code.is_successful() and current_block1.opt.block1:
676 log.warning("Block1 option completely ignored by server, assuming it knows what it is doing.")
677 # FIXME: handle 4.13 and retry with the indicated size option
678 break
680 block1 = blockresponse.opt.block1
681 log.debug("Response with Block1 option received, number = %d, more = %d, size_exp = %d.", block1.block_number, block1.more, block1.size_exponent)
683 if block1.block_number != current_block1.opt.block1.block_number:
684 raise error.UnexpectedBlock1Option("Block number mismatch")
686 if size_exp == 7:
687 block_cursor += len(current_block1.payload) // 1024
688 else:
689 block_cursor += 1
691 while block1.size_exponent < size_exp:
692 block_cursor *= 2
693 size_exp -= 1
695 if not current_block1.opt.block1.more:
696 if block1.more or blockresponse.code == CONTINUE:
697 # treating this as a protocol error -- letting it slip
698 # through would misrepresent the whole operation as an
699 # over-all 2.xx (successful) one.
700 raise error.UnexpectedBlock1Option("Server asked for more data at end of body")
701 break
703 # checks before preparing the next round:
705 if blockresponse.opt.observe:
706 # we're not *really* interested in that block, we just sent an
707 # observe option to indicate that we'll want to observe the
708 # resulting representation as a whole
709 log.warning("Server answered Observe in early Block1 phase, cancelling the erroneous observation.")
710 blockrequest.observe.cancel()
712 if block1.more:
713 # FIXME i think my own server is dowing this wrong
714 #if response.code != CONTINUE:
715 # raise error.UnexpectedBlock1Option("more-flag set but no Continue")
716 pass
717 else:
718 if not blockresponse.code.is_successful():
719 break
720 else:
721 # ignoring (discarding) the successul intermediate result, waiting for a final one
722 continue
724 lower_observation = None
725 if app_request.opt.observe is not None:
726 if blockresponse.opt.observe is not None:
727 lower_observation = blockrequest.observation
728 else:
729 obs = weak_observation()
730 if obs:
731 obs.error(error.NotObservable())
732 del obs
734 assert blockresponse is not None, "Block1 loop broke without setting a response"
735 blockresponse.opt.block1 = None
737 # FIXME check with RFC7959: it just says "send requests similar to the
738 # requests in the Block1 phase", what does that mean? using the last
739 # block1 as a reference for now, especially because in the
740 # only-one-request-block case, that's the original request we must send
741 # again and again anyway
742 assembled_response = await cls._complete_by_requesting_block2(protocol, current_block1, blockresponse, log)
744 response.set_result(assembled_response)
745 # finally set the result
747 if lower_observation is not None:
748 # FIXME this can all be simplified a lot since it's no more
749 # expected that observations shut themselves down when GC'd.
750 obs = weak_observation()
751 del weak_observation
752 if obs is None:
753 lower_observation.cancel()
754 return
755 future_weak_observation = protocol.loop.create_future() # packing this up because its destroy callback needs to reference the subtask
756 subtask = asyncio.create_task(
757 cls._run_observation(
758 app_request,
759 lower_observation,
760 future_weak_observation,
761 protocol,
762 log),
763 **py38args(name="Blockwise observation for %r" % app_request)
764 )
765 future_weak_observation.set_result(weakref.ref(obs, lambda obs: subtask.cancel()))
766 obs.on_cancel(subtask.cancel)
767 del obs
768 await subtask
770 @classmethod
771 async def _run_observation(cls, original_request, lower_observation, future_weak_observation, protocol, log):
772 weak_observation = await future_weak_observation
773 # we can use weak_observation() here at any time, because whenever that
774 # becomes None, this task gets cancelled
775 try:
776 async for block1_notification in lower_observation:
777 log.debug("Notification received")
778 full_notification = await cls._complete_by_requesting_block2(protocol, original_request, block1_notification, log)
779 log.debug("Reporting completed notification")
780 weak_observation().callback(full_notification)
781 # FIXME verify that this loop actually ends iff the observation
782 # was cancelled -- otherwise find out the cause(s) or make it not
783 # cancel under indistinguishable circumstances
784 weak_observation().error(error.ObservationCancelled())
785 except asyncio.CancelledError:
786 return
787 except Exception as e:
788 weak_observation().error(e)
790 @classmethod
791 async def _complete_by_requesting_block2(cls, protocol, request_to_repeat, initial_response, log):
792 # FIXME this can probably be deduplicated against BlockwiseRequest
794 if initial_response.opt.block2 is None or initial_response.opt.block2.more is False:
795 initial_response.opt.block2 = None
796 return initial_response
798 if initial_response.opt.block2.block_number != 0:
799 log.error("Error assembling blockwise response (expected first block)")
800 raise error.UnexpectedBlock2()
802 assembled_response = initial_response
803 last_response = initial_response
804 while True:
805 current_block2 = request_to_repeat._generate_next_block2_request(assembled_response)
807 current_block2 = current_block2.copy(remote=initial_response.remote)
809 blockrequest = protocol.request(current_block2, handle_blockwise=False)
810 last_response = await blockrequest.response
812 if last_response.opt.block2 is None:
813 log.warning("Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response.")
814 return last_response
816 block2 = last_response.opt.block2
817 log.debug("Response with Block2 option received, number = %d, more = %d, size_exp = %d.", block2.block_number, block2.more, block2.size_exponent)
818 try:
819 assembled_response._append_response_block(last_response)
820 except error.Error as e:
821 log.error("Error assembling blockwise response, passing on error %r", e)
822 raise
824 if block2.more is False:
825 return assembled_response
827class ClientObservation:
828 """An interface to observe notification updates arriving on a request.
830 This class does not actually provide any of the observe functionality, it
831 is purely a container for dispatching the messages via callbacks or
832 asynchronous iteration. It gets driven (ie. populated with responses or
833 errors including observation termination) by a Request object.
834 """
835 def __init__(self):
836 self.callbacks = []
837 self.errbacks = []
839 self.cancelled = False
840 self._on_cancel = []
842 self._latest_response = None
843 # the analogous error is stored in _cancellation_reason when cancelled.
845 def __aiter__(self):
846 """`async for` interface to observations. Currently, this still loses
847 information to the application (the reason for the termination is
848 unclear).
850 Experimental Interface."""
851 it = self._Iterator()
852 self.register_callback(it.push)
853 self.register_errback(it.push_err)
854 return it
856 class _Iterator:
857 def __init__(self):
858 self._future = asyncio.get_event_loop().create_future()
860 def push(self, item):
861 if self._future.done():
862 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
863 self._future = asyncio.get_event_loop().create_future()
864 self._future.set_result(item)
866 def push_err(self, e):
867 if self._future.done():
868 self._future = asyncio.get_event_loop().create_future()
869 self._future.set_exception(e)
871 async def __anext__(self):
872 f = self._future
873 try:
874 result = await self._future
875 # FIXME see `await servobs._trigger` comment: might waiting for
876 # the original future not yield the first future's result when
877 # a quick second future comes in in a push?
878 if f is self._future:
879 self._future = asyncio.get_event_loop().create_future()
880 return result
881 except (error.NotObservable, error.ObservationCancelled):
882 # only exit cleanly when the server -- right away or later --
883 # states that the resource is not observable any more
884 # FIXME: check whether an unsuccessful message is still passed
885 # as an observation result (or whether it should be)
886 raise StopAsyncIteration
888 def __del__(self):
889 if self._future.done():
890 try:
891 # Fetch the result so any errors show up at least in the
892 # finalizer output
893 self._future.result()
894 except (error.ObservationCancelled, error.NotObservable):
895 # This is the case at the end of an observation cancelled
896 # by the server.
897 pass
898 except (error.LibraryShutdown, asyncio.CancelledError):
899 pass
901 def register_callback(self, callback):
902 """Call the callback whenever a response to the message comes in, and
903 pass the response to it."""
904 if self.cancelled:
905 return
907 self.callbacks.append(callback)
908 if self._latest_response is not None:
909 callback(self._latest_response)
911 def register_errback(self, callback):
912 """Call the callback whenever something goes wrong with the
913 observation, and pass an exception to the callback. After such a
914 callback is called, no more callbacks will be issued."""
915 if self.cancelled:
916 callback(self._cancellation_reason)
917 return
918 self.errbacks.append(callback)
920 def callback(self, response):
921 """Notify all listeners of an incoming response"""
923 self._latest_response = response
925 for c in self.callbacks:
926 c(response)
928 def error(self, exception):
929 """Notify registered listeners that the observation went wrong. This
930 can only be called once."""
932 for c in self.errbacks:
933 c(exception)
935 self.cancel()
936 self._cancellation_reason = exception
938 def cancel(self):
939 # FIXME determine whether this is called by anything other than error,
940 # and make it private so there is always a _cancellation_reason
941 """Cease to generate observation or error events. This will not
942 generate an error by itself."""
944 assert not self.cancelled
946 # make sure things go wrong when someone tries to continue this
947 self.errbacks = None
948 self.callbacks = None
950 self.cancelled = True
951 while self._on_cancel:
952 self._on_cancel.pop()()
954 self._cancellation_reason = None
956 def on_cancel(self, callback):
957 if self.cancelled:
958 callback()
959 self._on_cancel.append(callback)
961 def __repr__(self):
962 return '<%s %s at %#x>' % (type(self).__name__, "(cancelled)" if self.cancelled else "(%s call-, %s errback(s))" % (len(self.callbacks), len(self.errbacks)), id(self))
964class ServerObservation:
965 def __init__(self):
966 self._accepted = False
967 self._trigger = asyncio.get_event_loop().create_future()
968 # A deregistration is "early" if it happens before the response message
969 # is actually sent; calling deregister() in that time (typically during
970 # `render()`) will not send an unsuccessful response message but just
971 # sent this flag which is set to None as soon as it is too late for an
972 # early deregistration.
973 # This mechanism is temporary until more of aiocoap behaves like
974 # Pipe which does not suffer from this limitation.
975 self._early_deregister = False
976 self._late_deregister = False
978 def accept(self, cancellation_callback):
979 self._accepted = True
980 self._cancellation_callback = cancellation_callback
982 def deregister(self, reason=None):
983 if self._early_deregister is False:
984 self._early_deregister = True
985 return
987 warnings.warn("Late use of ServerObservation.deregister() is"
988 " deprecated, use .trigger with an unsuccessful value"
989 " instead",
990 DeprecationWarning)
991 self.trigger(Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable"))
993 def trigger(self, response=None, *, is_last=False):
994 """Send an updated response; if None is given, the observed resource's
995 rendering will be invoked to produce one.
997 `is_last` can be set to True to indicate that no more responses will be
998 sent. Note that an unsuccessful response will be the last no matter
999 what is_last says, as such a message always terminates a CoAP
1000 observation."""
1001 if is_last:
1002 self._late_deregister = True
1003 if self._trigger.done():
1004 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
1005 self._trigger = asyncio.get_event_loop().create_future()
1006 self._trigger.set_result(response)