Coverage for aiocoap/proxy/server.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

232 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9"""Basic implementation of CoAP-CoAP proxying 

10 

11This is work in progress and not yet part of the API.""" 

12 

13import asyncio 

14import functools 

15import ipaddress 

16import logging 

17 

18from .. import numbers, interfaces, message, error, util 

19 

20class CanNotRedirect(error.ConstructionRenderableError): 

21 message = "Proxy redirection failed" 

22 

23class NoUriSplitting(CanNotRedirect): 

24 code = numbers.codes.NOT_IMPLEMENTED 

25 message = "URI splitting not implemented, please use Proxy-Scheme." 

26 

27class IncompleteProxyUri(CanNotRedirect): 

28 code = numbers.codes.BAD_REQUEST 

29 message = "Proxying requires Proxy-Scheme and Uri-Host" 

30 

31class NotAForwardProxy(CanNotRedirect): 

32 code = numbers.codes.PROXYING_NOT_SUPPORTED 

33 message = "This is a reverse proxy, not a forward one." 

34 

35class NoSuchHostname(CanNotRedirect): 

36 code = numbers.codes.NOT_FOUND 

37 message = "" 

38 

39class CanNotRedirectBecauseOfUnsafeOptions(CanNotRedirect): 

40 code = numbers.codes.BAD_OPTION 

41 

42 def __init__(self, options): 

43 self.message = "Unsafe options in request: %s"%(", ".join(str(o.number) for o in options)) 

44 

45def raise_unless_safe(request, known_options): 

46 """Raise a BAD_OPTION CanNotRedirect unless all options in request are 

47 safe to forward or known""" 

48 

49 known_options = set(known_options).union({ 

50 # it is expected that every proxy is aware of these options even though 

51 # one of them often doesn't need touching 

52 numbers.OptionNumber.URI_HOST, 

53 numbers.OptionNumber.URI_PORT, 

54 numbers.OptionNumber.URI_PATH, 

55 numbers.OptionNumber.URI_QUERY, 

56 # handled by the Context 

57 numbers.OptionNumber.BLOCK1, 

58 numbers.OptionNumber.BLOCK2, 

59 # handled by the proxy resource 

60 numbers.OptionNumber.OBSERVE, 

61 }) 

62 

63 unsafe_options = [o for o in request.opt.option_list() if o.number.is_unsafe() and o.number not in known_options] 

64 if unsafe_options: 

65 raise CanNotRedirectBecauseOfUnsafeOptions(unsafe_options) 

66 

67class Proxy(interfaces.Resource): 

68 # other than in special cases, we're trying to be transparent wrt blockwise transfers 

69 interpret_block_options = False 

70 

71 def __init__(self, outgoing_context, logger=None): 

72 super().__init__() 

73 self.outgoing_context = outgoing_context 

74 self.log = logger or logging.getLogger('proxy') 

75 

76 self._redirectors = [] 

77 

78 def add_redirector(self, redirector): 

79 self._redirectors.append(redirector) 

80 

81 def apply_redirection(self, request): 

82 for r in self._redirectors: 

83 result = r.apply_redirection(request) 

84 if result is not None: 

85 return result 

86 return None 

87 

88 async def needs_blockwise_assembly(self, request): 

89 return self.interpret_block_options 

90 

91 async def render(self, request): 

92 # FIXME i'd rather let the application do with the message whatever it 

93 # wants. everything the responder needs of the request should be 

94 # extracted beforehand. 

95 request = request.copy(mid=None, token=None) 

96 

97 try: 

98 request = self.apply_redirection(request) 

99 except CanNotRedirect as e: 

100 return e.to_message() 

101 

102 if request is None: 

103 response = await super().render(request) 

104 if response is None: 

105 raise IncompleteProxyUri("No matching proxy rule") 

106 return response 

107 

108 try: 

109 response = await self.outgoing_context.request(request, handle_blockwise=self.interpret_block_options).response 

110 except error.TimeoutError: 

111 return message.Message(code=numbers.codes.GATEWAY_TIMEOUT) 

112 

113 raise_unless_safe(response, ()) 

114 

115 response.mtype = None 

116 response.mid = None 

117 response.remote = None 

118 response.token = None 

119 

120 return response 

121 

122class ProxyWithPooledObservations(Proxy, interfaces.ObservableResource): 

123 def __init__(self, outgoing_context, logger=None): 

124 super(ProxyWithPooledObservations, self).__init__(outgoing_context, logger) 

125 

126 self._outgoing_observations = {} 

127 

128 @staticmethod 

129 def _cache_key(request): 

130 return request.get_cache_key([numbers.optionnumbers.OptionNumber.OBSERVE]) 

131 

132 def _peek_observation_for(self, request): 

133 """Return the augmented request (see _get_obervation_for) towards a 

134 resource, or raise KeyError""" 

135 cachekey = self._cache_key(request) 

