Coverage for aiocoap/transports/udp6.py: 77%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

258 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9"""This module implements a MessageInterface for UDP based on a variation of 

10the asyncio DatagramProtocol. 

11 

12This implementation strives to be correct and complete behavior while still 

13only using a single socket; that is, to be usable for all kinds of multicast 

14traffic, to support server and client behavior at the same time, and to work 

15correctly even when multiple IPv6 and IPv4 (using V4MAPPED addresses) 

16interfaces are present, and any of the interfaces has multiple addresses. 

17 

18This requires using some standardized but not necessarily widely ported 

19features: ``AI_V4MAPPED`` to support IPv4 without resorting to less 

20standardized mechanisms for later options, ``IPV6_RECVPKTINFO`` to determine 

21incoming packages' destination addresses (was it multicast) and to return 

22packages from the same address, ``IPV6_JOIN_GROUP`` for multicast 

23membership management and ``recvmsg`` to obtain data configured with the above 

24options. 

25 

26To the author's knowledge, there is no standardized mechanism for receiving 

27ICMP errors in such a setup. On Linux, ``IPV6_RECVERR`` and ``MSG_ERRQUEUE`` 

28are used to receive ICMP errors from the socket; on other platforms, a warning 

29is emitted that ICMP errors are ignored. Using a :mod:`.simple6` for clients is 

30recommended for those when working as a client only. 

31 

32Exceeding for the above error handling, no attempts are made to fall back to a 

33kind-of-correct or limited-functionality behavior if these options are 

34unavailable, for the resulting code would be hard to maintain ("``ifdef`` 

35hell") or would cause odd bugs at users (eg. servers that stop working when an 

36additional IPv6 address gets assigned). If the module does not work for you, 

37and the options can not be added easily to your platform, consider using the 

38:mod:`.simple6` module instead. 

39""" 

40 

41import asyncio 

42import socket 

43import ipaddress 

44import struct 

45import weakref 

46from collections import namedtuple 

47 

48from ..message import Message 

49from ..numbers import constants 

50from .. import defaults 

51from .. import error 

52from .. import interfaces 

53from ..numbers import COAP_PORT 

54from ..util.asyncio.recvmsg import RecvmsgDatagramProtocol, create_recvmsg_datagram_endpoint 

55from ..util import hostportjoin, hostportsplit 

56from ..util import socknumbers 

57 

58"""The `struct in6_pktinfo` from RFC3542""" 

59_in6_pktinfo = struct.Struct("16sI") 

60 

61_ipv6_unspecified = socket.inet_pton(socket.AF_INET6, '::') 

62_ipv4_unspecified = socket.inet_pton(socket.AF_INET6, '::ffff:0.0.0.0') 

63 

64class InterfaceOnlyPktinfo(bytes): 

65 """A thin wrapper over bytes that represent a pktinfo built just to select 

66 an outgoing interface. 

67 

68 This must not be treated any different than a regular pktinfo, and is just 

69 tagged for better debug output. (Ie. if this is replaced everywhere with 

70 plain `bytes`, things must still work).""" 

71 

72class UDP6EndpointAddress(interfaces.EndpointAddress): 

