Coverage for aiocoap/tokenmanager.py: 89%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

114 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9import functools 

10import random 

11 

12from . import error 

13from . import interfaces 

14# To be used sparingly here: This deals with request / responses on the token 

15# layer. But the layer below won't even know that messages are responses, so it 

16# can't make the informed decisions we make here. 

17from .numbers.types import NON 

18from .plumbingrequest import PlumbingRequest 

19 

class TokenManager(interfaces.RequestInterface, interfaces.TokenManager):
    """Match CoAP requests and responses by their ``(token, remote)`` pair.

    This layer sits between the application-facing request interface and the
    ``token_interface`` (the message layer below it): it assigns tokens to
    outgoing requests, keeps per-``(token, remote)`` state for unfinished
    exchanges in both directions, and routes incoming responses and
    transport errors back to the right :class:`PlumbingRequest`.
    """

    def __init__(self, context):
        self.context = context

        # Start at a random point so token values are not trivially
        # predictable across restarts; next_token() increments from here.
        self._token = random.randint(0, 65535)
        self.outgoing_requests = {}
        """Unfinished outgoing requests (identified by token and remote)"""
        self.incoming_requests = {}
        """Unfinished incoming requests.

        ``(token, remote): (PlumbingRequest, stopper)`` where stopper is a
        function that unregisters the PlumbingRequest event handler and thus
        indicates to the server the discontinued interest"""

        self.log = self.context.log
        self.loop = self.context.loop

        #self.token_interface = … -- needs to be set post-construction, because the token_interface in its constructor already needs to get its manager

    def __repr__(self):
        return '<%s for %s>' % (type(self).__name__, getattr(self, 'token_interface', '(unbound)'))

    @property
    def client_credentials(self):
        # Delegated to the context; the token layer itself holds no credentials.
        return self.context.client_credentials

    async def shutdown(self):
        """Tear down all pending exchanges and shut down the token interface.

        Incoming requests are stopped silently; outgoing requests receive a
        :class:`error.LibraryShutdown` exception. Both maps are set to None
        afterwards so late arrivals can be detected (see dispatch_error).
        """
        while self.incoming_requests:
            key = next(iter(self.incoming_requests.keys()))
            (pr, pr_stop) = self.incoming_requests.pop(key)
            # This cancels them, not sending anything.
            #
            # FIXME should we? (RST? 5.00 Server Shutdown? An RST would only
            # work if we pushed this further down the shutdown chain; a 5.00 we
            # could raise in the task.)
            pr_stop()
        self.incoming_requests = None

        while self.outgoing_requests:
            key = next(iter(self.outgoing_requests.keys()))
            request = self.outgoing_requests.pop(key)
            request.add_exception(error.LibraryShutdown())
        self.outgoing_requests = None

        await self.token_interface.shutdown()

    def next_token(self):
        """Reserve and return a new Token for request."""
        #TODO: add proper Token handling
        self._token = (self._token + 1) % (2 ** 64)
        # Shortest big-endian encoding; an all-zero counter value becomes the
        # empty token, which is valid in CoAP.
        return self._token.to_bytes(8, 'big').lstrip(b'\0')

    #
    # implement the tokenmanager interface
    #

    def dispatch_error(self, exception, remote):
        """Fan a transport error for *remote* out to all affected exchanges.

        Outgoing requests toward *remote* get the exception (wrapped in a
        :class:`error.NetworkError` unless it already is one); incoming
        requests from *remote* are merely stopped.
        """
        if self.outgoing_requests is None:
            # Not entirely sure where it is so far; better just raise a warning
            # than an exception later, nothing terminally bad should come of
            # this error.
            self.log.warning("Internal shutdown sequence mismatch: error dispatched through tokenmanager after shutdown")
            return

        # NetworkError is what we promise users to raise from request etc; if
        # it's already a NetworkError and possibly more descriptive (eg. a
        # TimeoutError), we'll just let it through (and thus allow
        # differentiated handling eg. in application-level retries).
        if not isinstance(exception, error.NetworkError):
            cause = exception
            exception = error.NetworkError(str(exception))
            exception.__cause__ = cause

        # The stopping calls would pop items from the pending requests --
        # iterating once, extracting the stoppers and then calling them en
        # batch
        stoppers = []
        for key, request in self.outgoing_requests.items():
            (token, request_remote) = key
            if request_remote == remote:
                # Bind loop variables as defaults so each lambda keeps its own
                # request/exception rather than the last iteration's.
                stoppers.append(lambda request=request, exception=exception: request.add_exception(exception))

        for ((_, _r), (_, stopper)) in self.incoming_requests.items():
            if remote == _r:
                stoppers.append(stopper)
        for stopper in stoppers:
            stopper()

    def process_request(self, request):
        """Accept an incoming request message and start serving it.

        A request re-using a live ``(token, remote)`` key supersedes the
        existing exchange (e.g. renewed observation interest).
        """
        key = (request.token, request.remote)

        if key in self.incoming_requests:
            # This is either a "I consider that token invalid, probably forgot
            # about it, but here's a new request" or renewed interest in an
            # observation, which gets modelled as a new request at this level
            self.log.debug("Incoming request overrides existing request")
            # Popping: FIXME Decide if one of them is sufficient (see `del self.incoming_requests[key]` below)
            (pr, pr_stop) = self.incoming_requests.pop(key)
            pr_stop()

        pr = PlumbingRequest(request, self.log)

        # FIXME: what can we pass down to the token_interface? certainly not
        # the request, but maybe the request with a response filter applied?
        def on_event(ev):
            if ev.message is not None:
                m = ev.message
                # FIXME: should this code warn if token or remote are set?
                m.token = request.token
                m.remote = request.remote.as_response_address()

                if m.mtype is None and request.mtype is NON:
                    # Default to sending NON to NON requests; rely on the
                    # default (CON if stand-alone else ACK) otherwise.
                    m.mtype = NON
                self.token_interface.send_message(
                    m,
                    # No more interest from *that* remote; as it's the only
                    # thing keeping the PR alive, it'll go its course of
                    # vanishing for lack of interest (as it would if
                    # pr_stop were called from its other possible caller,
                    # the start of process_request when a new request comes
                    # in on the same token)
                    pr_stop,
                )
            else:
                self.log.error("Requests shouldn't receive errors at the level of a TokenManager any more, but this did: %s", ev)
            if not ev.is_last:
                return True
        def on_end():
            if key in self.incoming_requests:
                # It may not be, especially if it was popped in `(pr, pr_stop) = self.incoming_requests.pop(key)` above
                # FIXME Decide if one of them is sufficient
                del self.incoming_requests[key]
            # no further cleanup to do here: any piggybackable ack was already flushed
            # out by the first response, and if there was not even a
            # NoResponse, something went wrong above (and we can't tell easily
            # here).
        pr_stop = pr.on_event(on_event)
        pr.on_interest_end(on_end)

        self.incoming_requests[key] = (pr, pr_stop)

        self.context.render_to_plumbing_request(pr)

    def process_response(self, response):
        """Route an incoming response to its outgoing request, if any.

        Returns True when the response was matched (and consumed), False
        otherwise so the caller can reject the stray message.
        """
        key = (response.token, response.remote)
        if key not in self.outgoing_requests:
            # maybe it was a multicast...
            key = (response.token, None)

        try:
            request = self.outgoing_requests[key]
        except KeyError:
            self.log.info("Response %r could not be matched to any request", response)
            return False
        else:
            self.log.debug("Response %r matched to request %r", response, request)

        # FIXME: there's a multicast aspect to that as well
        #
        # Is it necessary to look into .opt.observe here, wouldn't that better
        # be done by the higher-level code that knows about CoAP options?
        # Maybe, but at some point in TokenManager we *have* to look into the
        # options to see whether to expect a short- or long-running token.
        # Still, it would be an option not to send an is_last here and *always*
        # have the higher-level code indicate loss of interest in that exchange
        # when it detects that no more observations will follow.
        final = not (request.request.opt.observe == 0 and response.opt.observe is not None)

        if final:
            self.outgoing_requests.pop(key)

        request.add_response(response, is_last=final)
        return True

    #
    # implement RequestInterface
    #

    async def fill_or_recognize_remote(self, message):
        return await self.token_interface.fill_or_recognize_remote(message)

    def request(self, request):
        """Send an outgoing request and register it for response matching."""
        msg = request.request

        assert msg.code.is_request(), "Message code is not valid for request"

        # This might easily change, but right now, relying on the Context to
        # fill_remote early makes steps easier here.
        assert msg.remote is not None, "Remote not pre-populated"

        # FIXME: pick a suitably short one where available, and a longer one
        # for observations if many short ones are already in-flight
        msg.token = self.next_token()

        self.log.debug("Sending request - Token: %s, Remote: %s", msg.token.hex(), msg.remote)

        try:
            # Pass an exception *instance*, not the bare class, so
            # add_exception always receives a ready-to-raise value.
            send_canceller = self.token_interface.send_message(msg, lambda: request.add_exception(error.MessageError()))
        except Exception as e:
            request.add_exception(e)
            return

        if send_canceller is not None:
            # This needs to be called both when the requester cancels the
            # request, and when a response to the CON request comes in via a
            # different CON when the original ACK was lost, so the retransmits
            # can stop.
            #
            # FIXME: This might need a little sharper conditions: A fresh CON
            # should be sufficient to stop retransmits of a CON in a first
            # request, but when refreshing an observation, only an ACK tells us
            # that the updated observation got through. Also, multicast needs
            # to be an exception, but that generally needs handling here.
            #
            # It may be that it'd be wise to reduce the use of send_canceller
            # to situations when the request is actually cancelled, and pass
            # some information to the token_interface about whether it should
            # keep an eye out for responses on that token and cancel
            # transmission accordingly.
            request.on_event(lambda ev: (send_canceller(), False)[1],
                    is_interest=False)

        # A request sent over the multicast interface will only return a single
        # response and otherwise behave quite like an anycast request (which is
        # probably intended).
        if msg.remote.is_multicast:
            self.log.warning("Sending request to multicast via unicast request method")
            key = (msg.token, None)
        else:
            key = (msg.token, msg.remote)
        self.outgoing_requests[key] = request
        request.on_interest_end(functools.partial(self.outgoing_requests.pop, key, None))

255 

256''' 

257 def multicast_request(self, request): 

258 return MulticastRequest(self, request).responses 

259'''