136 

137 return self._outgoing_observations[cachekey] 

138 

139 def _get_observation_for(self, request): 

140 """Return an existing augmented request towards a resource or create one. 

141 

142 An augmented request is an observation request that has some additional 

143 properties (__users, __cachekey, __latest_response), which are used in 

144 ProxyWithPooledObservations to immediately serve responses from 

145 observed resources, and to tear the observations down again.""" 

146 

147 # see ProxiedResource.render 

148 request = request.copy(mid=None, remote=None, token=None) 

149 request = self.apply_redirection(request) 

150 

151 cachekey = self._cache_key(request) 

152 

153 try: 

154 obs = self._outgoing_observations[cachekey] 

155 except KeyError: 

156 obs = self._outgoing_observations[cachekey] = self.outgoing_context.request(request) 

157 obs.__users = set() 

158 obs.__cachekey = cachekey 

159 obs.__latest_response = None # this becomes a cached response right after the .response comes in (so only use this after waiting for it), and gets updated when new responses arrive. 

160 

161 def when_first_request_done(result, obs=obs): 

162 obs.__latest_response = result.result() 

163 obs.response.add_done_callback(when_first_request_done) 

164 

165 def cb(incoming_message, obs=obs): 

166 self.log.info("Received incoming message %r, relaying it to %d clients", incoming_message, len(obs.__users)) 

167 obs.__latest_response = incoming_message 

168 for observationserver in set(obs.__users): 

169 observationserver.trigger(incoming_message.copy()) 

170 obs.observation.register_callback(cb) 

171 def eb(exception, obs=obs): 

172 if obs.__users: 

173 code = numbers.codes.INTERNAL_SERVER_ERROR 

174 payload = b"" 

175 if isinstance(exception, error.RenderableError): 

176 code = exception.code 

177 payload = exception.message.encode('ascii') 

178 self.log.debug("Received error %r, which did not lead to unregistration of the clients. Actively deregistering them with %s %r.", exception, code, payload) 

179 for u in list(obs.__users): 

180 u.trigger(message.Message(code=code, payload=payload)) 

181 if obs.__users: 

182 self.log.error("Observations survived sending them an error message.") 

183 else: 

184 self.log.debug("Received error %r, but that seems to have been passed on cleanly to the observers as they are gone by now.", exception) 

185 obs.observation.register_errback(eb) 

186 

187 return obs 

188 

189 def _add_observation_user(self, clientobservationrequest, serverobservation): 

190 clientobservationrequest.__users.add(serverobservation) 

191 

192 def _remove_observation_user(self, clientobservationrequest, serverobservation): 

193 clientobservationrequest.__users.remove(serverobservation) 

194 # give the request that just cancelled time to be dealt with before 

195 # dropping the __latest_response 

196 asyncio.get_event_loop().call_soon(self._consider_dropping, clientobservationrequest) 

197 

198 def _consider_dropping(self, clientobservationrequest): 

199 if not clientobservationrequest.__users: 

200 self.log.debug("Last client of observation went away, deregistering with server.") 

201 self._outgoing_observations.pop(clientobservationrequest.__cachekey) 

202 if not clientobservationrequest.observation.cancelled: 

203 clientobservationrequest.observation.cancel() 

204 

205 async def add_observation(self, request, serverobservation): 

206 """As ProxiedResource is intended to be just the proxy's interface 

207 toward the Context, accepting observations is handled here, where the 

208 observations handling can be defined by the subclasses.""" 

209 

210 try: 

211 clientobservationrequest = self._get_observation_for(request) 

212 except CanNotRedirect: 

213 pass # just don't accept the observation, the rest will be taken care of at rendering 

214 else: 

215 self._add_observation_user(clientobservationrequest, serverobservation) 

216 serverobservation.accept(functools.partial(self._remove_observation_user, clientobservationrequest, serverobservation)) 

217 

218 async def render(self, request): 

219 # FIXME this is evaulated twice in the implementation (once here, but 

220 # unless it's an observation what matters is inside the super call), 

221 # maybe this needs to hook in differently than by subclassing and 

222 # calling super. 

223 self.log.info("render called") 

224 redirected_request = request.copy() 

225 

226 try: 

227 redirected_request = self.apply_redirection(redirected_request) 

228 if redirected_request is None: 

229 return await super().render(request) 

230 clientobservationrequest = self._peek_observation_for(redirected_request) 

231 except (KeyError, CanNotRedirect) as e: 

232 if not isinstance(e, CanNotRedirect) and request.opt.observe is not None: 

233 self.log.warning("No matching observation found: request is %r (cache key %r), outgoing observations %r", redirected_request, self._cache_key(redirected_request), self._outgoing_observations) 

234 

235 return message.Message(code=numbers.codes.BAD_OPTION, payload="Observe option can not be proxied without active observation.".encode('utf8')) 