73 """Remote address type for :cls:`MessageInterfaceUDP6`. Remote address is 

74 stored in form of a socket address; local address can be roundtripped by 

75 opaque pktinfo data. 

76 

77 For purposes of equality (and thus hashing), the local address is *not* 

78 checked. Neither is the scopeid that is part of the socket address. 

79 

80 >>> interface = type("FakeMessageInterface", (), {}) 

81 >>> if1_name = socket.if_indextoname(1) 

82 >>> local = UDP6EndpointAddress(socket.getaddrinfo('127.0.0.1', 5683, type=socket.SOCK_DGRAM, family=socket.AF_INET6, flags=socket.AI_V4MAPPED)[0][-1], interface) 

83 >>> local.is_multicast 

84 False 

85 >>> local.hostinfo 

86 '127.0.0.1' 

87 >>> all_coap_link1 = UDP6EndpointAddress(socket.getaddrinfo('ff02:0:0:0:0:0:0:fd%1', 1234, type=socket.SOCK_DGRAM, family=socket.AF_INET6)[0][-1], interface) 

88 >>> all_coap_link1.is_multicast 

89 True 

90 >>> all_coap_link1.hostinfo == '[ff02::fd%{}]:1234'.format(if1_name) 

91 True 

92 >>> all_coap_site = UDP6EndpointAddress(socket.getaddrinfo('ff05:0:0:0:0:0:0:fd', 1234, type=socket.SOCK_DGRAM, family=socket.AF_INET6)[0][-1], interface) 

93 >>> all_coap_site.is_multicast 

94 True 

95 >>> all_coap_site.hostinfo 

96 '[ff05::fd]:1234' 

97 >>> all_coap4 = UDP6EndpointAddress(socket.getaddrinfo('224.0.1.187', 5683, type=socket.SOCK_DGRAM, family=socket.AF_INET6, flags=socket.AI_V4MAPPED)[0][-1], interface) 

98 >>> all_coap4.is_multicast 

99 True 

100 """ 

101 

102 def __init__(self, sockaddr, interface, *, pktinfo=None): 

103 self.sockaddr = sockaddr 

104 self.pktinfo = pktinfo 

105 self._interface = weakref.ref(interface) 

106 

107 scheme = 'coap' 

108 

109 interface = property(lambda self: self._interface()) 

110 

111 def __hash__(self): 

112 return hash(self.sockaddr[:-1]) 

113 

114 def __eq__(self, other): 

115 return self.sockaddr[:-1] == other.sockaddr[:-1] 

116 

117 def __repr__(self): 

118 return "<%s %s%s>"%(type(self).__name__, self.hostinfo, " (locally %s)" % self._repr_pktinfo() if self.pktinfo is not None else "") 

119 

120 @staticmethod 

121 def _strip_v4mapped(address): 

122 """Turn anything that's a valid input to ipaddress.IPv6Address into a 

123 user-friendly string that's either an IPv6 or an IPv4 address. 

124 

125 This also compresses (normalizes) the IPv6 address as a convenient side 

126 effect.""" 

127 address = ipaddress.IPv6Address(address) 

128 mapped = address.ipv4_mapped 

129 if mapped is not None: 

130 return str(mapped) 

131 return str(address) 

132 

133 def _plainaddress(self): 

134 """Return the IP adress part of the sockaddr in IPv4 notation if it is 

135 mapped, otherwise the plain v6 address including the interface 

136 identifier if set.""" 

137 

138 if self.sockaddr[3] != 0: 

139 scopepart = "%" + socket.if_indextoname(self.sockaddr[3]) 

140 else: 

141 scopepart = "" 

142 if '%' in self.sockaddr[0]: 

143 # Fix for Python 3.6 and earlier that reported the scope information 

144 # in the IP literal (3.7 consistently expresses it in the tuple slot 3) 

145 scopepart = "" 

146 return self._strip_v4mapped(self.sockaddr[0]) + scopepart 

147 

148 def _repr_pktinfo(self): 

149 """What repr(self.pktinfo) would be if that were not a plain untyped bytestring""" 

150 addr, interface = _in6_pktinfo.unpack_from(self.pktinfo) 

151 if interface == 0: 

152 interface = "" 

153 else: 

154 try: 

155 interface = "%" + socket.if_indextoname(interface) 

156 except Exception as e: 

157 interface = "%%%d(%s)" % (interface, e) 

158 

159 return "%s%s" % (self._strip_v4mapped(addr), interface) 

160 

161 def _plainaddress_local(self): 

162 """Like _plainaddress, but on the address in the pktinfo. Unlike 

163 _plainaddress, this does not contain the interface identifier.""" 

164 

165 addr, interface = _in6_pktinfo.unpack_from(self.pktinfo) 

166 

