Coverage for aiocoap/transports/ws.py: 88%

196 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-16 16:09 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5""" 

6This moduel implements a TokenInterface for `CoAP over WebSockets`_. 

7 

8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4 

9 

10As with CoAP-over-TCP, while the transport distinguishes a connection initiator 

11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"), 

12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP 

13client). As the WebSocket client can not possibly be connected to (even by the 

14same server -- once the connection is closed, it's gone and even a new one 

15likely has a different port), aiocoap does not allow expressing their addresses 

16in URIs (given they wouldn't serve their purpose as URLs and don't provide any 

17stability either). Requests to a CoAP-over-WS client can be made by assigning 

18the remote to an outgoing request. 

19 

20Port choice 

21----------- 

22 

23Unlike the other transports, CoAP-over-WS is specified with a privileged port 

24(port 80) as the default port. This is impractical for aiocoap servers for two 

25reasons: 

26 

27 * Unless explicitly configured, aiocoap is typically run as an unprivileged 

28 user (and has no provisions in place to receive a socket by other means 

29 than opening it). 

30 

31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website 

32 running on the same port on a full HTTP server. That server is usually 

33 capable of forwarding requests, whereas the ``websockets`` module used by 

34 aiocoap is in no position to either serve websites nor to proxy to an 

35 underlying server. 

36 

37The recommended setup is therefore to run a full web server at port 80, and 

38configure it to proxy incoming requests for WebSockets at `/.well-known/coap` 

39to aiocoap's server, which defaults to binding to port 8683. 

40 

41The port choice of outgoing connections, or the interpretation of the 

42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of 

43course unaffected by this. 

44 

45.. warning:: 

46 

47 Due to a shortcoming of aiocoap's way of specifying ports to bind 

48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that 

49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server 

50 keys are given, the TLS server is launched on the next port after the HTTP 

51 server (typically 8684). 

52 

53Using on pyodide_ 

54----------------- 

55 

56When use on pyodide_, 

57instead of using the ``websockets`` module, 

58a simplified client-only back-end is used, 

59which utilizes the browser's WebSocket API. 

60 

61.. _pyodide: https://pyodide.org/ 

62""" 

63 

64from __future__ import annotations 

65 

66from typing import Dict, List 

67from collections import namedtuple 

68import asyncio 

69import functools 

70import http 

71import weakref 

72 

73from aiocoap import Message, interfaces, ABORT, util, error 

74from aiocoap.transports import rfc8323common 

75from ..util.asyncio import py38args 

76from ..defaults import is_pyodide 

77 

78if is_pyodide: 

79 import aiocoap.util.pyodide_websockets as websockets 

80else: 

81 import websockets 

82 

83def _decode_message(data: bytes) -> Message: 

84 codeoffset = 1 

85 tokenoffset = 2 

86 

87 tkl = data[0] 

88 if tkl > 8: 

89 raise error.UnparsableMessage("Overly long token") 

90 code = data[codeoffset] 

91 token = data[tokenoffset:tokenoffset + tkl] 

92 

93 msg = Message(code=code, token=token) 

94 

95 msg.payload = msg.opt.decode(data[tokenoffset + tkl:]) 

96 

97 return msg 

98 

99def _serialize(msg: Message) -> bytes: 

100 tkl = len(msg.token) 

101 if tkl > 8: 

102 raise ValueError("Overly long token") 

103 

104 data = [ 

105 bytes((tkl, msg.code,)), 

106 msg.token, 

107 msg.opt.encode(), 

108 ] 

109 if msg.payload: 

110 data += [b'\xff', msg.payload] 

111 

112 return b"".join(data) 

113 

114PoolKey = namedtuple("PoolKey", ("scheme", "hostinfo")) 

115 

116class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress): 

117 _connection: websockets.WebSocketCommonProtocol 

118 # Only used to ensure that remotes are associated to the right pool -- not 

119 # that there'd be any good reason to have multiple of those. 

120 _pool: weakref.ReferenceType[WSPool] 

121 

122 scheme = None # Override property -- it's per instance here 