236 self.log.debug("Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms.") 

237 return await super(ProxyWithPooledObservations, self).render(request) 

238 else: 

239 self.log.info("Serving request using latest cached response of %r", clientobservationrequest) 

240 await clientobservationrequest.response 

241 cached_response = clientobservationrequest.__latest_response 

242 cached_response.mid = None 

243 cached_response.token = None 

244 cached_response.remote = None 

245 cached_response.mtype = None 

246 return cached_response 

247 

248 

249class ForwardProxy(Proxy): 

250 def apply_redirection(self, request): 

251 request = request.copy() 

252 if request.opt.proxy_uri is not None: 

253 raise NoUriSplitting 

254 if request.opt.proxy_scheme is None: 

255 return super().apply_redirection(request) 

256 if request.opt.uri_host is None: 

257 raise IncompleteProxyUri 

258 

259 raise_unless_safe(request, (numbers.OptionNumber.PROXY_SCHEME, numbers.OptionNumber.URI_HOST, numbers.OptionNumber.URI_PORT)) 

260 

261 request.remote = message.UndecidedRemote(request.opt.proxy_scheme, util.hostportjoin(request.opt.uri_host, request.opt.uri_port)) 

262 request.opt.proxy_scheme = None 

263 request.opt.uri_port = None 

264 try: 

265 # I'd prefer to not do if-by-try, but the ipaddress doesn't seem to 

266 # offer any other choice 

267 ipaddress.ip_address(request.opt.uri_host) 

268 except ValueError: 

269 pass 

270 else: 

271 request.opt.uri_host = None 

272 if request.opt.uri_host.startswith('['): 

273 # IPv6 or future literals are not recognized by ipaddress which 

274 # does not look at host-encoded form 

275 request.opt.uri_host = None 

276 

277 # Maybe the URI-Host matches a known forwarding -- in that case, catch that. 

278 redirected = super(ForwardProxy, self).apply_redirection(request) 

279 if redirected is not None: 

280 return redirected 

281 

282 return request 

283 

284class ForwardProxyWithPooledObservations(ForwardProxy, ProxyWithPooledObservations): 

285 pass 

286 

287class ReverseProxy(Proxy): 

288 def __init__(self, *args, **kwargs): 

289 import warnings 

290 warnings.warn("ReverseProxy has become moot due to proxy operation " 

291 "changes, just instanciate Proxy and set the appropriate " 

292 "redirectors", DeprecationWarning, stacklevel=1) 

293 super().__init__(*args, **kwargs) 

294 

295class ReverseProxyWithPooledObservations(ReverseProxy, ProxyWithPooledObservations): 

296 pass 

297 

298class Redirector(): 

299 def apply_redirection(self, request): 

300 return None 

301 

302class NameBasedVirtualHost(Redirector): 

303 def __init__(self, match_name, target, rewrite_uri_host=False, use_as_proxy=False): 

304 self.match_name = match_name 

305 self.target = target 

306 self.rewrite_uri_host = rewrite_uri_host 

307 self.use_as_proxy = use_as_proxy 

308 

309 def apply_redirection(self, request): 

310 raise_unless_safe(request, ()) 

311 

312 if self._matches(request.opt.uri_host): 

313 if self.use_as_proxy: 

314 request.opt.proxy_scheme = request.remote.scheme 

315 if self.rewrite_uri_host: 

316 request.opt.uri_host, _ = util.hostportsplit(self.target) 

317 request.unresolved_remote = self.target 

318 return request 

319 

320 def _matches(self, hostname): 

321 return hostname == self.match_name 

322 

323class SubdomainVirtualHost(NameBasedVirtualHost): 

324 def __init__(self, *args, **kwargs): 

325 super().__init__(*args, **kwargs) 

326 if self.rewrite_uri_host: 

327 raise TypeError("rewrite_uri_host makes no sense with subdomain virtual hosting") 

328 

329 def _matches(self, hostname): 

330 return hostname.endswith('.' + self.match_name) 

331 

332class UnconditionalRedirector(Redirector): 

333 def __init__(self, target, use_as_proxy=False): 

334 self.target = target 

335 self.use_as_proxy= use_as_proxy 

336 

337 def apply_redirection(self, request): 

338 raise_unless_safe(request, ()) 

339 

340 if self.use_as_proxy: 

341 request.opt.proxy_scheme = request.remote.scheme 

342 request.unresolved_remote = self.target 

343 return request 

344 

345class SubresourceVirtualHost(Redirector): 

346 def __init__(self, path, target): 

347 self.path = tuple(path) 

348 self.target = target 

349 

350 def apply_redirection(self, request): 

351 raise_unless_safe(request, ()) 

352 

353 if self.path == request.opt.uri_path[:len(self.path)]: 

354 request.opt.uri_path = request.opt.uri_path[len(self.path):] 

355 request.unresolved_remote = self.target 

356 return request