Coverage for aiocoap/transports/ws.py: 88%
196 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""
6This module implements a TokenInterface for `CoAP over WebSockets`_.
8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4
10As with CoAP-over-TCP, while the transport distinguishes a connection initiator
11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"),
12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP
13client). As the WebSocket client can not possibly be connected to (even by the
14same server -- once the connection is closed, it's gone and even a new one
15likely has a different port), aiocoap does not allow expressing their addresses
16in URIs (given they wouldn't serve their purpose as URLs and don't provide any
17stability either). Requests to a CoAP-over-WS client can be made by assigning
18the remote to an outgoing request.
20Port choice
21-----------
23Unlike the other transports, CoAP-over-WS is specified with a privileged port
24(port 80) as the default port. This is impractical for aiocoap servers for two
25reasons:
27 * Unless explicitly configured, aiocoap is typically run as an unprivileged
28 user (and has no provisions in place to receive a socket by other means
29 than opening it).
31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website
32 running on the same port on a full HTTP server. That server is usually
33 capable of forwarding requests, whereas the ``websockets`` module used by
34 aiocoap is in no position to either serve websites or to proxy to an
35 underlying server.
37The recommended setup is therefore to run a full web server at port 80, and
38configure it to proxy incoming requests for WebSockets at `/.well-known/coap`
39to aiocoap's server, which defaults to binding to port 8683.
41The port choice of outgoing connections, or the interpretation of the
42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of
43course unaffected by this.
45.. warning::
47 Due to a shortcoming of aiocoap's way of specifying ports to bind
48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that
49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server
50 keys are given, the TLS server is launched on the next port after the HTTP
51 server (typically 8684).
53Using on pyodide_
54-----------------
56When used on pyodide_,
57instead of using the ``websockets`` module,
58a simplified client-only back-end is used,
59which utilizes the browser's WebSocket API.
61.. _pyodide: https://pyodide.org/
62"""
64from __future__ import annotations
66from typing import Dict, List
67from collections import namedtuple
68import asyncio
69import functools
70import http
71import weakref
73from aiocoap import Message, interfaces, ABORT, util, error
74from aiocoap.transports import rfc8323common
75from ..util.asyncio import py38args
76from ..defaults import is_pyodide
78if is_pyodide:
79 import aiocoap.util.pyodide_websockets as websockets
80else:
81 import websockets
def _decode_message(data: bytes) -> Message:
    """Parse the content of one binary WebSocket frame into a message.

    The frame is read as: one byte carrying the token length, one byte
    carrying the code, the token itself, and then everything else, which is
    handed to the message's option decoder; whatever that decoder returns
    becomes the payload.

    :param data: Complete content of a received binary frame.
    :returns: A message with code, token, options and payload populated.
    :raises error.UnparsableMessage: if the frame is too short to contain
        the two header bytes or the announced token, or if the announced
        token length exceeds 8.
    """
    codeoffset = 1
    tokenoffset = 2

    # An empty or one-byte frame would otherwise fail with an IndexError on
    # the header accesses below, which is not the error type used for
    # reporting malformed input.
    if len(data) < tokenoffset:
        raise error.UnparsableMessage("Message too short for header")

    tkl = data[0]
    if tkl > 8:
        raise error.UnparsableMessage("Overly long token")

    # Slicing never fails on short input; without this check a truncated
    # frame would be accepted with a token shorter than announced.
    if len(data) < tokenoffset + tkl:
        raise error.UnparsableMessage("Message too short for token")

    code = data[codeoffset]
    token = data[tokenoffset:tokenoffset + tkl]

    msg = Message(code=code, token=token)

    msg.payload = msg.opt.decode(data[tokenoffset + tkl:])

    return msg
99def _serialize(msg: Message) -> bytes:
100 tkl = len(msg.token)
101 if tkl > 8:
102 raise ValueError("Overly long token")
104 data = [
105 bytes((tkl, msg.code,)),
106 msg.token,
107 msg.opt.encode(),
108 ]
109 if msg.payload:
110 data += [b'\xff', msg.payload]
112 return b"".join(data)
# Key under which connections are stored in the pool: the URI scheme
# together with the host information of the connection.
PoolKey = namedtuple("PoolKey", "scheme hostinfo")
class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress):
    """Endpoint address backed by a single WebSocket connection.

    Holds the connection together with the local and remote host
    information and the URI scheme it was created for, and provides the
    sending and aborting primitives that RFC8323Remote needs.
    """

    _connection: websockets.WebSocketCommonProtocol
    # Only kept so that a remote can be checked against the pool it is used
    # with -- not that there'd be any good reason to have several pools.
    _pool: weakref.ReferenceType[WSPool]

    scheme = None  # Override property -- it's per instance here

    def __init__(self, pool, connection, loop, log, *, scheme, local_hostinfo=None, remote_hostinfo=None):
        """Wrap `connection` as a remote that belongs to `pool`.

        :param pool: The owning pool; only a weak reference is stored.
        :param connection: The WebSocket connection to send and receive on.
        :param loop: Event loop on which sending tasks are created.
        :param log: Logger used for debug and error output.
        :param scheme: URI scheme to store on this instance.
        :param local_hostinfo: Local host information; when None, the first
            two items of the connection's local address are used.
        :param remote_hostinfo: Remote host information; when None, the
            first two items of the connection's remote address are used.
        """
        super().__init__()
        self._pool = weakref.ref(pool)
        self._connection = connection
        self.loop = loop
        self.log = log

        self._local_is_server = isinstance(connection, websockets.WebSocketServerProtocol)

        # Explicitly passed host information wins over what the connection
        # reports about itself.
        self._local_hostinfo = (
            local_hostinfo
            if local_hostinfo is not None
            else self._connection.local_address[:2]
        )
        self._remote_hostinfo = (
            remote_hostinfo
            if remote_hostinfo is not None
            else self._connection.remote_address[:2]
        )

        self.scheme = scheme

        # Goes both for client and for server ends; on the server end, it
        # ensures that role reversal URIs can be used even when passed as URIs
        # and not as remotes (although that's of course only possible locally).
        self._poolkey = PoolKey(self.scheme, self.hostinfo)

    # Necessary for RFC8323Remote

    def _abort_with(self, msg, *, close_code=1002):
        """Schedule sending `msg` and closing the connection with `close_code`.

        Returns immediately; the actual work happens in a task on the loop.
        """
        # Like _send_message, this may take actual time -- but unlike there,
        # there's no need to regulate back-pressure
        abortion = self._abort_with_waiting(msg, close_code=close_code)
        task_options = py38args(name="Abortion WebSocket sonnection with %r" % msg)
        self.loop.create_task(abortion, **task_options)

    # Unlike the sending in _send_message, this is not a nested function of
    # _abort_with as it's also used in _run_recv_loop
    async def _abort_with_waiting(self, msg, *, close_code):
        """Send `msg`, then close the connection with `close_code`.

        A failure while sending is logged; the connection is closed either
        way.
        """
        self.log.debug("Aborting with message: %r", msg)
        try:
            await self._connection.send(_serialize(msg))
        except Exception as exc:
            self.log.error("Sending to a WebSocket should not raise errors", exc_info=exc)
        await self._connection.close(code=close_code)

    def _send_message(self, msg):
        """Schedule `msg` to be serialized and sent on the connection.

        Returns immediately; failures while sending are logged from within
        the task and not raised to the caller.
        """
        # FIXME overhaul back-pressure model
        async def _deliver():
            self.log.debug("Sending message: %r", msg)
            try:
                await self._connection.send(_serialize(msg))
            except Exception as exc:
                self.log.error("Sending to a WebSocket should not raise errors", exc_info=exc)

        pending = _deliver()
        task_options = py38args(name="WebSocket sending of %r" % msg)
        self.loop.create_task(pending, **task_options)

    async def release(self):
        """Run the base class's release, then wait for the connection to close.

        If the wait is cancelled, a warning is logged and the cancellation
        is not propagated.
        """
        await super().release()
        try:
            await self._connection.wait_closed()
        except asyncio.CancelledError:
            self.log.warning(
                "Connection %s was not closed by peer in time after release",
                self,
            )
class WSPool(interfaces.TokenInterface):
    """Token interface that sends and receives messages over WebSockets.

    Keeps a pool of established connections (as WSRemote instances) keyed
    by scheme and host information, the tasks of outgoing connections that
    are still being opened, and the WebSocket servers started for incoming
    connections.
    """

    # Tasks of outgoing connections that are currently being established
    _outgoing_starting: Dict[PoolKey, asyncio.Task]
    # Established connections, both incoming and outgoing
    _pool: Dict[PoolKey, WSRemote]

    # Servers started in create_transport; closed in shutdown
    _servers: List[websockets.WebSocketServer]

    def __init__(self, tman, log, loop):
        """Set up an empty pool.

        :param tman: Token manager that received messages and errors are
            passed to.
        :param log: Logger used for debug output.
        :param loop: Event loop on which connection tasks are created.
        """
        self.loop = loop

        self._pool = {}
        self._outgoing_starting = {}

        self._servers = []

        # See where it is used for documentation, remove when not needed any more
        self._in_shutdown = False

        self._tokenmanager = tman
        self.log = log

    @classmethod
    async def create_transport(cls, tman: interfaces.TokenManager, log, loop, *, client_credentials, server_bind=None, server_context=None):
        """Create a pool and, if requested, start its WebSocket servers.

        :param tman: Token manager to pass received messages to.
        :param log: Logger for the pool and its remotes.
        :param loop: Event loop for the pool's tasks.
        :param client_credentials: Object asked for an SSL client context
            when outgoing connections are opened.
        :param server_bind: Optional ``(host, port)`` pair. A port of None
            selects 8683; a port of 0 is passed on unchanged; any other
            port is increased by 3000 (see module documentation).
        :param server_context: Optional SSL context. When given (and
            `server_bind` is set), a second server for ``coaps+ws`` is
            started on the port after the plain one.
        :returns: The new pool.
        """
        self = cls(tman, log, loop)

        self._client_credentials = client_credentials

        if server_bind:
            host, port = server_bind
            if port is None:
                port = 8683
            elif port != 0:
                # FIXME see module documentation
                port = port + 3000

            server = await websockets.serve(
                functools.partial(self._new_connection, scheme='coap+ws'),
                host, port,
                subprotocols=['coap'],
                process_request=self._process_request,
                ping_interval=None,  # "SHOULD NOT be used"
            )
            self._servers.append(server)

            if server_context is not None:
                # Port 0 is exempt from the offset above so that it reaches
                # the server unchanged; it gets the same treatment here, as
                # incrementing it would request the fixed port 1 instead.
                tls_port = port + 1 if port != 0 else 0
                server = await websockets.serve(
                    functools.partial(self._new_connection, scheme='coaps+ws'),
                    host, tls_port,
                    subprotocols=['coap'],
                    process_request=self._process_request,
                    ping_interval=None,  # "SHOULD NOT be used"
                    ssl=server_context,
                )
                self._servers.append(server)

        return self

    # Helpers for WebSocket server

    async def _new_connection(self, websocket, path=None, *, scheme):
        """Handle one incoming connection until its receive loop ends.

        Builds a WSRemote whose local host information is taken from the
        request's Host header, stores it in the pool and runs the receive
        loop on it.
        """
        # ignoring path: Already checked in _process_request
        #
        # (path is present up to 10.0 and absent in 10.1; keeping it around to
        # stay compatible with different versions).

        hostheader = websocket.request_headers['Host']
        if hostheader.count(':') > 1 and '[' not in hostheader:
            # Workaround for websockets version before
            # https://github.com/aaugustin/websockets/issues/802
            #
            # To be removed once a websockets version with this fix can be
            # depended on
            hostheader = '[' + hostheader[:hostheader.rfind(':')] + ']' + hostheader[hostheader.rfind(':'):]
        local_hostinfo = util.hostportsplit(hostheader)

        remote = WSRemote(self, websocket, self.loop, self.log, scheme=scheme, local_hostinfo=local_hostinfo)
        self._pool[remote._poolkey] = remote

        await self._run_recv_loop(remote)

    @staticmethod
    async def _process_request(path, request_headers):
        """Answer requests for any path but ``/.well-known/coap`` with 404.

        Returning None lets the WebSocket handshake continue.
        """
        if path != '/.well-known/coap':
            return (http.HTTPStatus.NOT_FOUND, [], b"")
        # Continue with WebSockets
        return None

    # Helpers for WebSocket client

    def _connect(self, key: PoolKey):
        """Start opening a connection for `key` as a task, and register it
        in `_outgoing_starting`."""
        self._outgoing_starting[key] = self.loop.create_task(
            self._connect_task(key),
            **py38args(name="WebSocket connection opening to %r" % (key,))
        )

    async def _connect_task(self, key: PoolKey):
        """Open the connection for `key`, pool it and start its receive loop.

        :returns: The WSRemote for the new connection.

        Whether it succeeds or fails, the `_outgoing_starting` entry for
        `key` is removed at the end.
        """
        try:
            ssl_context = self._client_credentials.ssl_client_context(key.scheme, key.hostinfo)

            hostinfo_split = util.hostportsplit(key.hostinfo)

            websocket = await websockets.connect("%s://%s/.well-known/coap" % (
                {'coap+ws': 'ws', 'coaps+ws': 'wss'}[key.scheme], key.hostinfo),
                subprotocols=['coap'],
                ping_interval=None,
                ssl=ssl_context,
            )

            remote = WSRemote(self, websocket, self.loop, self.log, scheme=key.scheme, remote_hostinfo=hostinfo_split)
            assert remote._poolkey == key, "Pool key construction is inconsistent"
            self._pool[key] = remote

            self.loop.create_task(
                self._run_recv_loop(remote),
                **py38args(name="WebSocket receive loop for %r" % (key,))
            )

            return remote
        finally:
            del self._outgoing_starting[key]

    # Implementation of TokenInterface

    async def fill_or_recognize_remote(self, message):
        """Decide whether `message` is to be sent through this pool.

        A message whose remote is a WSRemote of this pool is accepted as
        is. A message requesting the ``coap+ws`` or ``coaps+ws`` scheme
        gets its remote set to a pooled open connection, or to a newly
        opened one (which is awaited here).

        :returns: True if the message is handled by this pool, False
            otherwise.
        """
        if isinstance(message.remote, WSRemote) and \
                message.remote._pool() is self:
            return True

        if message.requested_scheme in ('coap+ws', 'coaps+ws'):
            key = PoolKey(message.requested_scheme, message.remote.hostinfo)

            if key in self._pool:
                message.remote = self._pool[key]
                if message.remote._connection.open:
                    return True
                # else try opening a new one

            if key not in self._outgoing_starting:
                self._connect(key)
            # It's a bit unorthodox to wait for an (at least partially)
            # established connection in fill_or_recognize_remote, but it's
            # not completely off either, and it makes it way easier to
            # not have partially initialized remotes around
            message.remote = await self._outgoing_starting[key]
            return True

        return False

    def send_message(self, message, messageerror_monitor):
        """Send `message` through its remote.

        A response is dropped without sending when the bit for its code
        class is set in its No-Response option; otherwise the option is
        cleared before sending.
        """
        # Ignoring messageerror_monitor: CoAP over reliable transports has no
        # way of indicating that a particular message was bad, it always shuts
        # down the complete connection

        if message.code.is_response():
            no_response = (message.opt.no_response or 0) & (1 << message.code.class_ - 1) != 0
            if no_response:
                return

            message.opt.no_response = None

        message.remote._send_message(message)

    async def shutdown(self):
        """Release all pooled connections and close all servers, and wait
        for both to complete."""
        self._in_shutdown = True
        self.log.debug("Shutting down any connections on %r", self)

        client_shutdowns = [
            asyncio.create_task(
                c.release(),
                **py38args(name="Close connection %s" % c)
            )
            for c in self._pool.values()]

        server_shutdowns = []
        while self._servers:
            s = self._servers.pop()
            # We could do something like
            # >>> for websocket in s.websockets:
            # >>>     del websocket.logger.extra['websocket']
            # to reduce the reference loops
            # (websocket.logger.extra['websocket'] == websocket), but as the
            # tests actually do run a GC collection once and that gets broken
            # up, it's not worth adding fragility here
            s.close()
            server_shutdowns.append(asyncio.create_task(
                s.wait_closed(),
                **py38args(name="Close server %s" % s)))

        # Placing client shutdowns before server shutdowns to give them a
        # chance to send out Abort messages; the .close() method could be more
        # helpful here by stopping new connections but letting us finish off
        # the old ones
        shutdowns = client_shutdowns + server_shutdowns
        if shutdowns:
            # wait is documented to require a non-empty set
            await asyncio.wait(shutdowns)

    # Incoming message processing

    async def _run_recv_loop(self, remote):
        """Receive frames from `remote` until its connection ends.

        Sends the initial CSM, then decodes every received frame and passes
        it on: signalling messages to the remote, responses and requests to
        the token manager. Text frames and unparsable frames abort the
        connection, as does a non-signalling message arriving before the
        peer's settings are known.
        """
        remote._send_initial_csm()

        while True:
            try:
                received = await remote._connection.recv()
            except websockets.exceptions.ConnectionClosed:
                # This check is purely needed to silence the warning the
                # token manager prints when an error is dispatched through
                # it after shutdown -- and is a symptom of
                # https://github.com/chrysn/aiocoap/issues/284
                # and of the odd circumstance that we can't easily cancel the
                # _run_recv_loop tasks (as we should while that issue is
                # unresolved) in the shutdown handler.
                if not self._in_shutdown:
                    # FIXME if deposited somewhere, mark that as stale?
                    self._tokenmanager.dispatch_error(error.RemoteServerShutdown("Peer closed connection"), remote)
                return

            if not isinstance(received, bytes):
                await remote._abort_with_waiting(Message(code=ABORT, payload=b"Text frame received"), close_code=1003)
                return

            try:
                msg = _decode_message(received)
            except error.UnparsableMessage:
                await remote._abort_with_waiting(Message(code=ABORT, payload=b"Message parsing error"), close_code=1007)
                return

            msg.remote = remote

            if msg.code.is_signalling():
                try:
                    remote._process_signaling(msg)
                except rfc8323common.CloseConnection as e:
                    self._tokenmanager.dispatch_error(e.args[0], msg.remote)
                    self._pool.pop(remote._poolkey)
                    await remote._connection.close()
                # Signalling messages are never passed to the token manager
                continue

            if remote._remote_settings is None:
                remote.abort("No CSM received")
                return

            if msg.code.is_response():
                self._tokenmanager.process_response(msg)
                # ignoring the return value; unexpected responses can be the
                # asynchronous result of cancelled observations
            else:
                self._tokenmanager.process_request(msg)