Coverage for aiocoap/proxy/server.py: 58%

241 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-10 11:47 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Basic implementation of CoAP-CoAP proxying 

6 

7This is work in progress and not yet part of the API.""" 

8 

9import asyncio 

10import functools 

11import ipaddress 

12import logging 

13import warnings 

14 

15from .. import numbers, interfaces, message, error, util, resource 

16from ..numbers import codes 

17from ..blockwise import Block1Spool, Block2Cache 

18from ..pipe import Pipe 

19 

20 

class CanNotRedirect(error.ConstructionRenderableError):
    """Base class of the errors raised when a request can not be redirected.

    ``Proxy.render`` catches it around ``apply_redirection`` and answers with
    the result of ``to_message()``."""

    message = "Proxy redirection failed"

23 

24 

class NoUriSplitting(CanNotRedirect):
    """Raised by ``ForwardProxy`` for requests that carry a Proxy-Uri option."""

    message = "URI splitting not implemented, please use Proxy-Scheme."
    code = codes.NOT_IMPLEMENTED

28 

29 

class IncompleteProxyUri(CanNotRedirect):
    """Raised when a proxy request lacks what is needed to pick a target.

    ``ForwardProxy`` raises it for a Proxy-Scheme without Uri-Host, and
    ``Proxy.render`` raises it when no proxy rule matched."""

    message = "Proxying requires Proxy-Scheme and Uri-Host"
    code = codes.BAD_REQUEST

33 

34 

class NotAForwardProxy(CanNotRedirect):
    """Redirection error for forward-proxy requests sent to a reverse proxy."""

    message = "This is a reverse proxy, not a forward one."
    code = codes.PROXYING_NOT_SUPPORTED

38 

39 

class NoSuchHostname(CanNotRedirect):
    """Redirection error with code NOT_FOUND and an empty message."""

    message = ""
    code = codes.NOT_FOUND

43 

44 

class CanNotRedirectBecauseOfUnsafeOptions(CanNotRedirect):
    """Redirection error (BAD_OPTION) naming the options that blocked
    forwarding."""

    code = codes.BAD_OPTION

    def __init__(self, options):
        """Build the message from the ``number`` of each entry in *options*."""
        listed = ", ".join([str(option.number) for option in options])
        self.message = "Unsafe options in request: %s" % (listed,)

52 

53 

def raise_unless_safe(request, known_options):
    """Refuse a message that carries unsafe options nobody takes care of.

    An option of *request* is acceptable if its number does not report
    ``is_unsafe()``, or if the number is in *known_options* or in the set of
    options listed below. If any other option is present,
    ``CanNotRedirectBecauseOfUnsafeOptions`` (a BAD_OPTION ``CanNotRedirect``)
    is raised with the list of offending options."""

    option_number = numbers.OptionNumber
    accepted = set(known_options) | {
        # it is expected that every proxy is aware of these options even though
        # one of them often doesn't need touching
        option_number.URI_HOST,
        option_number.URI_PORT,
        option_number.URI_PATH,
        option_number.URI_QUERY,
        # handled by the Context
        option_number.BLOCK1,
        option_number.BLOCK2,
        # handled by the proxy resource
        option_number.OBSERVE,
    }

    offending = []
    for option in request.opt.option_list():
        number = option.number
        if number.is_unsafe() and number not in accepted:
            offending.append(option)

    if offending:
        raise CanNotRedirectBecauseOfUnsafeOptions(offending)

81 

82 

class Proxy(interfaces.Resource):
    """Resource that forwards requests through ``outgoing_context``.

    The target of a request is chosen by the redirectors registered through
    :meth:`add_redirector`, which are consulted in registration order."""

    # other than in special cases, we're trying to be transparent wrt blockwise transfers
    interpret_block_options = False

    def __init__(self, outgoing_context, logger=None):
        """Set up a proxy without redirectors.

        *outgoing_context* is used to send the forwarded requests; *logger*
        defaults to the ``"proxy"`` logger."""
        super().__init__()

        self.outgoing_context = outgoing_context
        self.log = logger if logger else logging.getLogger("proxy")
        self._redirectors = []

        # Provide variables for render_to_pipe
        # FIXME this is copied from aiocoap.resource's __init__ -- but on the
        # long run proxying shouldn't rely on that anyway but implement
        # render_to_pipe right on its own
        self._block1 = Block1Spool()
        self._block2 = Block2Cache()

    def add_redirector(self, redirector):
        """Append *redirector* to the list consulted by apply_redirection."""
        self._redirectors.append(redirector)

    def apply_redirection(self, request):
        """Return the first non-``None`` result a redirector produces for
        *request*, or ``None`` if no redirector has one."""
        for redirector in self._redirectors:
            redirected = redirector.apply_redirection(request)
            if redirected is None:
                continue
            return redirected
        return None

    async def needs_blockwise_assembly(self, request):
        """Report ``interpret_block_options`` for every request."""
        return self.interpret_block_options

    async def render(self, request):
        """Redirect *request*, send it out and return the response.

        A ``CanNotRedirect`` from the redirectors is turned into its message,
        and a timeout of the outgoing request into GATEWAY_TIMEOUT. The
        response is checked with ``raise_unless_safe`` before its mtype, mid,
        remote and token are cleared."""
        # FIXME i'd rather let the application do with the message whatever it
        # wants. everything the responder needs of the request should be
        # extracted beforehand.
        request = request.copy(mid=None, token=None)

        try:
            request = self.apply_redirection(request)
        except CanNotRedirect as e:
            return e.to_message()

        if request is None:
            fallback = await super().render(request)
            if fallback is not None:
                return fallback
            raise IncompleteProxyUri("No matching proxy rule")

        try:
            outgoing = self.outgoing_context.request(
                request, handle_blockwise=self.interpret_block_options
            )
            response = await outgoing.response
        except error.TimeoutError:
            return message.Message(code=numbers.codes.GATEWAY_TIMEOUT)

        raise_unless_safe(response, ())

        # clear what belonged to the exchange with the origin server
        for field in ("mtype", "mid", "remote", "token"):
            setattr(response, field, None)

        return response

    # Not inheriting from them because we do *not* want the .render() in the
    # resolution tree (it can't deal with None requests, which are used among
    # proxy implementations)
    async def render_to_pipe(self, pipe: Pipe) -> None:
        """Delegate to ``resource.Resource.render_to_pipe``."""
        await resource.Resource.render_to_pipe(self, pipe)  # type: ignore

152 

153 

class ProxyWithPooledObservations(Proxy, interfaces.ObservableResource):
    """Proxy that pools observations.

    All clients observing the same resource (same cache key, see
    :meth:`_cache_key`) share a single outgoing observation, and requests
    matching an active observation are answered from the latest message
    received on it."""

    def __init__(self, outgoing_context, logger=None):
        super().__init__(outgoing_context, logger)

        # cache key -> augmented outgoing request (see _get_observation_for)
        self._outgoing_observations = {}

    @staticmethod
    def _cache_key(request):
        """Return the cache key of *request*, passing the Observe option
        number to ``get_cache_key``."""
        return request.get_cache_key([numbers.optionnumbers.OptionNumber.OBSERVE])

    def _peek_observation_for(self, request):
        """Return the augmented request (see _get_observation_for) towards a
        resource, or raise KeyError"""
        cachekey = self._cache_key(request)

        return self._outgoing_observations[cachekey]

    def _get_observation_for(self, request):
        """Return an existing augmented request towards a resource or create one.

        An augmented request is an observation request that has some additional
        properties (__users, __cachekey, __latest_response), which are used in
        ProxyWithPooledObservations to immediately serve responses from
        observed resources, and to tear the observations down again.

        Raises whatever ``apply_redirection`` raises (e.g. CanNotRedirect)."""

        # strip per-exchange fields before redirecting, similar to Proxy.render
        request = request.copy(mid=None, remote=None, token=None)
        request = self.apply_redirection(request)

        cachekey = self._cache_key(request)

        try:
            obs = self._outgoing_observations[cachekey]
        except KeyError:
            obs = self._outgoing_observations[cachekey] = self.outgoing_context.request(
                request
            )
            obs.__users = set()
            obs.__cachekey = cachekey
            # this becomes a cached response right after the .response comes
            # in (so only use this after waiting for it), and gets updated
            # when new responses arrive.
            obs.__latest_response = None

            def when_first_request_done(result, obs=obs):
                obs.__latest_response = result.result()

            obs.response.add_done_callback(when_first_request_done)

            def cb(incoming_message, obs=obs):
                self.log.info(
                    "Received incoming message %r, relaying it to %d clients",
                    incoming_message,
                    len(obs.__users),
                )
                obs.__latest_response = incoming_message
                # iterate over a snapshot, triggering may change the user set
                for observationserver in set(obs.__users):
                    observationserver.trigger(incoming_message.copy())

            obs.observation.register_callback(cb)

            def eb(exception, obs=obs):
                if obs.__users:
                    code = numbers.codes.INTERNAL_SERVER_ERROR
                    payload = b""
                    if isinstance(exception, error.RenderableError):
                        code = exception.code
                        payload = exception.message.encode("ascii")
                    self.log.debug(
                        "Received error %r, which did not lead to unregistration of the clients. Actively deregistering them with %s %r.",
                        exception,
                        code,
                        payload,
                    )
                    for u in list(obs.__users):
                        u.trigger(message.Message(code=code, payload=payload))
                    if obs.__users:
                        self.log.error(
                            "Observations survived sending them an error message."
                        )
                else:
                    self.log.debug(
                        "Received error %r, but that seems to have been passed on cleanly to the observers as they are gone by now.",
                        exception,
                    )

            obs.observation.register_errback(eb)

        return obs

    def _add_observation_user(self, clientobservationrequest, serverobservation):
        """Register *serverobservation* as a user of the pooled request."""
        clientobservationrequest.__users.add(serverobservation)

    def _remove_observation_user(self, clientobservationrequest, serverobservation):
        """Remove *serverobservation* from the pooled request's users and
        schedule a check whether the pooled request can be dropped."""
        clientobservationrequest.__users.remove(serverobservation)
        # give the request that just cancelled time to be dealt with before
        # dropping the __latest_response
        asyncio.get_event_loop().call_soon(
            self._consider_dropping, clientobservationrequest
        )

    def _consider_dropping(self, clientobservationrequest):
        """Tear down the pooled request if it has no users left.

        This may run several times for the same request (every user removal
        schedules it), so it must tolerate the request already being gone."""
        if clientobservationrequest.__users:
            return

        self.log.debug(
            "Last client of observation went away, deregistering with server."
        )
        cachekey = clientobservationrequest.__cachekey
        # Only forget the entry if it is still this very request: an earlier
        # run may have removed it already, and a fresh observation may have
        # been pooled under the same key in the meantime.
        if self._outgoing_observations.get(cachekey) is clientobservationrequest:
            del self._outgoing_observations[cachekey]
        if not clientobservationrequest.observation.cancelled:
            clientobservationrequest.observation.cancel()

    async def add_observation(self, request, serverobservation):
        """Attach *serverobservation* to the pooled observation for *request*.

        If the request can not be redirected, the observation is simply not
        accepted; the error itself is reported when the request is
        rendered."""

        try:
            clientobservationrequest = self._get_observation_for(request)
        except CanNotRedirect:
            pass  # just don't accept the observation, the rest will be taken care of at rendering
        else:
            self._add_observation_user(clientobservationrequest, serverobservation)
            serverobservation.accept(
                functools.partial(
                    self._remove_observation_user,
                    clientobservationrequest,
                    serverobservation,
                )
            )

    async def render(self, request):
        """Answer *request* from a pooled observation if one matches.

        Without a matching observation, a request carrying the Observe option
        is answered with BAD_OPTION; everything else is passed on to
        ``Proxy.render``."""
        # FIXME this is evaulated twice in the implementation (once here, but
        # unless it's an observation what matters is inside the super call),
        # maybe this needs to hook in differently than by subclassing and
        # calling super.
        self.log.info("render called")
        redirected_request = request.copy()

        try:
            redirected_request = self.apply_redirection(redirected_request)
            if redirected_request is None:
                return await super().render(request)
            clientobservationrequest = self._peek_observation_for(redirected_request)
        except (KeyError, CanNotRedirect) as e:
            if not isinstance(e, CanNotRedirect) and request.opt.observe is not None:
                self.log.warning(
                    "No matching observation found: request is %r (cache key %r), outgoing observations %r",
                    redirected_request,
                    self._cache_key(redirected_request),
                    self._outgoing_observations,
                )

                return message.Message(
                    code=numbers.codes.BAD_OPTION,
                    payload="Observe option can not be proxied without active observation.".encode(
                        "utf8"
                    ),
                )
            self.log.debug(
                "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
            )
            return await super().render(request)
        else:
            self.log.info(
                "Serving request using latest cached response of %r",
                clientobservationrequest,
            )
            await clientobservationrequest.response
            # Hand out a copy: the cached message is shared by every client
            # served from this observation, so it must not be the object that
            # gets modified for (and by) a single exchange.
            cached_response = clientobservationrequest.__latest_response.copy(
                mid=None, token=None, remote=None
            )
            cached_response.mtype = None
            return cached_response

324 

325 

class ForwardProxy(Proxy):
    """Proxy that additionally acts on the Proxy-Scheme option."""

    def apply_redirection(self, request):
        """Return a copy of *request* directed at the host it names.

        Requests without Proxy-Scheme are left to the registered redirectors.
        Otherwise the remote is built from Proxy-Scheme, Uri-Host and
        Uri-Port, those proxy options are removed, and the redirectors get a
        chance to override the result.

        Raises NoUriSplitting if Proxy-Uri is present, IncompleteProxyUri if
        Proxy-Scheme comes without Uri-Host, and whatever raise_unless_safe
        raises for unsafe options."""
        request = request.copy()
        if request.opt.proxy_uri is not None:
            raise NoUriSplitting
        if request.opt.proxy_scheme is None:
            return super().apply_redirection(request)
        if request.opt.uri_host is None:
            raise IncompleteProxyUri

        raise_unless_safe(
            request,
            (
                numbers.OptionNumber.PROXY_SCHEME,
                numbers.OptionNumber.URI_HOST,
                numbers.OptionNumber.URI_PORT,
            ),
        )

        request.remote = message.UndecidedRemote(
            request.opt.proxy_scheme,
            util.hostportjoin(request.opt.uri_host, request.opt.uri_port),
        )
        request.opt.proxy_scheme = None
        request.opt.uri_port = None
        forward_host = request.opt.uri_host
        try:
            # I'd prefer to not do if-by-try, but the ipaddress doesn't seem to
            # offer any other choice
            literal = ipaddress.ip_address(forward_host)
        except ValueError:
            pass
        else:
            # ip_address also accepts IPv4 literals, which never take
            # brackets; only an unbracketed IPv6 literal is the deprecated form
            if isinstance(literal, ipaddress.IPv6Address):
                warnings.warn(
                    "URI-Host looks like IPv6 but has no square "
                    "brackets. This is deprecated, see "
                    "https://github.com/chrysn/aiocoap/issues/216",
                    DeprecationWarning,
                )
            # an address literal is fully expressed by the remote
            request.opt.uri_host = None
        if forward_host.startswith("["):
            # IPv6 or future literals are not recognized by ipaddress which
            # does not look at host-encoded form
            request.opt.uri_host = None

        # Maybe the URI-Host matches a known forwarding -- in that case, catch that.
        redirected = super().apply_redirection(request)
        if redirected is not None:
            return redirected

        return request

378 

379 

class ForwardProxyWithPooledObservations(ForwardProxy, ProxyWithPooledObservations):
    """ForwardProxy combined with ProxyWithPooledObservations; adds nothing
    of its own."""

382 

383 

class ReverseProxy(Proxy):
    """Deprecated variant of Proxy; behaves like Proxy but warns when created."""

    def __init__(self, *args, **kwargs):
        """Emit a DeprecationWarning, then initialize as Proxy does."""
        import warnings

        warnings.warn(
            "ReverseProxy has become moot due to proxy operation "
            "changes, just instanciate Proxy and set the appropriate "
            "redirectors",
            DeprecationWarning,
            # attribute the warning to the code creating the proxy rather than
            # to this constructor, so that it is reported at the place that
            # needs changing
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)

396 

397 

class ReverseProxyWithPooledObservations(ReverseProxy, ProxyWithPooledObservations):
    """ReverseProxy combined with ProxyWithPooledObservations; adds nothing
    of its own."""

400 

401 

class Redirector:
    """Base class of the redirectors a Proxy consults; has no rule itself."""

    def apply_redirection(self, request):
        """Return ``None``, i.e. no redirection for *request*."""
        return None

405 

406 

class NameBasedVirtualHost(Redirector):
    """Redirector that sends requests for one Uri-Host to a fixed target."""

    def __init__(self, match_name, target, rewrite_uri_host=False, use_as_proxy=False):
        """Store the rule.

        *match_name* is compared against the request's Uri-Host and *target*
        becomes the request's ``unresolved_remote``. With *rewrite_uri_host*,
        Uri-Host is replaced by the host part of *target*; with
        *use_as_proxy*, Proxy-Scheme is set from the request's remote."""
        self.match_name = match_name
        self.target = target
        self.rewrite_uri_host = rewrite_uri_host
        self.use_as_proxy = use_as_proxy

    def apply_redirection(self, request):
        """Return the modified *request* if its Uri-Host matches, else
        ``None``. Unsafe options are rejected before matching."""
        raise_unless_safe(request, ())

        if not self._matches(request.opt.uri_host):
            return None

        if self.use_as_proxy:
            request.opt.proxy_scheme = request.remote.scheme
        if self.rewrite_uri_host:
            target_host, _port = util.hostportsplit(self.target)
            request.opt.uri_host = target_host
        request.unresolved_remote = self.target
        return request

    def _matches(self, hostname):
        """Tell whether *hostname* equals the configured name."""
        return hostname == self.match_name

