Coverage for aiocoap/transports/ws.py: 85%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

170 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9""" 

10This module implements a TokenInterface for `CoAP over WebSockets`_. 

11 

12.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4 

13 

14As with CoAP-over-TCP, while the transport distinguishes a connection initiator 

15("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"), 

16both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP 

17client). As the WebSocket client can not possibly be connected to (even by the 

18same server -- once the connection is closed, it's gone and even a new one 

19likely has a different port), aiocoap does not allow expressing their addresses 

20in URIs (given they wouldn't serve their purpose as URLs and don't provide any 

21stability either). Requests to a CoAP-over-WS client can be made by assigning 

22the remote to an outgoing request. 

23 

24Port choice 

25----------- 

26 

27Unlike the other transports, CoAP-over-WS is specified with a privileged port 

28(port 80) as the default port. This is impractical for aiocoap servers for two 

29reasons: 

30 

31 * Unless explicitly configured, aiocoap is typically run as an unprivileged 

32 user (and has no provisions in place to receive a socket by other means 

33 than opening it). 

34 

35 * Where a CoAP-over-WS proxy is run, there is often a "proper" website 

36 running on the same port on a full HTTP server. That server is usually 

37 capable of forwarding requests, whereas the ``websockets`` module used by 

38 aiocoap is in no position either to serve websites or to proxy to an 

39 underlying server. 

40 

41The recommended setup is therefore to run a full web server at port 80, and 

42configure it to proxy incoming requests for WebSockets at `/.well-known/coap` 

43to aiocoap's server, which defaults to binding to port 8683. 

44 

45The port choice of outgoing connections, or the interpretation of the 

46protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of 

47course unaffected by this. 

48 

49.. warning:: 

50 

51 Due to a shortcoming of aiocoap's way of specifying ports to bind 

52 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that 

53 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server 

54 keys are given, the TLS server is launched on the next port after the HTTP 

55 server (typically 8684). 

56""" 

57 

58from __future__ import annotations 

59 

60from typing import Dict, List 

61from collections import namedtuple 

62import asyncio 

63import functools 

64import http 

65import weakref 

66 

67from aiocoap import Message, interfaces, ABORT, util, error 

68from aiocoap.transports import rfc8323common 

69from ..util.asyncio import py38args 

70 

71import websockets 

72 

def _decode_message(data: bytes) -> Message:
    """Parse one binary WebSocket frame into a Message.

    The frame is read as a one-byte token length, a one-byte code, the token
    itself, and the remainder, which is handed to the message's option
    decoder; whatever that decoder returns becomes the payload.

    :param data: The complete content of a received binary frame.
    :returns: The decoded message (without a remote set).
    :raises error.UnparsableMessage: if the frame is too short to hold the
        two header bytes or the announced token, or if the token length
        exceeds 8.
    """
    codeoffset = 1
    tokenoffset = 2

    # Frames come from the network: reject short input explicitly rather than
    # letting an IndexError escape to callers that only expect
    # UnparsableMessage.
    if len(data) < tokenoffset:
        raise error.UnparsableMessage("Message too short to contain a header")

    tkl = data[0]
    if tkl > 8:
        raise error.UnparsableMessage("Overly long token")
    if len(data) < tokenoffset + tkl:
        # Slicing would silently produce a token shorter than announced
        raise error.UnparsableMessage("Message too short to contain its token")
    code = data[codeoffset]
    token = data[tokenoffset:tokenoffset + tkl]

    msg = Message(code=code, token=token)

    msg.payload = msg.opt.decode(data[tokenoffset + tkl:])

    return msg

88 

89def _serialize(msg: Message) -> bytes: 

90 tkl = len(msg.token) 

91 if tkl > 8: 

92 raise ValueError("Overly long token") 

93 

94 data = [ 

95 bytes((tkl, msg.code,)), 

96 msg.token, 

97 msg.opt.encode(), 

98 ] 

99 if msg.payload: 

100 data += [b'\xff', msg.payload] 

101 

102 return b"".join(data) 

103 

# Key identifying a pooled outgoing connection: the requested URI scheme
# together with the remote's hostinfo string.
PoolKey = namedtuple("PoolKey", ["scheme", "hostinfo"])

105 