123 

124 def __init__(self, pool, connection, loop, log, *, scheme, local_hostinfo=None, remote_hostinfo=None): 

125 super().__init__() 

126 self._pool = weakref.ref(pool) 

127 self._connection = connection 

128 self.loop = loop 

129 self.log = log 

130 

131 self._local_is_server = isinstance(connection, websockets.WebSocketServerProtocol) 

132 

133 if local_hostinfo is None: 

134 self._local_hostinfo = self._connection.local_address[:2] 

135 else: 

136 self._local_hostinfo = local_hostinfo 

137 if remote_hostinfo is None: 

138 self._remote_hostinfo = self._connection.remote_address[:2] 

139 else: 

140 self._remote_hostinfo = remote_hostinfo 

141 

142 self.scheme = scheme 

143 

144 # Goes both for client and for server ends; on the server end, it 

145 # ensures that role reversal URIs can be used even when passed as URIs 

146 # and not as remotes (although that's of course only possible locally). 

147 self._poolkey = PoolKey(self.scheme, self.hostinfo) 

148 

149 # Necessary for RFC8323Remote 

150 

151 def _abort_with(self, msg, *, close_code=1002): 

152 # Like _send_message, this may take actual time -- but unlike there, 

153 # there's no need to regulate back-pressure 

154 self.loop.create_task( 

155 self._abort_with_waiting(msg, close_code=close_code), 

156 **py38args(name="Abortion WebSocket sonnection with %r" % msg) 

157 ) 

158 

159 # Unlike _send_message, this is pulled out of the the _abort_with function 

160 # as it's also used in _run_recv_loop 

161 async def _abort_with_waiting(self, msg, *, close_code): 

162 self.log.debug("Aborting with message: %r", msg) 

163 try: 

164 await self._connection.send(_serialize(msg)) 

165 except Exception as e: 

166 self.log.error("Sending to a WebSocket should not raise errors", exc_info=e) 

167 await self._connection.close(code=close_code) 

168 

169 def _send_message(self, msg): 

170 # FIXME overhaul back-pressure model 

171 async def send(): 

172 self.log.debug("Sending message: %r", msg) 

173 try: 

174 await self._connection.send(_serialize(msg)) 

175 except Exception as e: 

176 self.log.error("Sending to a WebSocket should not raise errors", exc_info=e) 

177 self.loop.create_task( 

178 send(), 

179 **py38args(name="WebSocket sending of %r" % msg) 

180 ) 

181 

182 async def release(self): 

183 await super().release() 

184 try: 

185 await self._connection.wait_closed() 

186 except asyncio.CancelledError: 

187 self.log.warning( 

188 "Connection %s was not closed by peer in time after release", 

189 self 

190 ) 

191 

192class WSPool(interfaces.TokenInterface): 

193 _outgoing_starting: Dict[PoolKey, asyncio.Task] 

194 _pool: Dict[PoolKey, WSRemote] 

195 

196 _servers: List[websockets.WebSocketServer] 

197 

198 def __init__(self, tman, log, loop): 

199 self.loop = loop 

200 

201 self._pool = {} 

202 self._outgoing_starting = {} 

203 

204 self._servers = [] 

205 

206 # See where it is used for documentation, remove when not needed any more 

207 self._in_shutdown = False 

208 

209 self._tokenmanager = tman 

210 self.log = log 

211 

212 @classmethod 

213 async def create_transport(cls, tman: interfaces.TokenManager, log, loop, *, client_credentials, server_bind=None, server_context=None): 

214 self = cls(tman, log, loop) 

215 

216 self._client_credentials = client_credentials 

217 

218 if server_bind: 

219 host, port = server_bind 

220 if port is None: 

221 port = 8683 

222 elif port != 0: 

223 # FIXME see module documentation 

224 port = port + 3000 

225 

226 server = await websockets.serve( 

227 functools.partial(self._new_connection, scheme='coap+ws'), 

228 host, port, 

229 subprotocols=['coap'], 

230 process_request=self._process_request, 

231 ping_interval=None, # "SHOULD NOT be used" 

232 ) 

