Coverage for aiocoap/transports/simple6.py: 89%

96 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-10 11:47 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module implements a MessageInterface for UDP based on the asyncio 

6DatagramProtocol. 

7 

8This is a simple version that works only for clients (by creating a dedicated 

9unbound but connected socket for each communication partner) and probably not 

10with multicast (it is assumed to be unsafe for multicast), which can be 

11expected to work even on platforms where the :mod:`.udp6` module can not be 

12made to work (Android, OSX, Windows for missing ``recvmsg`` and socket options, 

13or any event loops that don't have an add_reader method). 

14 

15Note that the name of the module is a misnomer (and the module is likely to be 

16renamed): Nothing in it is IPv6 specific; the socket is created using whichever 

17address family the OS chooses based on the given host name. 

18 

19One small but noteworthy detail about this transport is that it does not 

20distinguish between IP literals and host names. As a result, requests and 

21responses from remotes will appear to arrive from a remote whose netloc is the 

22requested name, not an IP literal. 

23 

24This transport is experimental, likely to change, and not fully tested yet 

25(because the test suite is not yet ready to matrix-test the same tests with 

26different transport implementations, and because it still fails in proxy 

27blockwise tests). 

28 

29For one particular use case, this may be usable for servers in a sense: If (and 

30only if) all incoming requests are only ever sent from clients that were 

31previously addressed as servers by the running instance. (This is generally 

32undesirable as it greatly limits the usefulness of the server, but is used in 

33LwM2M setups). As such a setup makes demands on the peer that are not justified 

34by the CoAP specification (in particular, that it send requests from a 

35particular port), this should still only be used for cases where the udp6 

36transport is unavailable due to platform limitations. 

37""" 

38 

39import asyncio 

40from collections import OrderedDict 

41import socket 

42from typing import Tuple 

43 

44from aiocoap import error 

45from aiocoap import interfaces 

46from ..numbers import COAP_PORT, constants 

47from ..util import hostportjoin 

48from .generic_udp import GenericMessageInterface 

49 

50 

class _Connection(asyncio.DatagramProtocol, interfaces.EndpointAddress):
    """Protocol of a single connected UDP socket, doubling as the address
    object of the peer that socket is connected to.

    Datagrams and errors arriving on the socket are handed to the message
    interface given at construction time, with this object as the remote.
    """

    def __init__(
        self,
        ready_callback,
        message_interface: "GenericMessageInterface",
        stored_sockaddr,
    ):
        """Set up the protocol; the transport is attached later through
        :meth:`connection_made`.

        :param ready_callback: Called without arguments once, from
            :meth:`connection_made`.
        :param message_interface: Object whose ``_received_datagram(remote,
            data)`` and ``_received_exception(remote, exception)`` are called
            for incoming events.
        :param stored_sockaddr: Address the socket was requested for; item 0
            is used as the host and item 1 as the port of :attr:`hostinfo`.
        """
        self._ready_callback = ready_callback
        self._message_interface = message_interface

        # This gets stored in the _Connection because not all implementations
        # of datagram transports will expose the get_extra_info('socket')
        # (right now, all I know of do), or their backend might not be a
        # connected socket (like in uvloop), so the information can't be just
        # obtained from the transport, but is needed to implement .hostinfo
        #
        # If _Connections become used in other contexts (eg. tinydtls starts
        # using them), it might be a good idea to move all this into a subclass
        # and split it from the pure networking stuff.
        host, port = stored_sockaddr[0], stored_sockaddr[1]
        # The default CoAP port is left out of the host info.
        self.hostinfo = hostportjoin(host, None if port == COAP_PORT else port)

        self._stage = "initializing"  #: Status property purely for debugging

    def __repr__(self):
        return "<%s at %#x on transport %s, %s>" % (
            type(self).__name__,
            id(self),
            getattr(self, "_transport", "(none)"),
            self._stage,
        )

    # address interface

    is_multicast = False

    is_multicast_locally = False

    scheme = "coap"

    # Unlike for other remotes, this is settable per instance.
    maximum_block_size_exp = constants.MAX_REGULAR_BLOCK_SIZE_EXP

    # statically initialized in init
    hostinfo = None

    uri_base = property(lambda self: "coap://" + self.hostinfo)

    @property
    def hostinfo_local(self):
        """Host and port of the local end of the socket.

        :raises RuntimeError: if no transport has been attached yet, or if
            the transport does not provide the ``sockname`` extra info.
        """
        # This can only be done on a best-effort base here. Unlike the
        # hostinfo attribute (see comments in __init__), there is no easy way
        # around this, so if there are still implementations out that don't do
        # the extras, that's it and the calling site should reconsider whether
        # they need something that can not be determined. (Some more effort
        # could go into falling back to
        # get_extra_info('socket').getsockname(), but that should really be
        # fixed in the transport provider).
        if not hasattr(self, "_transport"):
            raise RuntimeError(
                "Simple6 does not have defined local host info in current stage %s"
                % self._stage
            )
        sockname = self._transport.get_extra_info("sockname")
        if sockname is None:
            raise RuntimeError(
                "Simple6 can not determine local address from the underlying UDP implementation"
            )
        # Only host and port are used; any further sockname items are ignored.
        return hostportjoin(*sockname[:2])

    uri_base_local = property(lambda self: "coap://" + self.hostinfo_local)

    @property
    def blockwise_key(self):
        # Not pulling in hostinfo_local as that's unreliable anyway -- if we
        # change identity and the (UDP server) follows its requests across our
        # changing port, that's probably fine.
        return self.hostinfo

    # fully disabled because some implementations of asyncio don't make the
    # information available; going the easy route and storing it for all (see
    # attribute population in __init__)

    # # FIXME continued: probably this is, and the above is not (or should not be)
    # @property
    # def hostinfo(self):
    #     print("ACCESSING HOSTINFO")
    #     host, port = self._transport.get_extra_info('socket').getpeername()[:2]
    #     if port == COAP_PORT:
    #         port = None
    #     # FIXME this should use some of the _plainaddress mechanisms of the udp6 addresses
    #     return hostportjoin(host, port)

    # datagram protocol interface

    def connection_made(self, transport):
        """Store the transport and signal readiness through the callback
        given at construction."""
        self._transport = transport
        self._ready_callback()
        self._stage = "active"
        # The callback is only needed once.
        del self._ready_callback

    def datagram_received(self, data, address):
        # The sender address is not passed on: this object itself is handed
        # over as the remote.
        self._message_interface._received_datagram(self, data)

    def error_received(self, exception):
        self._message_interface._received_exception(self, exception)

    def connection_lost(self, exception):
        """Report an erroneous loss of the transport to the message
        interface; a loss without an exception is ignored."""
        if exception is None:
            return
        # Reported through the same path as error_received: this class has no
        # separate error callback. shutdown() drops the message interface
        # reference, in which case there is nobody left to report to.
        message_interface = getattr(self, "_message_interface", None)
        if message_interface is not None:
            message_interface._received_exception(self, exception)

    # whatever it is _DatagramClientSocketpoolSimple6 expects

    # ... because generic_udp expects it from _DatagramClientSocketpoolSimple6
    def send(self, data):
        """Send data through the transport, without an explicit destination
        address."""
        self._transport.sendto(data, None)

    async def shutdown(self):
        """Abort the transport and drop the reference to the message
        interface."""
        self._stage = "shutting down"
        self._transport.abort()
        del self._message_interface
        self._stage = "destroyed"

176 

177 

class _DatagramClientSocketpoolSimple6:
    """This class is used to explore what a Python/asyncio abstraction around
    a hypothetical "UDP connections" mechanism could look like.

    Assume there were a socket variety that had UDP messages (ie. unreliable,
    unordered, boundary-preserving) but that can do an accept() like a TCP
    listening socket can, and can create outgoing connection-ish sockets from
    the listening port.

    That interface would be usable for all UDP-based CoAP transport
    implementations; this particular implementation, due to limitations of
    POSIX sockets (and the additional limitations imposed on it like not using
    PKTINFO) provides the interface, but only implements the outgoing part, and
    will not allow setting the outgoing port or interface."""

    # Number of pooled connections at which connect() starts shutting down
    # the least recently used ones before opening a new one. The check runs
    # before the new socket is opened, so concurrent connect() calls can
    # transiently exceed it.
    max_sockets = 64

    # FIXME (new_message_callback, new_error_callback) should probably rather
    # be one object with a defined interface; either that's the
    # MessageInterfaceSimple6 and stored accessibly (so the Protocol can know
    # which MessageInterface to talk to for sending), or we move the
    # MessageInterface out completely and have that object be the Protocol,
    # and the Protocol can even send new packages via the address
    def __init__(self, loop, mi: "GenericMessageInterface"):
        """Create an empty pool.

        :param loop: Event loop whose ``create_datagram_endpoint`` is used to
            open sockets.
        :param mi: Message interface passed on to every connection created.
        """
        # using an OrderedDict to implement an LRU cache as it's suitable for
        # that purpose according to its documentation
        self._sockets: OrderedDict[Tuple[str, int], _Connection] = OrderedDict()

        self._loop = loop
        self._message_interface = mi

    async def _maybe_purge_sockets(self):
        """Shut down and forget least recently used connections until fewer
        than ``max_sockets`` are pooled."""
        while len(self._sockets) >= self.max_sockets:  # more of an if
            # The first item of the OrderedDict is the least recently used.
            oldaddr, oldest = next(iter(self._sockets.items()))
            await oldest.shutdown()
            del self._sockets[oldaddr]

    async def connect(self, sockaddr):
        """Create a new socket with a given remote socket address

        Note that the sockaddr does not need to be fully resolved or complete,
        as it is not used for matching incoming packages; ('host.example.com',
        5683) is perfectly OK (and will create a different outgoing socket than
        ('hostalias.example.com', 5683) even if that has the same address, for
        better or for worse).

        For where the general underlying interface is concerned, it is not yet
        fixed at all when this must return identical objects. This
        implementation returns the pooled connection when one exists for the
        very same sockaddr.

        :raises error.ResolutionError: if address resolution fails with
            ``socket.gaierror``.
        """

        protocol = self._sockets.get(sockaddr)
        if protocol is not None:
            # Mark as most recently used.
            self._sockets.move_to_end(sockaddr)
            return protocol

        await self._maybe_purge_sockets()

        ready = asyncio.get_running_loop().create_future()
        try:
            _transport, protocol = await self._loop.create_datagram_endpoint(
                lambda: _Connection(
                    lambda: ready.set_result(None), self._message_interface, sockaddr
                ),
                remote_addr=sockaddr,
            )
        except socket.gaierror as e:
            raise error.ResolutionError(
                "No address information found for requests to %r" % (sockaddr,)
            ) from e
        # Resolved by the connection's connection_made.
        await ready

        # # Enable this to easily make every connection to localhost a new one
        # # during testing
        # import random
        # sockaddr = sockaddr + (random.random(),)

        # Another connect() for the same sockaddr may have completed while
        # this one was waiting for its endpoint. Overwriting its entry would
        # leave a connection around that was handed out but is no longer
        # known to (or shut down by) the pool, so the duplicate created here
        # is discarded instead.
        existing = self._sockets.get(sockaddr)
        if existing is not None:
            await protocol.shutdown()
            self._sockets.move_to_end(sockaddr)
            return existing

        # FIXME twice: 1., those never get removed yet (should timeout or
        # remove themselves on error), and 2., this is racy against a shutdown
        # right after a connect
        self._sockets[sockaddr] = protocol

        return protocol

    async def shutdown(self):
        """Shut down all pooled connections and drop the pool's state."""
        # preventing the creation of new sockets early on, and generally
        # breaking cycles
        del self._message_interface

        if self._sockets:
            done, _pending = await asyncio.wait(
                [
                    asyncio.create_task(
                        s.shutdown(),
                        name="Socket shutdown of %r" % s,
                    )
                    for s in self._sockets.values()
                ]
            )
            # Awaiting the finished tasks re-raises any exception from a
            # connection's shutdown.
            for item in done:
                await item
        del self._sockets

276 

277 

class MessageInterfaceSimple6(GenericMessageInterface):
    """Message interface that sends through per-remote connected sockets
    obtained from a :class:`_DatagramClientSocketpoolSimple6`."""

    @classmethod
    async def create_client_transport_endpoint(cls, ctx, log, loop):
        """Create the interface together with its socket pool.

        The arguments are passed on unchanged to the class constructor.
        """
        self = cls(ctx, log, loop)

        # Cyclic reference broken during shutdown
        self._pool = _DatagramClientSocketpoolSimple6(self._loop, self)
        return self

    async def recognize_remote(self, remote):
        """Return True if remote is a connection currently held by this
        interface's pool."""
        # The pool's dict is keyed by socket address, with the connections as
        # its values; testing membership on the dict itself would compare the
        # connection against the address keys and never match.
        return (
            isinstance(remote, _Connection)
            and remote in self._pool._sockets.values()
        )
288 return isinstance(remote, _Connection) and remote in self._pool._sockets