Coverage for aiocoap/credentials.py: 77%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

141 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9"""This module describes how security credentials are expressed in aiocoap, 

10how security protocols (TLS, DTLS, OSCOAP) can store and access their key 

11material, and for which URIs they are used. 

12 

13For consistency, mappings between accessible resources and their credentials 

14are always centered around URIs. This is slightly atypical, because a client 

15will typically use a particular set of credentials for all operations on one 

16server, while a server first loads all available credentials and then filters 

17out whether the client may actually access a resource per-path, but it works 

18with full URIs (or patterns thereof) just as well. That approach allows using 

19more similar structures both on the server and the client, and works smoothly 

20for virtual hosting, firewalling and clients accessing resources with varying 

21credentials. 

22 

23Still, client and server credentials are kept apart, lest a server open up (and 

24potentially reveal) to a PSK set it is only configured to use as a client. 

25While client credentials already have their place in 

26:attr:`aiocoap.protocol.Context.client_credentials`, server credentials are not 

27in use at a standardized location yet because there is only code in the OSCORE 

28plug tests that can use it so far. 

29 

30Library developer notes 

31~~~~~~~~~~~~~~~~~~~~~~~ 

32 

33This whole module currently relies on a mixture of introspection and manual 

34parsing of the JSON-ish tree. A preferred expression of the same would rely on 

35the credentials.cddl description and build an object tree from that, but the 

36author is unaware of any existing CDDL Python implementation. That might also 

37ease porting to platforms that don't support inspect like micropython does. 

38""" 

39 

40import re 

41import inspect 

42 

43from typing import Optional 

44 

45 

46''' 

47server: { 

48 'coaps://mysite/*': { 'dtls-psk' (or other granularity): { 'psk': 'abcd' }}, 

49 'coap://mysite/*': { 'oscore': { 'contextfile': 'my-contextfile/' } }, 

50 'coap://myothersite/firmware': ':myotherkey', 

51 'coap://myothersite/reset': ':myotherkey', 

52 'coap://othersite*': { 'unprotected': true }, 

53 ':myotherkey': { 'oscore': { 'contextfile': 'my-contextfile/' } } 

54 } 

55 

56server can of course just say it doesn't want to have the Site handle it and 

57just say '*': { 'unprotected': true }, add some ':foo': {'dtls-psk': ...} 

58entries (so communication can be established in the first place) and let 

59individual resources decide whether they return 4.01 or something else. 

60 

61client can be the same with different implied role, or have something like 

62 

63client: { 

64 'coap://myothersite/*': ':myotherkey', 

65 ... 

66 } 

67 

68in future also 

69 

70server: { 

71 'coaps://mysite/*': { 'dtls-cert': {'key': '...pem', 'cert': '...crt'} } 

72 } 

73 

74client: { 

75 '*': { 'dtls-cert': { 'ca': '/etc/ssl/...' } } 

76} 

77 

78or more complex ones: 

79 

80server: { 

81 'coaps://myothersite/wellprotected': { 'all': [ ':mydtls', ':myotherkey' ]} 

82 'coaps://myothersite/*': { 'any': [ ':mydtls', ':myotherkey' ]} 

83} 

84''' 

85 

86class CredentialsLoadError(ValueError): 

87 """Raised by functions that create a CredentialsMap or its parts from 

88 simple data structures""" 

89 

90class CredentialsMissingError(RuntimeError): 

91 """Raised when no suiting credentials can be found for a message, or 

92 credentials are found but inapplicable to a transport's security 

93 mechanisms.""" 

94 

95class CredentialReference: 

96 def __init__(self, target, map): 

97 if not target.startswith(':'): 

98 raise CredentialsLoadError("Credential references must start with a colon (':')") 

99 self.target = target 

100 self.map = map 

101 

102 # FIXME either generalize this with getattr, or introduce a function to 

103 # resolve any indirect credentials to a particular instance. 

104 

105 def as_dtls_psk(self): 

106 return self.map[self.target].as_dtls_psk() 

107 

108class _Listish(list): 

109 @classmethod 

110 def from_item(cls, v): 

111 if not isinstance(v, list): 

112 raise CredentialsLoadError("%s goes with a list" % cls.__name__) 

113 return cls(v) 

114 

