Coverage for aiocoap/credentials.py: 78%
143 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module describes how security credentials are expressed in aiocoap,
6how security protocols (TLS, DTLS, OSCOAP) can store and access their key
7material, and for which URIs they are used.
9For consistency, mappings between accessible resources and their credentials
10are always centered around URIs. This is slightly atypical, because a client
11will typically use a particular set of credentials for all operations on one
12server, while a server first loads all available credentials and then filters
13out whether the client may actually access a resource per-path, but it works
14with full URIs (or patterns thereof) just as well. That approach allows using
15more similar structures both on the server and the client, and works smoothly
16for virtual hosting, firewalling and clients accessing resources with varying
17credentials.
19Still, client and server credentials are kept apart, lest a server open up (and
20potentially reveal) to a PSK set it is only configured to use as a client.
21While client credentials already have their place in
22:attr:`aiocoap.protocol.Context.client_credentials`, server credentials are not
23in use at a standardized location yet because there is only code in the OSCORE
24plug tests that can use it so far.
26Library developer notes
27~~~~~~~~~~~~~~~~~~~~~~~
29This whole module currently relies on a mixture of introspection and manual
30parsing of the JSON-ish tree. A preferred expression of the same would rely on
31the credentials.cddl description and build an object tree from that, but the
32author is unaware of any existing CDDL Python implementation. That might also
33ease porting to platforms that don't support inspect like micropython does.
34"""
36import re
37import inspect
39from typing import Optional
42'''
43server: {
44 'coaps://mysite/*': { 'dtls-psk' (or other granularity): { 'psk': 'abcd' }},
45 'coap://mysite/*': { 'oscore': { 'contextfile': 'my-contextfile/' } },
46 'coap://myothersite/firmware': ':myotherkey',
47 'coap://myothersite/reset': ':myotherkey',
48 'coap://othersite*': { 'unprotected': true },
49 ':myotherkey': { 'oscore': { 'contextfile': 'my-contextfile/' } }
50 }
52server can of course just say it doesn't want to have the Site handle it and
53just say '*': { 'unprotected': true }, add some ':foo': {'dtls-psk': ...}
54entries (so communication can be established in the first place) and let
55individual resources decide whether they return 4.01 or something else.
57client can be the same with different implied role, or have something like
59client: {
60 'coap://myothersite/*': ':myotherkey',
61 ...
62 }
64in future also
66server: {
67 'coaps://mysite/*': { 'dtls-cert': {'key': '...pem', 'cert': '...crt'} }
68 }
70client: {
71 '*': { 'dtls-cert': { 'ca': '/etc/ssl/...' } }
72}
74or more complex ones:
76server: {
77 'coaps://myothersite/wellprotected': { 'all': [ ':mydtls', ':myotherkey' ]}
78 'coaps://myothersite/*': { 'any': [ ':mydtls', ':myotherkey' ]}
79}
80'''
82class CredentialsLoadError(ValueError):
83 """Raised by functions that create a CredentialsMap or its parts from
84 simple data structures"""
86class CredentialsMissingError(RuntimeError):
87 """Raised when no suiting credentials can be found for a message, or
88 credentials are found but inapplicable to a transport's security
89 mechanisms."""
91class CredentialReference:
92 def __init__(self, target, map):
93 if not target.startswith(':'):
94 raise CredentialsLoadError("Credential references must start with a colon (':')")
95 self.target = target
96 self.map = map
98 # FIXME either generalize this with getattr, or introduce a function to
99 # resolve any indirect credentials to a particular instance.
101 def as_dtls_psk(self):
102 return self.map[self.target].as_dtls_psk()
104class _Listish(list):
105 @classmethod
106 def from_item(cls, v):
107 if not isinstance(v, list):
108 raise CredentialsLoadError("%s goes with a list" % cls.__name__)
109 return cls(v)
111class AnyOf(_Listish):
112 pass
114class AllOf(_Listish):
115 pass
117def _call_from_structureddata(constructor, name, init_data):
118 if not isinstance(init_data, dict):
119 raise CredentialsLoadError("%s goes with an object" % name)
121 init_data = {k.replace('-', '_'): v for (k, v) in init_data.items()}
123 sig = inspect.signature(constructor)
125 checked_items = {}
127 for k, v in init_data.items():
128 try:
129 annotation = sig.parameters[k].annotation
130 except KeyError:
131 # let this raise later in binding
132 checked_items[k] = object()
134 if isinstance(v, dict) and 'ascii' in v:
135 if len(v) != 1:
136 raise CredentialsLoadError("ASCII objects can only have one elemnt.")
137 try:
138 v = v['ascii'].encode('ascii')
139 except UnicodeEncodeError:
140 raise CredentialsLoadError("Elements of the ASCII object can not be represented in ASCII, please use binary or hex representation.")
143 if isinstance(v, dict) and 'hex' in v:
144 if len(v) != 1:
145 raise CredentialsLoadError("Hex objects can only have one elemnt.")
146 try:
147 v = bytes.fromhex(v['hex'].replace('-', '').replace(' ', '').replace(':', ''))
148 except ValueError as e:
149 raise CredentialsLoadError("Hex object can not be read: %s" % (e.args[0]))
151 # Not using isinstance because I foundno way to extract the type
152 # information from an Optional/Union again; this whole thing works
153 # only for strings and ints anyway, so why not.
154 if type(v) != annotation and Optional[type(v)] != annotation:
155 # explicitly not excluding inspect._empty here: constructors
156 # need to be fully annotated
157 raise CredentialsLoadError("Type mismatch in attribute %s of %s: expected %s, got %r" % (k, name, annotation, v))
159 checked_items[k] = v
161 try:
162 bound = sig.bind(**checked_items)
163 except TypeError as e:
164 raise CredentialsLoadError("%s: %s" % (name, e.args[0]))
166 return constructor(*bound.args, **bound.kwargs)
168class _Objectish:
169 @classmethod
170 def from_item(cls, init_data):
171 return _call_from_structureddata(cls, cls.__name__, init_data)
173class DTLS(_Objectish):
174 def __init__(self, psk: bytes, client_identity: bytes):
175 self.psk = psk
176 self.client_identity = client_identity
178 def as_dtls_psk(self):
179 return (self.client_identity, self.psk)
181class TLSCert(_Objectish):
182 """Indicates that a client can use the given certificate file to authenticate the server.
184 Can only be used with 'coaps+tcp://HOSTINFO/*' and 'coaps+tcp://*' forms.
185 """
186 def __init__(self, certfile: str):
187 self.certfile = certfile
189 def as_ssl_params(self):
190 """Generate parameters suitable for passing via ** to
191 ssl.create_default_context when purpose is alreay set"""
192 return {"cafile": self.certfile}
194def construct_oscore(contextfile: str):
195 from .oscore import FilesystemSecurityContext
197 return FilesystemSecurityContext(contextfile)
199construct_oscore.from_item = lambda value: _call_from_structureddata(construct_oscore, 'OSCORE', value)
201_re_cache = {}
203class CredentialsMap(dict):
204 """
205 FIXME: outdated, rewrite when usable
207 A CredentialsMap, for any URI template and operation, which
208 security contexts are sufficient to to perform the operation on a matching
209 URI.
211 The same context can be used both by the server and the client, where the
212 client uses the information on allowed client credentials to decide which
213 credentials to present, and the information on allowed server credentials
214 to decide whether the server can be trusted.
216 Conversely, the server typically loads all available server credentials at
217 startup, and then uses the client credentials list to decide whether to
218 serve the request."""
220 def load_from_dict(self, d):
221 """Populate the map from a dictionary, which would typically have been
222 loaded from a JSON/YAML file and needs to match the CDDL in
223 credentials.cddl.
225 Running this multiple times will overwriter individual entries in the
226 map."""
227 for k, v in d.items():
228 if v is None:
229 if k in self:
230 del self[k]
231 else:
232 self[k] = self._item_from_dict(v)
233 # FIXME only works that way for OSCORE clients
234 self[k].authenticated_claims = [k]
236 def _item_from_dict(self, v):
237 if isinstance(v, str):
238 return CredentialReference(v, self)
239 elif isinstance(v, dict):
240 try:
241 (key, value), = v.items()
242 except ValueError:
243 # this follows how Rust Enums are encoded in serde JSON
244 raise CredentialsLoadError(
245 "Items in a credentials map must have exactly one key"
246 " (found %s)" % ("," .join(v.keys()) or "empty")
247 )
249 try:
250 constructor = self._class_map[key].from_item
251 except KeyError:
252 raise CredentialsLoadError("Unknown credential type: %s" % key)
254 return constructor(value)
256 _class_map = {
257 'dtls': DTLS,
258 'oscore': construct_oscore,
259 'tlscert': TLSCert,
260 'any-of': AnyOf,
261 'all-of': AllOf,
262 }
264 @staticmethod
265 def _wildcard_match(searchterm, pattern):
266 if pattern not in _re_cache:
267 _re_cache[pattern] = re.compile(re.escape(pattern).replace('\\*', '.*'))
268 return _re_cache[pattern].fullmatch(searchterm) is not None
270 # used by a client
272 def credentials_from_request(self, msg):
273 """Return the most specific match to a request message. Matching is
274 currently based on wildcards, but not yet very well thought out."""
276 uri = msg.get_request_uri()
278 for i in range(1000):
279 for (k, v) in sorted(self.items(), key=lambda x: len(x[0]), reverse=True):
280 if self._wildcard_match(uri, k):
281 if isinstance(v, str):
282 uri = v
283 continue
284 return v
285 else:
286 raise CredentialsMissingError("No suitable credentials for %s" % uri)
287 else:
288 raise CredentialsLoadError("Search for suitable credentials for %s exceeds recursion limit")
290 def ssl_client_context(self, scheme, hostinfo):
291 """Return an SSL client context as configured for the given request
292 scheme and hostinfo (no full message is to be processed here, as
293 connections are used across requests to the same origin).
295 If no credentials are configured, this returns None (for which the user
296 may need to fill in ssl.create_default_context() if None is not already
297 a good indicator for the eventual consumer to use the default)."""
299 ssl_params = {}
300 tlscert = self.get('%s://%s/*' % (scheme, hostinfo), None)
301 if tlscert is None:
302 tlscert = self.get('%s://*' % scheme, None)
303 if tlscert is not None:
304 ssl_params = tlscert.as_ssl_params()
305 if ssl_params:
306 import ssl
307 return ssl.create_default_context(**ssl_params)
309 # used by a server
311 def find_oscore(self, unprotected):
312 # FIXME: this is not constant-time as it should be, but too much in
313 # flux to warrant optimization
315 # FIXME: duplicate contexts for being tried out are not supported yet.
317 for item in self.values():
318 if not hasattr(item, "get_oscore_context_for"):
319 continue
321 ctx = item.get_oscore_context_for(unprotected)
322 if ctx is not None:
323 return ctx
325 raise KeyError()
327 def find_dtls_psk(self, identity):
328 # FIXME similar to find_oscore
329 for (entry, item) in self.items():
330 if not hasattr(item, "as_dtls_psk"):
331 continue
333 psk_id, psk = item.as_dtls_psk()
334 if psk_id != identity:
335 continue
337 # FIXME is returning the entry name a sane value to later put in to
338 # authenticated_claims? OSCORE does something different.
339 return (psk, entry)
341 raise KeyError()