Coverage for aiocoap/transports/simple6.py: 88%
91 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module implements a MessageInterface for UDP based on the asyncio
6DatagramProtocol.
8This is a simple version that works only for clients (by creating a dedicated
9unbound but connected socket for each communication partner) and probably not
10with multicast (it is assumed to be unsafe for multicast), which can be
11expected to work even on platforms where the :mod:`.udp6` module can not be
12made to work (Android, OSX, Windows for missing ``recvmsg`` and socket options,
13or any event loops that don't have an add_reader method).
15Note that the name of the module is a misnomer (and the module is likely to be
16renamed): Nothing in it is IPv6 specific; the socket is created using whichever
17address family the OS chooses based on the given host name.
19One small but noteworthy detail about this transport is that it does not
20distinguish between IP literals and host names. As a result, requests and
21responses from remotes will appear to arrive from a remote whose netloc is the
22requested name, not an IP literal.
24This transport is experimental, likely to change, and not fully tested yet
25(because the test suite is not yet ready to matrix-test the same tests with
26different transport implementations, and because it still fails in proxy
27blockwise tests).
29For one particular use case, this may be usable for servers in a sense: If (and
30only if) all incoming requests are only ever sent from clients that were
31previously addressed as servers by the running instance. (This is generally
32undesirable as it greatly limits the usefulness of the server, but is used in
33LwM2M setups). As such a setup makes demands on the peer that are not justified
34by the CoAP specification (in particular, that it send requests from a
35particular port), this should still only be used for cases where the udp6
36transport is unavailable due to platform limitations.
37"""
39import asyncio
40from collections import OrderedDict
42from aiocoap import interfaces
43from aiocoap import COAP_PORT
44from ..util import hostportjoin
45from ..util.asyncio import py38args
46from .generic_udp import GenericMessageInterface
48class _Connection(asyncio.DatagramProtocol, interfaces.EndpointAddress):
49 def __init__(self, ready_callback, message_interface: "GenericMessageInterface", stored_sockaddr):
50 self._ready_callback = ready_callback
51 self._message_interface = message_interface
53 # This gets stored in the _Connection because not all implementations
54 # of datagram transports will expose the get_extra_info('socket')
55 # (right now, all I knew do), or their backend might not be a connected
56 # socket (like in uvloop), so the information can't be just obtained
57 # from the transport, but is needed to implement .hostinfo
58 #
59 # If _Connections become used in other contexts (eg. tinydtls starts
60 # using them), it might be a good idea to move all this into a subclass
61 # and split it from the pure networking stuff.
62 self.hostinfo = hostportjoin(stored_sockaddr[0], None if stored_sockaddr[1] == COAP_PORT else stored_sockaddr[1])
64 self._stage = "initializing" #: Status property purely for debugging
66 def __repr__(self):
67 return "<%s at %#x on transport %s, %s>" % (
68 type(self).__name__,
69 id(self),
70 getattr(self, "_transport", "(none)"),
71 self._stage)
73 # address interface
75 is_multicast = False
77 is_multicast_locally = False
79 scheme = 'coap'
81 # statically initialized in init
82 hostinfo = None
83 uri_base = None
84 uri_base = property(lambda self: 'coap://' + self.hostinfo)
86 @property
87 def hostinfo_local(self):
88 # This can only be done on a best-effort base here. Unlike the below
89 # hostinfo (see comments there), there is no easy way around this, so
90 # if there are still implementations out that don't do the extras,
91 # that's it and the calling site should reconsider whether they need
92 # something that can not be determined. (Some more effort could go into
93 # falling back to get_extra_info('socket').getsockname(), but that
94 # should really be fixed in the transport provider).
95 if not hasattr(self, '_transport'):
96 raise RuntimeError("Simple6 does not have defined local host info in current stage %s" % self._stage)
97 sockname = self._transport.get_extra_info('sockname')
98 if sockname is None:
99 raise RuntimeError("Simple6 can not determine local address from the underlying UDP implementation")
100 return hostportjoin(*sockname[:2])
101 uri_base_local = property(lambda self: 'coap://' + self.hostinfo_local)
103 @property
104 def blockwise_key(self):
105 # Not pulling in hostinfo_local as that's unreliable anyway -- if we
106 # change identity and the (UDP sever) follows its requests across our
107 # changing port, that's probably fine.
108 return self.hostinfo
110# fully disabled because some implementations of asyncio don't make the
111# information available; going the easy route and storing it for all (see
112# attribute population in __init__)
114# # FIXME continued: probably this is, and the above is not (or should not be)
115# @property
116# def hostinfo(self):
117# print("ACCESSING HOSTINFO")
118# host, port = self._transport.get_extra_info('socket').getpeername()[:2]
119# if port == COAP_PORT:
120# port = None
121# # FIXME this should use some of the _plainaddress mechanisms of the udp6 addresses
122# return hostportjoin(host, port)
124 # datagram protocol interface
126 def connection_made(self, transport):
127 self._transport = transport
128 self._ready_callback()
129 self._stage = "active"
130 del self._ready_callback
132 def datagram_received(self, data, address):
133 self._message_interface._received_datagram(self, data)
135 def error_received(self, exception):
136 self._message_interface._received_exception(self, exception)
138 def connection_lost(self, exception):
139 if exception is None:
140 pass
141 else:
142 self._new_error_callback(self, exception)
144 # whatever it is _DatagramClientSocketpoolSimple6 expects
146 # ... because generic_udp expects it from _DatagramClientSocketpoolSimple6
147 def send(self, data):
148 self._transport.sendto(data, None)
150 async def shutdown(self):
151 self._stage = "shutting down"
152 self._transport.abort()
153 del self._message_interface
154 self._stage = "destroyed"
156class _DatagramClientSocketpoolSimple6:
157 """This class is used to explore what an Python/asyncio abstraction around
158 a hypothetical "UDP connections" mechanism could look like.
160 Assume there were a socket variety that had UDP messages (ie. unreliable,
161 unordered, boundary-preserving) but that can do an accept() like a TCP
162 listening socket can, and can create outgoing connection-ish sockets from
163 the listeing port.
165 That interface would be usable for all UDP-based CoAP transport
166 implementations; this particular implementation, due to limitations of
167 POSIX sockets (and the additional limitations imposed on it like not using
168 PKTINFO) provides the interface, but only implements the outgoing part, and
169 will not allow setting the outgoing port or interface."""
171 max_sockets = 64
173 # FIXME (new_message_callback, new_error_callback) should probably rather
174 # be one object with a defined interface; either that's the
175 # MessageInterfaceSimple6 and stored accessibly (so the Protocol can know
176 # which MessageInterface to talk to for sending), or we move the
177 # MessageInterface out completely and have that object be the Protocol,
178 # and the Protocol can even send new packages via the address
179 def __init__(self, loop, mi: "GenericMessageInterface"):
180 # using an OrderedDict to implement an LRU cache as it's suitable for that purpose according to its documentation
181 self._sockets = OrderedDict()
183 self._loop = loop
184 self._message_interface = mi
186 async def _maybe_purge_sockets(self):
187 while len(self._sockets) >= self.max_sockets: # more of an if
188 oldaddr, oldest = next(iter(self._sockets.items()))
189 await oldest.shutdown()
190 del self._sockets[oldaddr]
192 async def connect(self, sockaddr):
193 """Create a new socket with a given remote socket address
195 Note that the sockaddr does not need to be fully resolved or complete,
196 as it is not used for matching incoming packages; ('host.example.com',
197 5683) is perfectly OK (and will create a different outgoing socket that
198 ('hostalias.example.com', 5683) even if that has the same address, for
199 better or for worse).
201 For where the general underlying interface is concerned, it is not yet
202 fixed at all when this must return identical objects."""
204 protocol = self._sockets.get(sockaddr)
205 if protocol is not None:
206 self._sockets.move_to_end(sockaddr)
207 return protocol
209 await self._maybe_purge_sockets()
211 ready = asyncio.get_running_loop().create_future()
212 transport, protocol = await self._loop.create_datagram_endpoint(
213 lambda: _Connection(lambda: ready.set_result(None), self._message_interface, sockaddr),
214 remote_addr=sockaddr)
215 await ready
217# # Enable this to easily make every connection to localhost a new one
218# # during testing
219# import random
220# sockaddr = sockaddr + (random.random(),)
222 # FIXME twice: 1., those never get removed yet (should timeout or
223 # remove themselves on error), and 2., this is racy against a shutdown right after a connect
224 self._sockets[sockaddr] = protocol
226 return protocol
228 async def shutdown(self):
229 # preventing the creation of new sockets early on, and generally
230 # breaking cycles
231 del self._message_interface
233 if self._sockets:
234 done, pending = await asyncio.wait([
235 asyncio.create_task(
236 s.shutdown(),
237 **py38args(name="Socket shutdown of %r" % s)
238 )
239 for s
240 in self._sockets.values()])
241 for item in done:
242 await item
243 del self._sockets
245class MessageInterfaceSimple6(GenericMessageInterface):
246 @classmethod
247 async def create_client_transport_endpoint(cls, ctx, log, loop):
248 self = cls(ctx, log, loop)
250 # Cyclic reference broken during shutdown
251 self._pool = _DatagramClientSocketpoolSimple6(self._loop, self)
252 return self
254 async def recognize_remote(self, remote):
255 return isinstance(remote, _Connection) and remote in self._pool._sockets