class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress):
    """Endpoint address backed by a single WebSocket connection.

    The synchronous ``_abort_with`` and ``_send_message`` hooks schedule the
    actual (awaiting) WebSocket operations as tasks on the event loop.
    """

    _connection: websockets.WebSocketCommonProtocol
    # Only used to ensure that remotes are associated to the right pool -- not
    # that there'd be any good reason to have multiple of those.
    _pool: weakref.ReferenceType[WSPool]

    scheme = None # Override property -- it's per instance here

    def __init__(self, pool, connection, loop, log, *, scheme, local_hostinfo=None, remote_hostinfo=None):
        """Wrap an established WebSocket connection.

        :param pool: The pool this remote belongs to; only weakly referenced.
        :param connection: The WebSocket connection to send on.
        :param loop: Event loop on which send / abort tasks are created.
        :param log: Logger used for debug and error output.
        :param scheme: URI scheme this remote is addressed with.
        :param local_hostinfo: Local host/port pair; if None, the first two
            items of the connection's ``local_address`` are used.
        :param remote_hostinfo: Remote host/port pair; if None, the first two
            items of the connection's ``remote_address`` are used.
        """
        super().__init__()
        self._pool = weakref.ref(pool)
        self._connection = connection
        self.loop = loop
        self.log = log

        # Strong references to the fire-and-forget tasks started by this
        # remote; see _spawn.
        self._background_tasks = set()

        self._is_server = isinstance(connection, websockets.WebSocketServerProtocol)

        if local_hostinfo is None:
            self._local_hostinfo = self._connection.local_address[:2]
        else:
            self._local_hostinfo = local_hostinfo
        if remote_hostinfo is None:
            self._remote_hostinfo = self._connection.remote_address[:2]
        else:
            self._remote_hostinfo = remote_hostinfo

        self.scheme = scheme

    def _spawn(self, coro, name):
        """Run ``coro`` as a named task and keep it referenced until done.

        The event loop only keeps weak references to tasks, so a pending task
        nobody else references may be garbage collected before it finishes.
        """
        task = self.loop.create_task(coro, **py38args(name=name))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    # Necessary for RFC8323Remote

    def _abort_with(self, msg, *, close_code=1002):
        """Schedule sending ``msg`` and then closing the connection with
        ``close_code``, without waiting for either to complete."""
        # Like _send_message, this may take actual time -- but unlike there,
        # there's no need to regulate back-pressure
        self._spawn(
            self._abort_with_waiting(msg, close_code=close_code),
            "Abortion WebSocket connection with %r" % msg,
            )

    # Unlike _send_message, this is pulled out of the _abort_with function
    # as it's also used in _run_recv_loop
    async def _abort_with_waiting(self, msg, *, close_code):
        """Send ``msg``, then close the connection with ``close_code``.

        A failure to send is logged; the connection is closed regardless.
        """
        self.log.debug("Aborting with message: %r", msg)
        try:
            await self._connection.send(_serialize(msg))
        except Exception as e:
            self.log.error("Sending to a WebSocket should not raise errors", exc_info=e)
        await self._connection.close(code=close_code)

    def _send_message(self, msg):
        """Schedule sending ``msg`` on the connection.

        Returns immediately; errors raised while sending are logged rather
        than propagated.
        """
        # FIXME overhaul back-pressure model
        async def send():
            self.log.debug("Sending message: %r", msg)
            try:
                await self._connection.send(_serialize(msg))
            except Exception as e:
                self.log.error("Sending to a WebSocket should not raise errors", exc_info=e)
        self._spawn(send(), "WebSocket sending of %r" % msg)

166 