167 return self._strip_v4mapped(addr) 

168 

169 @property 

170 def netif(self): 

171 """Textual interface identifier of the explicitly configured remote 

172 interface, or the interface identifier reported in an incoming 

173 link-local message. None if not set.""" 

174 index = self.sockaddr[3] 

175 return socket.if_indextoname(index) if index else None 

176 

177 @property 

178 def hostinfo(self): 

179 port = self.sockaddr[1] 

180 if port == COAP_PORT: 

181 port = None 

182 

183 # plainaddress: don't assume other applications can deal with v4mapped addresses 

184 return hostportjoin(self._plainaddress(), port) 

185 

186 @property 

187 def hostinfo_local(self): 

188 host = self._plainaddress_local() 

189 port = self.interface._local_port() 

190 if port == 0: 

191 raise ValueError("Local port read before socket has bound itself") 

192 if port == COAP_PORT: 

193 port = None 

194 return hostportjoin(host, port) 

195 

196 @property 

197 def uri_base(self): 

198 return 'coap://' + self.hostinfo 

199 

200 @property 

201 def uri_base_local(self): 

202 return 'coap://' + self.hostinfo_local 

203 

204 @property 

205 def is_multicast(self): 

206 return ipaddress.ip_address(self._plainaddress().split('%', 1)[0]).is_multicast 

207 

208 @property 

209 def is_multicast_locally(self): 

210 return ipaddress.ip_address(self._plainaddress_local()).is_multicast 

211 

212 def as_response_address(self): 

213 if not self.is_multicast_locally: 

214 return self 

215 

216 # Create a copy without pktinfo, as responses to messages received to 

217 # multicast addresses can not have their request's destination address 

218 # as source address 

219 return type(self)(self.sockaddr, self.interface) 

220 

221 

222class SockExtendedErr(namedtuple("_SockExtendedErr", "ee_errno ee_origin ee_type ee_code ee_pad ee_info ee_data")): 

223 _struct = struct.Struct("IbbbbII") 

224 @classmethod 

225 def load(cls, data): 

226 # unpack_from: recvmsg(2) says that more data may follow 

227 return cls(*cls._struct.unpack_from(data)) 

228 

229class MessageInterfaceUDP6(RecvmsgDatagramProtocol, interfaces.MessageInterface): 

230 def __init__(self, ctx: interfaces.MessageManager, log, loop): 

231 self._ctx = ctx 

232 self.log = log 

233 self.loop = loop 

234 

235 self._shutting_down = None #: Future created and used in the .shutdown() method. 

236 

237 self.ready = asyncio.get_running_loop().create_future() #: Future that gets fullfilled by connection_made (ie. don't send before this is done; handled by ``create_..._context`` 

238 

239 def _local_port(self): 

240 # FIXME: either raise an error if this is 0, or send a message to self 

241 # to force the OS to decide on a port. Right now, this reports wrong 

242 # results while the first message has not been sent yet. 

243 return self.transport.get_extra_info('socket').getsockname()[1] 

244 

245 @classmethod 

246 async def _create_transport_endpoint(cls, sock, ctx: interfaces.MessageManager, log, loop, multicast=[]): 

247 try: 

248 sock.setsockopt(socket.IPPROTO_IPV6, socknumbers.IPV6_RECVPKTINFO, 1) 

249 except NameError: 

250 raise RuntimeError("RFC3542 PKTINFO flags are unavailable, unable to create a udp6 transport.") 

251 if socknumbers.HAS_RECVERR: 

252 sock.setsockopt(socket.IPPROTO_IPV6, socknumbers.IPV6_RECVERR, 1) 

253 # i'm curious why this is required; didn't IPV6_V6ONLY=0 already make 

254 # it clear that i don't care about the ip version as long as everything looks the same? 

255 sock.setsockopt(socket.IPPROTO_IP, socknumbers.IP_RECVERR, 1) 

256 else: 

257 log.warning("Transport udp6 set up on platform without RECVERR capability. ICMP errors will be ignored.") 

