Coverage for aiocoap/transports/simple6.py: 89%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

88 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9"""This module implements a MessageInterface for UDP based on the asyncio 

10DatagramProtocol. 

11 

12This is a simple version that works only for clients (by creating a dedicated 

13unbound but connected socket for each communication partner) and probably not 

14with multicast (it is assumed to be unsafe for multicast), which can be 

15expected to work even on platforms where the :mod:`.udp6` module can not be 

16made to work (Android, OSX, Windows for missing ``recvmsg`` and socket options, 

17or any event loops that don't have an add_reader method). 

18 

19Note that the name of the module is a misnomer (and the module is likely to be 

20renamed): Nothing in it is IPv6 specific; the socket is created using whichever 

21address family the OS chooses based on the given host name. 

22 

23One small but noteworthy detail about this transport is that it does not 

24distinguish between IP literals and host names. As a result, requests and 

25responses from remotes will appear to arrive from a remote whose netloc is the 

26requested name, not an IP literal. 

27 

28This transport is experimental, likely to change, and not fully tested yet 

29(because the test suite is not yet ready to matrix-test the same tests with 

30different transport implementations, and because it still fails in proxy 

31blockwise tests). 

32 

33For one particular use case, this may be usable for servers in a sense: If (and 

34only if) all incoming requests are only ever sent from clients that were 

35previously addressed as servers by the running instance. (This is generally 

36undesirable as it greatly limits the usefulness of the server, but is used in 

37LwM2M setups). As such a setup makes demands on the peer that are not justified 

38by the CoAP specification (in particular, that it send requests from a 

39particular port), this should still only be used for cases where the udp6 

40transport is unavailable due to platform limitations. 

41""" 

42 

43import asyncio 

44from collections import OrderedDict 

45 

46from aiocoap import interfaces 

47from aiocoap import COAP_PORT 

48from ..util import hostportjoin 

49from ..util.asyncio import py38args 

50from .generic_udp import GenericMessageInterface 

51 

52class _Connection(asyncio.DatagramProtocol, interfaces.EndpointAddress): 

53 def __init__(self, ready_callback, message_interface: "GenericMessageInterface", stored_sockaddr): 

54 self._ready_callback = ready_callback 

55 self._message_interface = message_interface 

56 

57 # This gets stored in the _Connection because not all implementations 

58 # of datagram transports will expose the get_extra_info('socket') 

59 # (right now, all I knew do), or their backend might not be a connected 

60 # socket (like in uvloop), so the information can't be just obtained 

61 # from the transport, but is needed to implement .hostinfo 

62 # 

63 # If _Connections become used in other contexts (eg. tinydtls starts 

64 # using them), it might be a good idea to move all this into a subclass 

65 # and split it from the pure networking stuff. 

66 self.hostinfo = hostportjoin(stored_sockaddr[0], None if stored_sockaddr[1] == COAP_PORT else stored_sockaddr[1]) 

67 

68 self._stage = "initializing" #: Status property purely for debugging 

69 

70 def __repr__(self): 

71 return "<%s at %#x on transport %s, %s>" % ( 

72 type(self).__name__, 

73 id(self), 

74 getattr(self, "_transport", "(none)"), 

75 self._stage) 

76 

77 # address interface 

78 

79 is_multicast = False 

80 

81 is_multicast_locally = False 

82 

83 scheme = 'coap' 

84 

85 # statically initialized in init 

86 hostinfo = None 

87 uri_base = None 

88 uri_base = property(lambda self: 'coap://' + self.hostinfo) 

89 

90 @property 

91 def hostinfo_local(self): 

92 # This can only be done on a best-effort base here. Unlike the below 

93 # hostinfo (see comments there), there is no easy way around this, so 

94 # if there are still implementations out that don't do the extras, 

95 # that's it and the calling site should reconsider whether they need 

96 # something that can not be determined. (Some more effort could go into 

97 # falling back to get_extra_info('socket').getsockname(), but that 

98 # should really be fixed in the transport provider). 

99 if not hasattr(self, '_transport'): 

100 raise RuntimeError("Simple6 does not have defined local host info in current stage %s" % self._stage) 

101 sockname = self._transport.get_extra_info('sockname') 

102 if sockname is None: 

103 raise RuntimeError("Simple6 can not determine local address from the underlying UDP implementation") 

104 return hostportjoin(*sockname[:2]) 

105 uri_base_local = property(lambda self: 'coap://' + self.hostinfo_local) 

106 

107# fully disabled because some implementations of asyncio don't make the 

108# information available; going the easy route and storing it for all (see 

109# attribute population in __init__) 

110 

111# # FIXME continued: probably this is, and the above is not (or should not be) 

112# @property 

113# def hostinfo(self): 

114# print("ACCESSING HOSTINFO") 

115# host, port = self._transport.get_extra_info('socket').getpeername()[:2] 

116# if port == COAP_PORT: 

117# port = None 

118# # FIXME this should use some of the _plainaddress mechanisms of the udp6 addresses 

119# return hostportjoin(host, port) 

120 

121 # datagram protocol interface 