class WSPool(interfaces.TokenInterface):
    """Token interface for CoAP over WebSockets.

    Runs the WebSocket servers set up in :meth:`create_transport`, and opens
    outgoing connections on demand, keeping them in a pool keyed by
    :class:`PoolKey` so that later requests to the same scheme and hostinfo
    reuse an open connection.
    """

    # Connection attempts currently in progress
    _outgoing_starting: Dict[PoolKey, asyncio.Task]
    # Established outgoing connections
    _pool: Dict[PoolKey, WSRemote]

    _servers: List[websockets.WebSocketServer]

    def __init__(self, tman, log, loop):
        """Set up an empty pool; use :meth:`create_transport` to obtain a
        usable instance (it also sets the client credentials and starts
        servers)."""
        self.loop = loop

        self._pool = {}
        self._outgoing_starting = {}

        self._servers = []

        # Strong references to the receive loop tasks of outgoing
        # connections: the event loop only references tasks weakly.
        self._recv_tasks = set()

        self._tokenmanager = tman
        self.log = log

    @classmethod
    async def create_transport(cls, tman: interfaces.TokenManager, log, loop, *, client_credentials, server_bind=None, server_context=None):
        """Create a pool and, if ``server_bind`` is given, start its servers.

        :param tman: Token manager that received messages and errors are
            dispatched to.
        :param client_credentials: Source of the TLS client context used for
            outgoing ``coaps+ws`` connections.
        :param server_bind: Optional ``(host, port)`` pair. A port of None
            selects 8683; any other port is offset by 3000 (see the module
            documentation).
        :param server_context: Optional TLS context; if given (and
            ``server_bind`` is set), a second server is started on the next
            port.
        :returns: The ready-to-use pool.
        """
        self = cls(tman, log, loop)

        self._client_credentials = client_credentials

        if server_bind:
            host, port = server_bind
            if port is None:
                port = 8683
            else:
                # FIXME see module documentation
                port = port + 3000

            server = await websockets.serve(
                functools.partial(self._new_connection, scheme='coap+ws'),
                host, port,
                subprotocols=['coap'],
                process_request=self._process_request,
                ping_interval=None, # "SHOULD NOT be used"
                )
            self._servers.append(server)

            if server_context is not None:
                server = await websockets.serve(
                    functools.partial(self._new_connection, scheme='coaps+ws'),
                    host, port + 1,
                    subprotocols=['coap'],
                    process_request=self._process_request,
                    ping_interval=None, # "SHOULD NOT be used"
                    ssl=server_context,
                    )
                self._servers.append(server)

        return self

    # Helpers for WebSocket server

    async def _new_connection(self, websocket, path=None, *, scheme):
        """Handle an accepted connection: wrap it in a remote and run its
        receive loop until the connection ends."""
        # ignoring path: Already checked in _process_request
        #
        # (path is present up to 10.0 and absent in 10.1; keeping it around to
        # stay compatible with different versions).

        hostheader = websocket.request_headers['Host']
        if hostheader.count(':') > 1 and '[' not in hostheader:
            # Workaround for websockets version before
            # https://github.com/aaugustin/websockets/issues/802
            #
            # To be removed once a websockets version with this fix can be
            # depended on
            port_separator = hostheader.rfind(':')
            hostheader = '[' + hostheader[:port_separator] + ']' + hostheader[port_separator:]
        local_hostinfo = util.hostportsplit(hostheader)

        remote = WSRemote(self, websocket, self.loop, self.log, scheme=scheme, local_hostinfo=local_hostinfo)

        await self._run_recv_loop(remote)

    @staticmethod
    async def _process_request(path, request_headers):
        """Reject any HTTP request that is not for ``/.well-known/coap`` with
        a 404 response; return None to let the WebSocket handshake go on."""
        if path != '/.well-known/coap':
            return (http.HTTPStatus.NOT_FOUND, [], b"")
        # Continue with WebSockets
        return None

    # Helpers for WebSocket client

    def _connect(self, key: PoolKey):
        """Start connecting to ``key`` and register the attempt in
        ``_outgoing_starting`` so that concurrent requests can await it."""
        self._outgoing_starting[key] = self.loop.create_task(
            self._connect_task(key),
            **py38args(name="WebSocket connection opening to %r" % (key,))
            )

    async def _connect_task(self, key: PoolKey):
        """Open the connection described by ``key``, add it to the pool and
        start its receive loop.

        :returns: The remote wrapping the new connection.

        The entry in ``_outgoing_starting`` is removed when this finishes,
        whether or not the connection succeeded.
        """
        try:
            if key.scheme == 'coaps+ws':
                ssl_context = self._client_credentials.ssl_client_context(key.scheme, key.hostinfo)
            else:
                # websockets library would not appreciate the extra info when connecting to ws://
                ssl_context = None

            hostinfo_split = util.hostportsplit(key.hostinfo)

            websocket = await websockets.connect("%s://%s/.well-known/coap" % (
                {'coap+ws': 'ws', 'coaps+ws': 'wss'}[key.scheme], key.hostinfo),
                subprotocols=['coap'],
                ping_interval=None,
                ssl=ssl_context,
                )

            remote = WSRemote(self, websocket, self.loop, self.log, scheme=key.scheme, remote_hostinfo=hostinfo_split)
            # Store under the PoolKey: that is what fill_or_recognize_remote
            # looks up when deciding whether a connection can be reused.
            self._pool[key] = remote

            recv_task = self.loop.create_task(
                self._run_recv_loop(remote),
                **py38args(name="WebSocket receive loop for %r" % (key,))
                )
            self._recv_tasks.add(recv_task)
            recv_task.add_done_callback(self._recv_tasks.discard)

            return remote
        finally:
            del self._outgoing_starting[key]

    # Implementation of TokenInterface

    async def fill_or_recognize_remote(self, message):
        """Claim ``message`` if it is to be sent through this pool.

        A message whose remote is already a WSRemote of this pool is
        accepted as is. A message requesting the ``coap+ws`` or ``coaps+ws``
        scheme gets its remote replaced by a pooled open connection, or by a
        newly established one (which is awaited here).

        :returns: True if this transport handles the message, else False.
        """
        if isinstance(message.remote, WSRemote) and \
                message.remote._pool() is self:
            return True

        if message.requested_scheme in ('coap+ws', 'coaps+ws'):
            key = PoolKey(message.requested_scheme, message.remote.hostinfo)

            if key in self._pool:
                message.remote = self._pool[key]
                if message.remote._connection.open:
                    return True
                # else try opening a new one

            if key not in self._outgoing_starting:
                self._connect(key)
            # It's a bit unorthodox to wait for an (at least partially)
            # established connection in fill_or_recognize_remote, but it's
            # not completely off either, and it makes it way easier to
            # not have partially initialized remotes around
            message.remote = await self._outgoing_starting[key]
            return True

        return False

    def send_message(self, message, messageerror_monitor):
        """Send ``message`` through its remote.

        Responses whose class is suppressed by the No-Response option set on
        them are dropped; on responses that are sent, the option is cleared
        first.
        """
        # Ignoring messageerror_monitor: CoAP over reliable transports has no
        # way of indicating that a particular message was bad, it always shuts
        # down the complete connection

        if message.code.is_response():
            no_response = (message.opt.no_response or 0) & (1 << message.code.class_ - 1) != 0
            if no_response:
                return

            message.opt.no_response = None

        message.remote._send_message(message)

    async def shutdown(self):
        """Close all servers started by this pool and wait for them to be
        closed."""
        while self._servers:
            # could be parallelized, but what are the chances there'll actually be multiple
            s = self._servers.pop()
            # We could do something like
            # >>> for websocket in s.websockets:
            # >>>     del websocket.logger.extra['websocket']
            # to reduce the reference loops
            # (websocket.logger.extra['websocket'] == websocket), but as the
            # tests actually do run a GC collection once and that gets broken
            # up, it's not worth adding fragility here
            s.close()
            await s.wait_closed()

        # FIXME any handling needed for outgoing connections?

    # Incoming message processing

    async def _run_recv_loop(self, remote):
        """Receive frames from ``remote`` until its connection ends.

        Sends the initial CSM, then decodes each received frame and hands it
        to the remote (signalling messages) or to the token manager
        (requests and responses). Text frames, unparsable frames and
        non-signalling messages arriving before any CSM abort the connection.
        """
        remote._send_initial_csm()

        while True:
            try:
                received = await remote._connection.recv()
            except websockets.exceptions.ConnectionClosed:
                # FIXME if deposited somewhere, mark that as stale?
                self._tokenmanager.dispatch_error(error.RemoteServerShutdown(), remote)
                return

            if not isinstance(received, bytes):
                await remote._abort_with_waiting(Message(code=ABORT, payload=b"Text frame received"), close_code=1003)
                return

            try:
                msg = _decode_message(received)
            except error.UnparsableMessage:
                await remote._abort_with_waiting(Message(code=ABORT, payload=b"Message parsing error"), close_code=1007)
                return

            msg.remote = remote

            if msg.code.is_signalling():
                remote._process_signaling(msg)
                continue

            if remote._remote_settings is None:
                remote.abort("No CSM received")
                return

            if msg.code.is_response():
                self._tokenmanager.process_response(msg)
                # ignoring the return value; unexpected responses can be the
                # asynchronous result of cancelled observations
            else:
                self._tokenmanager.process_request(msg)