258 

259 for (address, interface) in sum(map( 

260 # Expand shortcut of "interface name means default CoAP all-nodes addresses" 

261 lambda i: [(a, i) for a in constants.MCAST_ALL] if isinstance(i, str) else [i], 

262 multicast 

263 ), []): 

264 address = ipaddress.ip_address(address) 

265 interface = socket.if_nametoindex(interface) 

266 

267 if isinstance(address, ipaddress.IPv4Address): 

268 s = struct.pack('4s4si', 

269 address.packed, 

270 socket.inet_aton("0.0.0.0"), interface) 

271 try: 

272 sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, s) 

273 except OSError: 

274 log.warning("Could not join IPv4 multicast group") 

275 

276 elif isinstance(address, ipaddress.IPv6Address): 

277 s = struct.pack('16si', 

278 address.packed, 

279 interface) 

280 try: 

281 sock.setsockopt(socket.IPPROTO_IPV6, 

282 socket.IPV6_JOIN_GROUP, s) 

283 except OSError: 

284 log.warning("Could not join IPv6 multicast group") 

285 

286 else: 

287 raise RuntimeError("Unknown address format") 

288 

289 transport, protocol = await create_recvmsg_datagram_endpoint(loop, 

290 lambda: cls(ctx, log=log, loop=loop), 

291 sock=sock) 

292 

293 await protocol.ready 

294 

295 return protocol 

296 

297 @classmethod 

298 async def create_client_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop): 

299 sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM) 

300 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) 

301 

302 return await cls._create_transport_endpoint(sock, ctx, log, loop) 

303 

304 @classmethod 

305 async def create_server_transport_endpoint(cls, ctx: interfaces.MessageManager, log, loop, bind, multicast): 

306 bind = bind or ('::', None) 

307 # Interpret None as 'default port', but still allow to bind to 0 for 

308 # servers that want a random port (eg. when the service URLs are 

309 # advertised out-of-band anyway, or in LwM2M clients) 

310 bind = (bind[0], COAP_PORT if bind[1] is None else bind[1]) 

311 

312 # The later bind() does most of what getaddr info usually does 

313 # (including resolving names), but is missing out subtly: It does not 

314 # populate the zone identifier of an IPv6 address, making it impossible 

315 # without a getaddrinfo (or manual mapping of the name to a number) to 

316 # bind to a specific link-local interface 

317 try: 

318 bind = await loop.getaddrinfo( 

319 bind[0], 

320 bind[1], 

321 family=socket.AF_INET6, 

322 type=socket.SOCK_DGRAM, 

323 flags=socket.AI_V4MAPPED, 

324 ) 

325 except socket.gaierror: 

326 raise error.ResolutionError("No local bindable address found for %s" % bind[0]) 

327 assert bind, "getaddrinfo returned zero-length list rather than erring out" 

328 (*_, bind), *additional = bind 

329 if additional: 

330 log.warning("Multiple addresses to bind to, ") 

331 

332 sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM) 

333 if defaults.has_reuse_port(): 

334 # I doubt that there is any platform that supports RECVPKTINFO but 

335 # not REUSEPORT, but why take chances. 

336 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 

337 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) 

338 sock.bind(bind) 

339 

340 return (await cls._create_transport_endpoint(sock, ctx, log, loop, multicast)) 

341 

342 async def shutdown(self): 

343 self._shutting_down = asyncio.get_running_loop().create_future() 

344 

345 self.transport.close() 

346 

347 await self._shutting_down 

348 

349 del self._ctx 

350 

351 def send(self, message): 

352 ancdata = [] 

353 if message.remote.pktinfo is not None: 

354 ancdata.append((socket.IPPROTO_IPV6, socknumbers.IPV6_PKTINFO, 

355 message.remote.pktinfo)) 

356 self.transport.sendmsg(message.encode(), ancdata, 0, message.remote.sockaddr) 

357 

358 async def recognize_remote(self, remote): 