115class AnyOf(_Listish): 

116 pass 

117 

118class AllOf(_Listish): 

119 pass 

120 

121def _call_from_structureddata(constructor, name, init_data): 

122 if not isinstance(init_data, dict): 

123 raise CredentialsLoadError("%s goes with an object" % name) 

124 

125 init_data = {k.replace('-', '_'): v for (k, v) in init_data.items()} 

126 

127 sig = inspect.signature(constructor) 

128 

129 checked_items = {} 

130 

131 for k, v in init_data.items(): 

132 try: 

133 annotation = sig.parameters[k].annotation 

134 except KeyError: 

135 # let this raise later in binding 

136 checked_items[k] = object() 

137 

138 if isinstance(v, dict) and 'ascii' in v: 

139 if len(v) != 1: 

140 raise CredentialsLoadError("ASCII objects can only have one elemnt.") 

141 try: 

142 v = v['ascii'].encode('ascii') 

143 except UnicodeEncodeError: 

144 raise CredentialsLoadError("Elements of the ASCII object can not be represented in ASCII, please use binary or hex representation.") 

145 

146 

147 if isinstance(v, dict) and 'hex' in v: 

148 if len(v) != 1: 

149 raise CredentialsLoadError("Hex objects can only have one elemnt.") 

150 try: 

151 v = bytes.fromhex(v['hex'].replace('-', '').replace(' ', '').replace(':', '')) 

152 except ValueError as e: 

153 raise CredentialsLoadError("Hex object can not be read: %s" % (e.args[0])) 

154 

155 # Not using isinstance because I foundno way to extract the type 

156 # information from an Optional/Union again; this whole thing works 

157 # only for strings and ints anyway, so why not. 

158 if type(v) != annotation and Optional[type(v)] != annotation: 

159 # explicitly not excluding inspect._empty here: constructors 

160 # need to be fully annotated 

161 raise CredentialsLoadError("Type mismatch in attribute %s of %s: expected %s, got %r" % (k, name, annotation, v)) 

162 

163 checked_items[k] = v 

164 

165 try: 

166 bound = sig.bind(**checked_items) 

167 except TypeError as e: 

168 raise CredentialsLoadError("%s: %s" % (name, e.args[0])) 

169 

170 return constructor(*bound.args, **bound.kwargs) 

171 

172class _Objectish: 

173 @classmethod 

174 def from_item(cls, init_data): 

175 return _call_from_structureddata(cls, cls.__name__, init_data) 

176 

177class DTLS(_Objectish): 

178 def __init__(self, psk: bytes, client_identity: bytes): 

179 self.psk = psk 

180 self.client_identity = client_identity 

181 

182 def as_dtls_psk(self): 

183 return (self.client_identity, self.psk) 

184 

185class TLSCert(_Objectish): 

186 """Indicates that a client can use the given certificate file to authenticate the server. 

187 

188 Can only be used with 'coaps+tcp://HOSTINFO/*' and 'coaps+tcp://*' forms. 

189 """ 

190 def __init__(self, certfile: str): 

191 self.certfile = certfile 

192 

193 def as_ssl_params(self): 

194 """Generate parameters suitable for passing via ** to 

195 ssl.create_default_context when purpose is alreay set""" 

196 return {"cafile": self.certfile} 

197 

198def construct_oscore(contextfile: str): 

199 from .oscore import FilesystemSecurityContext 

200 

201 return FilesystemSecurityContext(contextfile) 

202 

203construct_oscore.from_item = lambda value: _call_from_structureddata(construct_oscore, 'OSCORE', value) 

204 

205_re_cache = {} 

206 

207class CredentialsMap(dict): 

208 """ 

209 FIXME: outdated, rewrite when usable 

210 

211 A CredentialsMap, for any URI template and operation, which 

212 security contexts are sufficient to to perform the operation on a matching 

213 URI. 

214 

215 The same context can be used both by the server and the client, where the 

216 client uses the information on allowed client credentials to decide which 

217 credentials to present, and the information on allowed server credentials 

218 to decide whether the server can be trusted. 

219 

220 Conversely, the server typically loads all available server credentials at 

221 startup, and then uses the client credentials list to decide whether to 

222 serve the request.""" 

223 

224 def load_from_dict(self, d): 

