Coverage for aiocoap/credentials.py: 78%

143 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-16 16:09 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module describes how security credentials are expressed in aiocoap, 

6how security protocols (TLS, DTLS, OSCOAP) can store and access their key 

7material, and for which URIs they are used. 

8 

9For consistency, mappings between accessible resources and their credentials 

10are always centered around URIs. This is slightly atypical, because a client 

11will typically use a particular set of credentials for all operations on one 

12server, while a server first loads all available credentials and then filters 

13out whether the client may actually access a resource per-path, but it works 

14with full URIs (or patterns thereof) just as well. That approach allows using 

15more similar structures both on the server and the client, and works smoothly 

16for virtual hosting, firewalling and clients accessing resources with varying 

17credentials. 

18 

19Still, client and server credentials are kept apart, lest a server open up (and 

20potentially reveal) to a PSK set it is only configured to use as a client. 

21While client credentials already have their place in 

22:attr:`aiocoap.protocol.Context.client_credentials`, server credentials are not 

23in use at a standardized location yet because there is only code in the OSCORE 

24plug tests that can use it so far. 

25 

26Library developer notes 

27~~~~~~~~~~~~~~~~~~~~~~~ 

28 

29This whole module currently relies on a mixture of introspection and manual 

30parsing of the JSON-ish tree. A preferred expression of the same would rely on 

31the credentials.cddl description and build an object tree from that, but the 

32author is unaware of any existing CDDL Python implementation. That might also 

33ease porting to platforms that don't support inspect like micropython does. 

34""" 

35 

36import re 

37import inspect 

38 

39from typing import Optional 

40 

41 

42''' 

43server: { 

44 'coaps://mysite/*': { 'dtls-psk' (or other granularity): { 'psk': 'abcd' }}, 

45 'coap://mysite/*': { 'oscore': { 'contextfile': 'my-contextfile/' } }, 

46 'coap://myothersite/firmware': ':myotherkey', 

47 'coap://myothersite/reset': ':myotherkey', 

48 'coap://othersite*': { 'unprotected': true }, 

49 ':myotherkey': { 'oscore': { 'contextfile': 'my-contextfile/' } } 

50 } 

51 

52server can of course just say it doesn't want to have the Site handle it and 

53just say '*': { 'unprotected': true }, add some ':foo': {'dtls-psk': ...} 

54entries (so communication can be established in the first place) and let 

55individual resources decide whether they return 4.01 or something else. 

56 

57client can be the same with different implied role, or have something like 

58 

59client: { 

60 'coap://myothersite/*': ':myotherkey', 

61 ... 

62 } 

63 

64in future also 

65 

66server: { 

67 'coaps://mysite/*': { 'dtls-cert': {'key': '...pem', 'cert': '...crt'} } 

68 } 

69 

70client: { 

71 '*': { 'dtls-cert': { 'ca': '/etc/ssl/...' } } 

72} 

73 

74or more complex ones: 

75 

76server: { 

77 'coaps://myothersite/wellprotected': { 'all': [ ':mydtls', ':myotherkey' ]} 

78 'coaps://myothersite/*': { 'any': [ ':mydtls', ':myotherkey' ]} 

79} 

80''' 

81 

82class CredentialsLoadError(ValueError): 

83 """Raised by functions that create a CredentialsMap or its parts from 

84 simple data structures""" 

85 

86class CredentialsMissingError(RuntimeError): 

87 """Raised when no suiting credentials can be found for a message, or 

88 credentials are found but inapplicable to a transport's security 