359 return isinstance(remote, UDP6EndpointAddress) and \ 

360 remote.interface == self 

361 

362 async def determine_remote(self, request): 

363 if request.requested_scheme not in ('coap', None): 

364 return None 

365 

366 if request.unresolved_remote is not None: 

367 host, port = hostportsplit(request.unresolved_remote) 

368 port = port or COAP_PORT 

369 elif request.opt.uri_host: 

370 host = request.opt.uri_host 

371 if host.startswith('[') and host.endswith(']'): 

372 host = host[1:-1] 

373 port = request.opt.uri_port or COAP_PORT 

374 else: 

375 raise ValueError("No location found to send message to (neither in .opt.uri_host nor in .remote)") 

376 

377 # Take aside the zone identifier. While it can pass through getaddrinfo 

378 # in some situations (eg. 'fe80::1234%eth0' will give 'fe80::1234' 

379 # scope eth0, and similar for ff02:: addresses), in others (eg. ff05::) 

380 # it gives 'Name or service not known'. 

381 

382 if '%' in host: 

383 host, zone = host.split('%', 1) 

384 try: 

385 zone = socket.if_nametoindex(zone) 

386 except OSError: 

387 raise error.ResolutionError("Invalid zone identifier %s" % zone) 

388 else: 

389 zone = None 

390 

391 try: 

392 own_sock = self.transport.get_extra_info('socket') 

393 addrinfo = await self.loop.getaddrinfo( 

394 host, 

395 port, 

396 family=own_sock.family, 

397 type=0, # Not setting the sock's proto as that fails up to 

398 # Python 3.6; setting that would make debugging around 

399 # here less confusing but otherwise has no effect 

400 # (unless maybe very exotic protocols show up). 

401 proto=own_sock.proto, 

402 flags=socket.AI_V4MAPPED, 

403 ) 

404 except socket.gaierror: 

405 raise error.ResolutionError("No address information found for requests to %r" % host) 

406 

407 # TODO this is very rudimentary; happy-eyeballs or 

408 # similar could be employed. 

409 

410 sockaddr = addrinfo[0][-1] 

411 

412 ip, port, flowinfo, scopeid = sockaddr 

413 

414 if zone is not None: 

415 # Still trying to preserve the information returned (libc can't do 

416 # it as described at 

417 # <https://unix.stackexchange.com/questions/174767/ipv6-zone-id-in-etc-hosts>) 

418 # in case something sane does come out of that. 

419 if scopeid != 0 and scopeid != zone: 

420 self.log.warning("Resolved address of %s came with zone ID %d whereas explicit ID %d takes precedence", host, scopeid, zone) 

421 scopeid = zone 

422 

423 # We could be done here and return UDP6EndpointAddress(the reassembled 

424 # sockaddr, self), but: 

425 # 

426 # Linux (unlike FreeBSD) takes the sockaddr's scope ID only for 

427 # link-local scopes (as per ipv6(7), and discards it otherwise. It does 

428 # need the information of the selected interface, though, in order to 

429 # pick the right outgoing interface. Thus, we provide it in the local 

430 # portion. 

431 

432 if scopeid: 

433 # "Any" does not include "even be it IPv4" -- the underlying family 

434 # unfortunately needs to be set, or Linux will refuse to send. 

435 if ipaddress.IPv6Address(ip).ipv4_mapped is None: 

436 local_source = _ipv6_unspecified 

437 else: 

438 local_source = _ipv4_unspecified 

439 local = InterfaceOnlyPktinfo(_in6_pktinfo.pack(local_source, scopeid)) 

440 else: 

441 local = None 

442 

443 sockaddr = ip, port, flowinfo, scopeid 

444 return UDP6EndpointAddress(sockaddr, self, pktinfo=local) 

445 

446 # 

447 # implementing the typical DatagramProtocol interfaces. 

448 # 

449 # note from the documentation: we may rely on connection_made to be called 

450 # before datagram_received -- but sending immediately after context 