233 self._servers.append(server) 

234 

235 if server_context is not None: 

236 server = await websockets.serve( 

237 functools.partial(self._new_connection, scheme='coaps+ws'), 

238 host, port + 1, 

239 subprotocols=['coap'], 

240 process_request=self._process_request, 

241 ping_interval=None, # "SHOULD NOT be used" 

242 ssl=server_context, 

243 ) 

244 self._servers.append(server) 

245 

246 return self 

247 

248 # Helpers for WebScoket server 

249 

250 async def _new_connection(self, websocket, path=None, *, scheme): 

251 # ignoring path: Already checked in _process_request 

252 # 

253 # (path is present up to 10.0 and absent in 10.1; keeping it around to 

254 # stay compatible with different versions). 

255 

256 hostheader = websocket.request_headers['Host'] 

257 if hostheader.count(':') > 1 and '[' not in hostheader: 

258 # Workaround for websockets version before 

259 # https://github.com/aaugustin/websockets/issues/802 

260 # 

261 # To be removed once a websockets version with this fix can be 

262 # depended on 

263 hostheader = '[' + hostheader[:hostheader.rfind(':')] + ']' + hostheader[hostheader.rfind(':'):] 

264 local_hostinfo = util.hostportsplit(hostheader) 

265 

266 remote = WSRemote(self, websocket, self.loop, self.log, scheme=scheme, local_hostinfo=local_hostinfo) 

267 self._pool[remote._poolkey] = remote 

268 

269 await self._run_recv_loop(remote) 

270 

271 @staticmethod 

272 async def _process_request(path, request_headers): 

273 if path != '/.well-known/coap': 

274 return (http.HTTPStatus.NOT_FOUND, [], b"") 

275 # Continue with WebSockets 

276 return None 

277 

278 # Helpers for WebScoket client 

279 

280 def _connect(self, key: PoolKey): 

281 self._outgoing_starting[key] = self.loop.create_task( 

282 self._connect_task(key), 

283 **py38args(name="WebSocket connection opening to %r" % (key,)) 

284 ) 

285 

286 async def _connect_task(self, key: PoolKey): 

287 try: 

288 ssl_context = self._client_credentials.ssl_client_context(key.scheme, key.hostinfo) 

289 

290 hostinfo_split = util.hostportsplit(key.hostinfo) 

291 

292 websocket = await websockets.connect("%s://%s/.well-known/coap" % ( 

293 {'coap+ws': 'ws', 'coaps+ws': 'wss'}[key.scheme], key.hostinfo), 

294 subprotocols=['coap'], 

295 ping_interval=None, 

296 ssl=ssl_context, 

297 ) 

298 

299 remote = WSRemote(self, websocket, self.loop, self.log, scheme=key.scheme, remote_hostinfo=hostinfo_split) 

300 assert remote._poolkey == key, "Pool key construction is inconsistent" 

301 self._pool[key] = remote 

302 

303 self.loop.create_task( 

304 self._run_recv_loop(remote), 

305 **py38args(name="WebSocket receive loop for %r" % (key,)) 

306 ) 

307 

308 return remote 

309 finally: 

310 del self._outgoing_starting[key] 

311 

312 # Implementation of TokenInterface 

313 

314 async def fill_or_recognize_remote(self, message): 

315 if isinstance(message.remote, WSRemote) and \ 

316 message.remote._pool() is self: 

317 return True 

318 

319 if message.requested_scheme in ('coap+ws', 'coaps+ws'): 

320 key = PoolKey(message.requested_scheme, message.remote.hostinfo) 

321 

322 if key in self._pool: 

323 message.remote = self._pool[key] 

324 if message.remote._connection.open: 

325 return True 

326 # else try opening a new one 

327 

328 if key not in self._outgoing_starting: 

329 self._connect(key) 

330 # It's a bit unorthodox to wait for an (at least partially) 

331 # established connection in fill_or_recognize_remote, but it's 

332 # not completely off off either, and it makes it way easier to 

333 # not have partially initialized remotes around 