89 mechanisms.""" 

90 

91class CredentialReference: 

92 def __init__(self, target, map): 

93 if not target.startswith(':'): 

94 raise CredentialsLoadError("Credential references must start with a colon (':')") 

95 self.target = target 

96 self.map = map 

97 

98 # FIXME either generalize this with getattr, or introduce a function to 

99 # resolve any indirect credentials to a particular instance. 

100 

101 def as_dtls_psk(self): 

102 return self.map[self.target].as_dtls_psk() 

103 

104class _Listish(list): 

105 @classmethod 

106 def from_item(cls, v): 

107 if not isinstance(v, list): 

108 raise CredentialsLoadError("%s goes with a list" % cls.__name__) 

109 return cls(v) 

110 

111class AnyOf(_Listish): 

112 pass 

113 

114class AllOf(_Listish): 

115 pass 

116 

117def _call_from_structureddata(constructor, name, init_data): 

118 if not isinstance(init_data, dict): 

119 raise CredentialsLoadError("%s goes with an object" % name) 

120 

121 init_data = {k.replace('-', '_'): v for (k, v) in init_data.items()} 

122 

123 sig = inspect.signature(constructor) 

124 

125 checked_items = {} 

126 

127 for k, v in init_data.items(): 

128 try: 

129 annotation = sig.parameters[k].annotation 

130 except KeyError: 

131 # let this raise later in binding 

132 checked_items[k] = object() 

133 

134 if isinstance(v, dict) and 'ascii' in v: 

135 if len(v) != 1: 

136 raise CredentialsLoadError("ASCII objects can only have one elemnt.") 

137 try: 

138 v = v['ascii'].encode('ascii') 

139 except UnicodeEncodeError: 

140 raise CredentialsLoadError("Elements of the ASCII object can not be represented in ASCII, please use binary or hex representation.") 

141 

142 

143 if isinstance(v, dict) and 'hex' in v: 

144 if len(v) != 1: 

145 raise CredentialsLoadError("Hex objects can only have one elemnt.") 

146 try: 

147 v = bytes.fromhex(v['hex'].replace('-', '').replace(' ', '').replace(':', '')) 

148 except ValueError as e: 

149 raise CredentialsLoadError("Hex object can not be read: %s" % (e.args[0])) 

150 

151 # Not using isinstance because I foundno way to extract the type 

152 # information from an Optional/Union again; this whole thing works 

153 # only for strings and ints anyway, so why not. 

154 if type(v) != annotation and Optional[type(v)] != annotation: 

155 # explicitly not excluding inspect._empty here: constructors 

156 # need to be fully annotated 

157 raise CredentialsLoadError("Type mismatch in attribute %s of %s: expected %s, got %r" % (k, name, annotation, v)) 

158 

159 checked_items[k] = v 

160 

161 try: 

162 bound = sig.bind(**checked_items) 

163 except TypeError as e: 

164 raise CredentialsLoadError("%s: %s" % (name, e.args[0])) 

165 

166 return constructor(*bound.args, **bound.kwargs) 

167 

168class _Objectish: 

169 @classmethod 

170 def from_item(cls, init_data): 

171 return _call_from_structureddata(cls, cls.__name__, init_data) 

172 

173class DTLS(_Objectish): 

174 def __init__(self, psk: bytes, client_identity: bytes): 

175 self.psk = psk 

176 self.client_identity = client_identity 

177 

178 def as_dtls_psk(self): 

179 return (self.client_identity, self.psk) 

180 

181class TLSCert(_Objectish): 

182 """Indicates that a client can use the given certificate file to authenticate the server. 

183 

184 Can only be used with 'coaps+tcp://HOSTINFO/*' and 'coaps+tcp://*' forms. 

185 """ 

186 def __init__(self, certfile: str): 

187 self.certfile = certfile 

188 

189 def as_ssl_params(self): 

190 """Generate parameters suitable for passing via ** to 

191 ssl.create_default_context when purpose is alreay set""" 

192 return {"cafile": self.certfile} 

193 

194def construct_oscore(contextfile: str): 

195 from .oscore import FilesystemSecurityContext 

196 

197 return FilesystemSecurityContext(contextfile) 

198 

199construct_oscore.from_item = lambda value: _call_from_structureddata(construct_oscore, 'OSCORE', value) 

200 

201_re_cache = {} 

202 

203class CredentialsMap(dict): 

204 """ 

205 FIXME: outdated, rewrite when usable 

206 

207 A CredentialsMap, for any URI template and operation, which 

208 security contexts are sufficient to to perform the operation on a matching 

209 URI. 

210 

211 The same context can be used both by the server and the client, where the 

212 client uses the information on allowed client credentials to decide which 

213 credentials to present, and the information on allowed server credentials 

214 to decide whether the server can be trusted. 

215 

216 Conversely, the server typically loads all available server credentials at 

217 startup, and then uses the client credentials list to decide whether to 

218 serve the request.""" 

219 

220 def load_from_dict(self, d): 

221 """Populate the map from a dictionary, which would typically have been 