427 

428 

class SubdomainVirtualHost(NameBasedVirtualHost):
    """NameBasedVirtualHost that matches every subdomain of ``match_name``
    (but not ``match_name`` itself)."""

    def __init__(self, *args, **kwargs):
        """Accept the arguments of NameBasedVirtualHost.

        Raises TypeError if ``rewrite_uri_host`` is set."""
        super().__init__(*args, **kwargs)
        if self.rewrite_uri_host:
            raise TypeError(
                "rewrite_uri_host makes no sense with subdomain virtual hosting"
            )

    def _matches(self, hostname):
        """Tell whether *hostname* ends in ``"." + match_name``.

        A request without Uri-Host (*hostname* is ``None``) does not match."""
        if hostname is None:
            return False
        return hostname.endswith("." + self.match_name)

439 

440 

class UnconditionalRedirector(Redirector):
    """Redirector that sends every request to one target."""

    def __init__(self, target, use_as_proxy=False):
        """Store *target* (used as ``unresolved_remote``) and whether
        Proxy-Scheme is to be set from the request's remote."""
        self.use_as_proxy = use_as_proxy
        self.target = target

    def apply_redirection(self, request):
        """Point *request* at the target and return it, after rejecting
        unsafe options."""
        raise_unless_safe(request, ())

        if self.use_as_proxy:
            incoming_scheme = request.remote.scheme
            request.opt.proxy_scheme = incoming_scheme
        request.unresolved_remote = self.target
        return request

453 

454 

class SubresourceVirtualHost(Redirector):
    """Redirector that maps everything below one path prefix to a target."""

    def __init__(self, path, target):
        """Store *path* (as a tuple of segments) and *target*."""
        self.target = target
        self.path = tuple(path)

    def apply_redirection(self, request):
        """Return *request* with the prefix stripped from Uri-Path and the
        target as ``unresolved_remote`` if the path starts with the prefix,
        else ``None``. Unsafe options are rejected before matching."""
        raise_unless_safe(request, ())

        prefix_length = len(self.path)
        if self.path != request.opt.uri_path[:prefix_length]:
            return None

        request.opt.uri_path = request.opt.uri_path[prefix_length:]
        request.unresolved_remote = self.target
        return request