451 # creation will still fail 

452 

453 def connection_made(self, transport): 

454 """Implementation of the DatagramProtocol interface, called by the transport.""" 

455 self.ready.set_result(True) 

456 self.transport = transport 

457 

458 def datagram_msg_received(self, data, ancdata, flags, address): 

459 """Implementation of the RecvmsgDatagramProtocol interface, called by the transport.""" 

460 pktinfo = None 

461 for cmsg_level, cmsg_type, cmsg_data in ancdata: 

462 if cmsg_level == socket.IPPROTO_IPV6 and cmsg_type == socknumbers.IPV6_PKTINFO: 

463 pktinfo = cmsg_data 

464 else: 

465 self.log.info("Received unexpected ancillary data to recvmsg: level %d, type %d, data %r", cmsg_level, cmsg_type, cmsg_data) 

466 if pktinfo is None: 

467 self.log.warning("Did not receive requested pktinfo ancdata on message from %s", address) 

468 try: 

469 message = Message.decode(data, UDP6EndpointAddress(address, self, pktinfo=pktinfo)) 

470 except error.UnparsableMessage: 

471 self.log.warning("Ignoring unparsable message from %s", address) 

472 return 

473 

474 try: 

475 self._ctx.dispatch_message(message) 

476 except BaseException as exc: 

477 # Catching here because util.asyncio.recvmsg inherits 

478 # _SelectorDatagramTransport's bad handling of callback errors; 

479 # this is the last time we have a log at hand. 

480 self.log.error("Exception raised through dispatch_message: %s", exc, exc_info=exc) 

481 raise 

482 

483 def datagram_errqueue_received(self, data, ancdata, flags, address): 

484 assert flags == socknumbers.MSG_ERRQUEUE 

485 pktinfo = None 

486 errno = None 

487 for cmsg_level, cmsg_type, cmsg_data in ancdata: 

488 assert cmsg_level == socket.IPPROTO_IPV6 

489 if cmsg_type == socknumbers.IPV6_RECVERR: 

490 extended_err = SockExtendedErr.load(cmsg_data) 

491 self.log.debug("Socket error recevied, details: %s", extended_err) 

492 errno = extended_err.ee_errno 

493 elif cmsg_level == socket.IPPROTO_IPV6 and cmsg_type == socknumbers.IPV6_PKTINFO: 

494 pktinfo = cmsg_data 

495 else: 

496 self.log.info("Received unexpected ancillary data to recvmsg errqueue: level %d, type %d, data %r", cmsg_level, cmsg_type, cmsg_data) 

497 remote = UDP6EndpointAddress(address, self, pktinfo=pktinfo) 

498 

499 # not trying to decode a message from data -- that works for 

500 # "connection refused", doesn't work for "no route to host", and 

501 # anyway, when an icmp error comes back, everything pending from that 

502 # port should err out. 

503 

504 try: 

505 self._ctx.dispatch_error(OSError(errno, "received through errqueue"), remote) 

506 except BaseException as exc: 

507 # Catching here because util.asyncio.recvmsg inherits 

508 # _SelectorDatagramTransport's bad handling of callback errors; 

509 # this is the last time we have a log at hand. 

510 self.log.error("Exception raised through dispatch_error: %s", exc, exc_info=exc) 

511 raise 

512 

513 def error_received(self, exc): 

514 """Implementation of the DatagramProtocol interface, called by the transport.""" 

515 # TODO: what can we do about errors we *only* receive here? (eg. sending to 127.0.0.0) 

516 self.log.error("Error received and ignored in this codepath: %s", exc) 

517 

518 def connection_lost(self, exc): 

519 # TODO better error handling -- find out what can cause this at all 

520 # except for a shutdown 

521 if exc is not None: 

522 self.log.error("Connection lost: %s", exc) 

523 

524 if self._shutting_down is None: 

525 self.log.error("Connection loss was not expected.") 

526 else: 

527 self._shutting_down.set_result(None)