225 """Populate the map from a dictionary, which would typically have been 

226 loaded from a JSON/YAML file and needs to match the CDDL in 

227 credentials.cddl. 

228 

229 Running this multiple times will overwriter individual entries in the 

230 map.""" 

231 for k, v in d.items(): 

232 if v is None: 

233 if k in self: 

234 del self[k] 

235 else: 

236 self[k] = self._item_from_dict(v) 

237 # FIXME only works that way for OSCORE clients 

238 self[k].authenticated_claims = [k] 

239 

240 def _item_from_dict(self, v): 

241 if isinstance(v, str): 

242 return CredentialReference(v, self) 

243 elif isinstance(v, dict): 

244 try: 

245 (key, value), = v.items() 

246 except ValueError: 

247 # this follows how Rust Enums are encoded in serde JSON 

248 raise CredentialsLoadError( 

249 "Items in a credentials map must have exactly one key" 

250 " (found %s)" % ("," .join(v.keys()) or "empty") 

251 ) 

252 

253 try: 

254 constructor = self._class_map[key].from_item 

255 except KeyError: 

256 raise CredentialsLoadError("Unknown credential type: %s" % key) 

257 

258 return constructor(value) 

259 

260 _class_map = { 

261 'dtls': DTLS, 

262 'oscore': construct_oscore, 

263 'tlscert': TLSCert, 

264 'any-of': AnyOf, 

265 'all-of': AllOf, 

266 } 

267 

268 @staticmethod 

269 def _wildcard_match(searchterm, pattern): 

270 if pattern not in _re_cache: 

271 _re_cache[pattern] = re.compile(re.escape(pattern).replace('\\*', '.*')) 

272 return _re_cache[pattern].fullmatch(searchterm) is not None 

273 

274 # used by a client 

275 

276 def credentials_from_request(self, msg): 

277 """Return the most specific match to a request message. Matching is 

278 currently based on wildcards, but not yet very well thought out.""" 

279 

280 uri = msg.get_request_uri() 

281 

282 for i in range(1000): 

283 for (k, v) in sorted(self.items(), key=lambda x: len(x[0]), reverse=True): 

284 if self._wildcard_match(uri, k): 

285 if isinstance(v, str): 

286 uri = v 

287 continue 

288 return v 

289 else: 

290 raise CredentialsMissingError("No suitable credentials for %s" % uri) 

291 else: 

292 raise CredentialsLoadError("Search for suitable credentials for %s exceeds recursion limit") 

293 

294 def ssl_client_context(self, scheme, hostinfo): 

295 """Return an SSL client context as configured for the given request 

296 scheme and hostinfo (no full message is to be processed here, as 

297 connections are used across requests to the same origin). 

298 

299 If no credentials are configured, this returns the default SSL client 

300 context.""" 

301 

302 import ssl 

303 

304 ssl_params = {} 

305 tlscert = self.get('%s://%s/*' % (scheme, hostinfo), None) 

306 if tlscert is None: 

307 tlscert = self.get('%s://*' % scheme, None) 

308 if tlscert is not None: 

309 ssl_params = tlscert.as_ssl_params() 

310 return ssl.create_default_context(**ssl_params) 

311 

312 # used by a server 

313 

314 def find_oscore(self, unprotected): 

315 # FIXME: this is not constant-time as it should be, but too much in 

316 # flux to warrant optimization 

317 

318 # FIXME: duplicate contexts for being tried out are not supported yet. 

319 

320 for item in self.values(): 

321 if not hasattr(item, "get_oscore_context_for"): 

322 continue 

323 

324 ctx = item.get_oscore_context_for(unprotected) 

325 if ctx is not None: 

326 return ctx 

327 

328 raise KeyError() 

329 

330 def find_dtls_psk(self, identity): 

331 # FIXME similar to find_oscore 

332 for (entry, item) in self.items(): 

333 if not hasattr(item, "as_dtls_psk"): 

334 continue 

335 

336 psk_id, psk = item.as_dtls_psk() 

337 if psk_id != identity: 

338 continue 

339 

340 # FIXME is returning the entry name a sane value to later put in to 

341 # authenticated_claims? OSCORE does something different. 

342 return (psk, entry) 

343 

344 raise KeyError()