Coverage for aiocoap/transports/simple6.py: 88%

91 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-16 16:09 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module implements a MessageInterface for UDP based on the asyncio 

6DatagramProtocol. 

7 

8This is a simple version that works only for clients (by creating a dedicated 

9unbound but connected socket for each communication partner) and probably not 

10with multicast (it is assumed to be unsafe for multicast), which can be 

11expected to work even on platforms where the :mod:`.udp6` module can not be 

12made to work (Android, OSX, Windows for missing ``recvmsg`` and socket options, 

13or any event loops that don't have an add_reader method). 

14 

15Note that the name of the module is a misnomer (and the module is likely to be 

16renamed): Nothing in it is IPv6 specific; the socket is created using whichever 

17address family the OS chooses based on the given host name. 

18 

19One small but noteworthy detail about this transport is that it does not 

20distinguish between IP literals and host names. As a result, requests and 

21responses from remotes will appear to arrive from a remote whose netloc is the 

22requested name, not an IP literal. 

23 

24This transport is experimental, likely to change, and not fully tested yet 

25(because the test suite is not yet ready to matrix-test the same tests with 

26different transport implementations, and because it still fails in proxy 

27blockwise tests). 

28 

29For one particular use case, this may be usable for servers in a sense: If (and 

30only if) all incoming requests are only ever sent from clients that were 

31previously addressed as servers by the running instance. (This is generally 

32undesirable as it greatly limits the usefulness of the server, but is used in 

33LwM2M setups). As such a setup makes demands on the peer that are not justified 

34by the CoAP specification (in particular, that it send requests from a 

35particular port), this should still only be used for cases where the udp6 

36transport is unavailable due to platform limitations. 

37""" 

38 

39import asyncio 

40from collections import OrderedDict 

41 

42from aiocoap import interfaces 

43from aiocoap import COAP_PORT 

44from ..util import hostportjoin 

45from ..util.asyncio import py38args 

46from .generic_udp import GenericMessageInterface 

47 

class _Connection(asyncio.DatagramProtocol, interfaces.EndpointAddress):
    """Protocol object of a single connected UDP socket that doubles as the
    address object of the peer the socket is connected to.

    Datagrams and errors arriving on the socket are handed to the message
    interface given at construction time (with this object as the remote);
    outgoing data is written to the transport received in
    :meth:`connection_made`.

    :param ready_callback: Called without arguments from
        :meth:`connection_made`, ie. as soon as the transport is available.
    :param message_interface: Object whose ``_received_datagram`` and
        ``_received_exception`` methods are called with this connection as
        their first argument.
    :param stored_sockaddr: Sequence whose first two items are the host and
        port the socket is connected to; they are used to build
        :attr:`hostinfo`.
    """

    def __init__(self, ready_callback, message_interface: "GenericMessageInterface", stored_sockaddr):
        self._ready_callback = ready_callback
        self._message_interface = message_interface

        # This gets stored in the _Connection because not all implementations
        # of datagram transports will expose the get_extra_info('socket')
        # (right now, all I know of do), or their backend might not be a
        # connected socket (like in uvloop), so the information can't be just
        # obtained from the transport, but is needed to implement .hostinfo
        #
        # If _Connections become used in other contexts (eg. tinydtls starts
        # using them), it might be a good idea to move all this into a subclass
        # and split it from the pure networking stuff.
        host, port = stored_sockaddr[0], stored_sockaddr[1]
        # The default port is left out of the host info
        self.hostinfo = hostportjoin(host, None if port == COAP_PORT else port)

        self._stage = "initializing" #: Status property purely for debugging

    def __repr__(self):
        return "<%s at %#x on transport %s, %s>" % (
                type(self).__name__,
                id(self),
                getattr(self, "_transport", "(none)"),
                self._stage)

    # address interface

    is_multicast = False

    is_multicast_locally = False

    scheme = 'coap'

    # statically initialized in init
    hostinfo = None
    uri_base = property(lambda self: 'coap://' + self.hostinfo)

    @property
    def hostinfo_local(self):
        """Host and port of the local end of the socket, as reported by the
        transport's ``sockname`` extra info.

        :raises RuntimeError: if no transport has been set yet, or if the
            transport does not provide ``sockname``.
        """
        # This can only be done on a best-effort base here. Unlike the below
        # hostinfo (see comments there), there is no easy way around this, so
        # if there are still implementations out that don't do the extras,
        # that's it and the calling site should reconsider whether they need
        # something that can not be determined. (Some more effort could go into
        # falling back to get_extra_info('socket').getsockname(), but that
        # should really be fixed in the transport provider).
        if not hasattr(self, '_transport'):
            raise RuntimeError("Simple6 does not have defined local host info in current stage %s" % self._stage)
        sockname = self._transport.get_extra_info('sockname')
        if sockname is None:
            raise RuntimeError("Simple6 can not determine local address from the underlying UDP implementation")
        return hostportjoin(*sockname[:2])
    uri_base_local = property(lambda self: 'coap://' + self.hostinfo_local)

    @property
    def blockwise_key(self):
        """Key under which blockwise state for this remote is grouped; this
        is just the remote's host info."""
        # Not pulling in hostinfo_local as that's unreliable anyway -- if we
        # change identity and the (UDP server) follows its requests across our
        # changing port, that's probably fine.
        return self.hostinfo

# fully disabled because some implementations of asyncio don't make the
# information available; going the easy route and storing it for all (see
# attribute population in __init__)

#     # FIXME continued: probably this is, and the above is not (or should not be)
#     @property
#     def hostinfo(self):
#         print("ACCESSING HOSTINFO")
#         host, port = self._transport.get_extra_info('socket').getpeername()[:2]
#         if port == COAP_PORT:
#             port = None
#         # FIXME this should use some of the _plainaddress mechanisms of the udp6 addresses
#         return hostportjoin(host, port)

    # datagram protocol interface

    def connection_made(self, transport):
        """Store the transport and signal readiness through the callback
        given at construction (which is dropped afterwards)."""
        self._transport = transport
        self._ready_callback()
        self._stage = "active"
        del self._ready_callback

    def datagram_received(self, data, address):
        """Forward an incoming datagram to the message interface. The sender
        address is not passed on; this connection object is used as the
        remote instead."""
        self._message_interface._received_datagram(self, data)

    def error_received(self, exception):
        """Forward a socket error to the message interface."""
        self._message_interface._received_exception(self, exception)

    def connection_lost(self, exception):
        """Report the loss of the connection to the message interface if it
        was caused by an error; a loss without exception is ignored."""
        if exception is None:
            return
        # Reported through the same channel as error_received. (This used to
        # call a _new_error_callback attribute that is never set on this
        # class, and thus raised an AttributeError instead of reporting.)
        #
        # shutdown() deletes the reference to the message interface; if the
        # loss is reported after that, there is nobody left to tell.
        message_interface = getattr(self, "_message_interface", None)
        if message_interface is not None:
            message_interface._received_exception(self, exception)

    # whatever it is _DatagramClientSocketpoolSimple6 expects

    # ... because generic_udp expects it from _DatagramClientSocketpoolSimple6
    def send(self, data):
        """Send data to the connected peer. No address is passed to the
        transport, which was created with a remote address."""
        self._transport.sendto(data, None)

    async def shutdown(self):
        """Abort the transport and drop the reference to the message
        interface. The connection must not be used afterwards."""
        self._stage = "shutting down"
        self._transport.abort()
        del self._message_interface
        self._stage = "destroyed"

155 

class _DatagramClientSocketpoolSimple6:
    """This class is used to explore what an Python/asyncio abstraction around
    a hypothetical "UDP connections" mechanism could look like.

    Assume there were a socket variety that had UDP messages (ie. unreliable,
    unordered, boundary-preserving) but that can do an accept() like a TCP
    listening socket can, and can create outgoing connection-ish sockets from
    the listening port.

    That interface would be usable for all UDP-based CoAP transport
    implementations; this particular implementation, due to limitations of
    POSIX sockets (and the additional limitations imposed on it like not using
    PKTINFO) provides the interface, but only implements the outgoing part, and
    will not allow setting the outgoing port or interface."""

    #: Number of pooled sockets at which the least recently used ones are
    #: shut down before a new one is created
    max_sockets = 64

    # FIXME (new_message_callback, new_error_callback) should probably rather
    # be one object with a defined interface; either that's the
    # MessageInterfaceSimple6 and stored accessibly (so the Protocol can know
    # which MessageInterface to talk to for sending), or we move the
    # MessageInterface out completely and have that object be the Protocol,
    # and the Protocol can even send new packages via the address
    def __init__(self, loop, mi: "GenericMessageInterface"):
        """
        :param loop: Event loop on which the datagram endpoints are created.
        :param mi: Message interface handed to every connection created by
            this pool.
        """
        # using an OrderedDict to implement an LRU cache as it's suitable for that purpose according to its documentation
        self._sockets = OrderedDict()

        self._loop = loop
        self._message_interface = mi

    async def _maybe_purge_sockets(self):
        """Shut down and forget least recently used connections until there
        is room for one more below :attr:`max_sockets`."""
        while len(self._sockets) >= self.max_sockets: # more of an if
            # The first item is the one that was least recently inserted or
            # moved to the end by connect()
            oldaddr, oldest = next(iter(self._sockets.items()))
            await oldest.shutdown()
            del self._sockets[oldaddr]

    async def connect(self, sockaddr):
        """Create a new socket with a given remote socket address

        Note that the sockaddr does not need to be fully resolved or complete,
        as it is not used for matching incoming packages; ('host.example.com',
        5683) is perfectly OK (and will create a different outgoing socket that
        ('hostalias.example.com', 5683) even if that has the same address, for
        better or for worse).

        For where the general underlying interface is concerned, it is not yet
        fixed at all when this must return identical objects.

        In this implementation, the connection stored in the pool for that
        exact sockaddr is returned if there is one."""

        protocol = self._sockets.get(sockaddr)
        if protocol is not None:
            # Mark as most recently used
            self._sockets.move_to_end(sockaddr)
            return protocol

        await self._maybe_purge_sockets()

        ready = asyncio.get_running_loop().create_future()
        _, protocol = await self._loop.create_datagram_endpoint(
                lambda: _Connection(lambda: ready.set_result(None), self._message_interface, sockaddr),
                remote_addr=sockaddr)
        await ready

#         # Enable this to easily make every connection to localhost a new one
#         # during testing
#         import random
#         sockaddr = sockaddr + (random.random(),)

        # While this coroutine was waiting for its socket, a concurrent
        # connect() to the same sockaddr may have completed and stored its
        # connection. Overwriting that entry would leave the earlier socket
        # outside the pool (where it would never be shut down), so the
        # duplicate created here is discarded in favor of the pooled one.
        existing = self._sockets.get(sockaddr)
        if existing is not None:
            await protocol.shutdown()
            self._sockets.move_to_end(sockaddr)
            return existing

        # FIXME twice: 1., those never get removed yet (should timeout or
        # remove themselves on error), and 2., this is racy against a shutdown right after a connect
        self._sockets[sockaddr] = protocol

        return protocol

    async def shutdown(self):
        """Shut down all pooled connections and drop the references to them
        and to the message interface. The pool must not be used afterwards."""
        # preventing the creation of new sockets early on, and generally
        # breaking cycles
        del self._message_interface

        if self._sockets:
            done, _ = await asyncio.wait([
                asyncio.create_task(
                    s.shutdown(),
                    **py38args(name="Socket shutdown of %r" % s)
                    )
                for s
                in self._sockets.values()])
            # Awaiting the finished tasks raises any exception a shutdown
            # ended with
            for item in done:
                await item
        del self._sockets

244 

class MessageInterfaceSimple6(GenericMessageInterface):
    """Message interface that sends and receives through the connections of
    a :class:`_DatagramClientSocketpoolSimple6`."""

    @classmethod
    async def create_client_transport_endpoint(cls, ctx, log, loop):
        """Create the message interface along with its socket pool.

        The arguments are passed on unmodified to the class's constructor.
        """
        self = cls(ctx, log, loop)

        # Cyclic reference broken during shutdown
        #
        # (self._loop is presumably set from the loop argument by the
        # constructor -- not visible from here)
        self._pool = _DatagramClientSocketpoolSimple6(self._loop, self)
        return self

    async def recognize_remote(self, remote):
        """Return True if remote is a connection currently held in this
        interface's socket pool."""
        # The pool's _sockets maps socket addresses to connections, so the
        # connection has to be looked up among the values. (A plain `in` on
        # the mapping would compare against the keys, which are never
        # connections.)
        return isinstance(remote, _Connection) and remote in self._pool._sockets.values()