122 

123 def connection_made(self, transport): 

124 self._transport = transport 

125 self._ready_callback() 

126 self._stage = "active" 

127 del self._ready_callback 

128 

129 def datagram_received(self, data, address): 

130 self._message_interface._received_datagram(self, data) 

131 

132 def error_received(self, exception): 

133 self._message_interface._received_exception(self, exception) 

134 

135 def connection_lost(self, exception): 

136 if exception is None: 

137 pass 

138 else: 

139 self._new_error_callback(self, exception) 

140 

141 # whatever it is _DatagramClientSocketpoolSimple6 expects 

142 

143 # ... because generic_udp expects it from _DatagramClientSocketpoolSimple6 

144 def send(self, data): 

145 self._transport.sendto(data, None) 

146 

147 async def shutdown(self): 

148 self._stage = "shutting down" 

149 self._transport.abort() 

150 del self._message_interface 

151 self._stage = "destroyed" 

152 

153class _DatagramClientSocketpoolSimple6: 

154 """This class is used to explore what an Python/asyncio abstraction around 

155 a hypothetical "UDP connections" mechanism could look like. 

156 

157 Assume there were a socket variety that had UDP messages (ie. unreliable, 

158 unordered, boundary-preserving) but that can do an accept() like a TCP 

159 listening socket can, and can create outgoing connection-ish sockets from 

160 the listeing port. 

161 

162 That interface would be usable for all UDP-based CoAP transport 

163 implementations; this particular implementation, due to limitations of 

164 POSIX sockets (and the additional limitations imposed on it like not using 

165 PKTINFO) provides the interface, but only implements the outgoing part, and 

166 will not allow setting the outgoing port or interface.""" 

167 

168 max_sockets = 64 

169 

170 # FIXME (new_message_callback, new_error_callback) should probably rather 

171 # be one object with a defined interface; either that's the 

172 # MessageInterfaceSimple6 and stored accessibly (so the Protocol can know 

173 # which MessageInterface to talk to for sending), or we move the 

174 # MessageInterface out completely and have that object be the Protocol, 

175 # and the Protocol can even send new packages via the address 

176 def __init__(self, loop, mi: "GenericMessageInterface"): 

177 # using an OrderedDict to implement an LRU cache as it's suitable for that purpose according to its documentation 

178 self._sockets = OrderedDict() 

179 

180 self._loop = loop 

181 self._message_interface = mi 

182 

183 async def _maybe_purge_sockets(self): 

184 while len(self._sockets) >= self.max_sockets: # more of an if 

185 oldaddr, oldest = next(iter(self._sockets.items())) 

186 await oldest.shutdown() 

187 del self._sockets[oldaddr] 

188 

189 async def connect(self, sockaddr): 

190 """Create a new socket with a given remote socket address 

191 

192 Note that the sockaddr does not need to be fully resolved or complete, 

193 as it is not used for matching incoming packages; ('host.example.com', 

194 5683) is perfectly OK (and will create a different outgoing socket that 

195 ('hostalias.example.com', 5683) even if that has the same address, for 

196 better or for worse). 

197 

198 For where the general underlying interface is concerned, it is not yet 

199 fixed at all when this must return identical objects.""" 

200 

201 protocol = self._sockets.get(sockaddr) 

202 if protocol is not None: 

203 self._sockets.move_to_end(sockaddr) 

204 return protocol 

205 

206 await self._maybe_purge_sockets() 

207 

208 ready = asyncio.get_running_loop().create_future() 

209 transport, protocol = await self._loop.create_datagram_endpoint( 

210 lambda: _Connection(lambda: ready.set_result(None), self._message_interface, sockaddr), 

211 remote_addr=sockaddr) 

212 await ready 

213 

214# # Enable this to easily make every connection to localhost a new one 

215# # during testing 

216# import random 

217# sockaddr = sockaddr + (random.random(),) 

218 

219 # FIXME twice: 1., those never get removed yet (should timeout or 

220 # remove themselves on error), and 2., this is racy against a shutdown right after a connect 

221 self._sockets[sockaddr] = protocol 

222 

223 return protocol 

224 

225 async def shutdown(self): 

226 # preventing the creation of new sockets early on, and generally 

227 # breaking cycles 

228 del self._message_interface 

229 

230 if self._sockets: 

231 done, pending = await asyncio.wait([ 

232 asyncio.create_task( 

233 s.shutdown(), 

234 **py38args(name="Socket shutdown of %r" % s) 

235 ) 

236 for s 

237 in self._sockets.values()]) 

238 for item in done: 

239 await item 

240 del self._sockets 

241 

242class MessageInterfaceSimple6(GenericMessageInterface): 

243 @classmethod 

244 async def create_client_transport_endpoint(cls, ctx, log, loop): 

245 self = cls(ctx, log, loop) 

246 

247 # Cyclic reference broken during shutdown 

248 self._pool = _DatagramClientSocketpoolSimple6(self._loop, self) 

249 return self 

250 

251 async def recognize_remote(self, remote): 

252 return isinstance(remote, _Connection) and remote in self._pool._sockets