334 message.remote = await self._outgoing_starting[key] 

335 return True 

336 

337 return False 

338 

339 def send_message(self, message, messageerror_monitor): 

340 # Ignoring messageerror_monitor: CoAP over reliable transports has no 

341 # way of indicating that a particular message was bad, it always shuts 

342 # down the complete connection 

343 

344 if message.code.is_response(): 

345 no_response = (message.opt.no_response or 0) & (1 << message.code.class_ - 1) != 0 

346 if no_response: 

347 return 

348 

349 message.opt.no_response = None 

350 

351 message.remote._send_message(message) 

352 

353 async def shutdown(self): 

354 self._in_shutdown = True 

355 self.log.debug("Shutting down any connections on %r", self) 

356 

357 client_shutdowns = [ 

358 asyncio.create_task( 

359 c.release(), 

360 **py38args(name="Close connection %s" % c) 

361 ) 

362 for c in self._pool.values()] 

363 

364 server_shutdowns = [] 

365 while self._servers: 

366 s = self._servers.pop() 

367 # We could do something like 

368 # >>> for websocket in s.websockets: 

369 # >>> del websocket.logger.extra['websocket'] 

370 # to reduce the reference loops 

371 # (websocket.logger.extra['websocket'] == websocket), but as the 

372 # tests actually do run a GC collection once and that gets broken 

373 # up, it's not worth adding fragilty here 

374 s.close() 

375 server_shutdowns.append(asyncio.create_task( 

376 s.wait_closed(), 

377 **py38args(name="Close server %s" % s))) 

378 

379 # Placing client shutdowns before server shutdowns to give them a 

380 # chance to send out Abort messages; the .close() method could be more 

381 # helpful here by stopping new connections but letting us finish off 

382 # the old ones 

383 shutdowns = client_shutdowns + server_shutdowns 

384 if shutdowns: 

385 # wait is documented to require a non-empty set 

386 await asyncio.wait(shutdowns) 

387 

388 # Incoming message processing 

389 

390 async def _run_recv_loop(self, remote): 

391 remote._send_initial_csm() 

392 

393 while True: 

394 try: 

395 received = await remote._connection.recv() 

396 except websockets.exceptions.ConnectionClosed: 

397 # This check is purely needed to silence the warning printed 

398 # from tokenmanager, "Internal shutdown sequence msismatch: 

399 # error dispatched through tokenmanager after shutdown" -- and 

400 # is a symptom of https://github.com/chrysn/aiocoap/issues/284 

401 # and of the odd circumstance that we can't easily cancel the 

402 # _run_recv_loop tasks (as we should while that issue is 

403 # unresolved) in the shutdown handler. 

404 if not self._in_shutdown: 

405 # FIXME if deposited somewhere, mark that as stale? 

406 self._tokenmanager.dispatch_error(error.RemoteServerShutdown("Peer closed connection"), remote) 

407 return 

408 

409 if not isinstance(received, bytes): 

410 await remote._abort_with_waiting(Message(code=ABORT, payload=b"Text frame received"), close_code=1003) 

411 return 

412 

413 try: 

414 msg = _decode_message(received) 

415 except error.UnparsableMessage: 

416 await remote._abort_with_waiting(Message(code=ABORT, payload=b"Message parsing error"), close_code=1007) 

417 return 

418 

419 msg.remote = remote 

420 

421 if msg.code.is_signalling(): 

422 try: 

423 remote._process_signaling(msg) 

424 except rfc8323common.CloseConnection as e: 

425 self._tokenmanager.dispatch_error(e.args[0], msg.remote) 

426 self._pool.pop(remote._poolkey) 

427 await remote._connection.close() 

428 continue 

429 

430 if remote._remote_settings is None: 

431 remote.abort("No CSM received") 

432 return 

433 

434 if msg.code.is_response(): 

435 self._tokenmanager.process_response(msg) 

436 # ignoring the return value; unexpected responses can be the 

437 # asynchronous result of cancelled observations 

438 else: 

439 self._tokenmanager.process_request(msg)