222 loaded from a JSON/YAML file and needs to match the CDDL in 

223 credentials.cddl. 

224 

225 Running this multiple times will overwriter individual entries in the 

226 map.""" 

227 for k, v in d.items(): 

228 if v is None: 

229 if k in self: 

230 del self[k] 

231 else: 

232 self[k] = self._item_from_dict(v) 

233 # FIXME only works that way for OSCORE clients 

234 self[k].authenticated_claims = [k] 

235 

236 def _item_from_dict(self, v): 

237 if isinstance(v, str): 

238 return CredentialReference(v, self) 

239 elif isinstance(v, dict): 

240 try: 

241 (key, value), = v.items() 

242 except ValueError: 

243 # this follows how Rust Enums are encoded in serde JSON 

244 raise CredentialsLoadError( 

245 "Items in a credentials map must have exactly one key" 

246 " (found %s)" % ("," .join(v.keys()) or "empty") 

247 ) 

248 

249 try: 

250 constructor = self._class_map[key].from_item 

251 except KeyError: 

252 raise CredentialsLoadError("Unknown credential type: %s" % key) 

253 

254 return constructor(value) 

255 

256 _class_map = { 

257 'dtls': DTLS, 

258 'oscore': construct_oscore, 

259 'tlscert': TLSCert, 

260 'any-of': AnyOf, 

261 'all-of': AllOf, 

262 } 

263 

264 @staticmethod 

265 def _wildcard_match(searchterm, pattern): 

266 if pattern not in _re_cache: 

267 _re_cache[pattern] = re.compile(re.escape(pattern).replace('\\*', '.*')) 

268 return _re_cache[pattern].fullmatch(searchterm) is not None 

269 

270 # used by a client 

271 

272 def credentials_from_request(self, msg): 

273 """Return the most specific match to a request message. Matching is 

274 currently based on wildcards, but not yet very well thought out.""" 

275 

276 uri = msg.get_request_uri() 

277 

278 for i in range(1000): 

279 for (k, v) in sorted(self.items(), key=lambda x: len(x[0]), reverse=True): 

280 if self._wildcard_match(uri, k): 

281 if isinstance(v, str): 

282 uri = v 

283 continue 

284 return v 

285 else: 

286 raise CredentialsMissingError("No suitable credentials for %s" % uri) 

287 else: 

288 raise CredentialsLoadError("Search for suitable credentials for %s exceeds recursion limit") 

289 

290 def ssl_client_context(self, scheme, hostinfo): 

291 """Return an SSL client context as configured for the given request 

292 scheme and hostinfo (no full message is to be processed here, as 

293 connections are used across requests to the same origin). 

294 

295 If no credentials are configured, this returns None (for which the user 

296 may need to fill in ssl.create_default_context() if None is not already 

297 a good indicator for the eventual consumer to use the default).""" 

298 

299 ssl_params = {} 

300 tlscert = self.get('%s://%s/*' % (scheme, hostinfo), None) 

301 if tlscert is None: 

302 tlscert = self.get('%s://*' % scheme, None) 

303 if tlscert is not None: 

304 ssl_params = tlscert.as_ssl_params() 

305 if ssl_params: 

306 import ssl 

307 return ssl.create_default_context(**ssl_params) 

308 

309 # used by a server 

310 

311 def find_oscore(self, unprotected): 

312 # FIXME: this is not constant-time as it should be, but too much in 

313 # flux to warrant optimization 

314 

315 # FIXME: duplicate contexts for being tried out are not supported yet. 

316 

317 for item in self.values(): 

318 if not hasattr(item, "get_oscore_context_for"): 

319 continue 

320 

321 ctx = item.get_oscore_context_for(unprotected) 

322 if ctx is not None: 

323 return ctx 

324 

325 raise KeyError() 

326 

327 def find_dtls_psk(self, identity): 

328 # FIXME similar to find_oscore 

329 for (entry, item) in self.items(): 

330 if not hasattr(item, "as_dtls_psk"): 

331 continue 

332 

333 psk_id, psk = item.as_dtls_psk() 

334 if psk_id != identity: 

335 continue 

336 

337 # FIXME is returning the entry name a sane value to later put in to 

338 # authenticated_claims? OSCORE does something different. 

339 return (psk, entry) 

340 

341 raise KeyError()