Coverage for aiocoap/messagemanager.py: 84%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

222 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9"""This module contains all internals needed to manage messages on unreliable 

10transports, ie. everything that deals in message types or Message IDs. 

11 

12Currently, it also provides the mechanisms for managing tokens, but those will 

13be split into dedicated classes. 

14""" 

15 

16import asyncio 

17import functools 

18import random 

19from typing import Dict, Tuple, Optional 

20 

21from . import error 

22from . import interfaces 

23from .interfaces import EndpointAddress 

24from .message import Message 

25from .numbers.types import CON, ACK, RST, NON 

26from .numbers.codes import EMPTY 

27from .numbers.constants import (EXCHANGE_LIFETIME, ACK_TIMEOUT, EMPTY_ACK_DELAY, 

28 MAX_RETRANSMIT, ACK_RANDOM_FACTOR) 

29 

30 

31class MessageManager(interfaces.TokenInterface, interfaces.MessageManager): 

32 """This MessageManager Drives a message interface following the rules of 

33 RFC7252 CoAP over UDP. 

34 

35 It takes care of picking message IDs (mid) for outgoing messages, 

36 retransmitting CON messages, and to react appropriately to incoming 

37 messages' type, sending ACKs either immediately or later. 

38 

39 It creates piggy-backed responses by keeping an eye on the tokens the 

40 messages are sent with, but otherwise ignores the tokens. (It inspects 

41 tokens *only* where required by its sub-layer). 

42 """ 

43 

44 def __init__(self, token_manager): 

45 self.token_manager = token_manager 

46 

47 self.message_id = random.randint(0, 65535) 

48 #: Tracker of recently received messages (by remote and message ID). 

49 #: Maps them to a response message when one is already known. 

50 self._recent_messages: Dict[Tuple[EndpointAddress, int], Optional[Message]] = {} 

51 self._active_exchanges = {} #: active exchanges i.e. sent CON messages (remote, message-id): (messageerror_monitor monitor, cancellable timeout) 

52 self._backlogs = {} #: per-remote list of (backlogged package, messageerror_monitor) tupless (keys exist iff there is an active_exchange with that node) 

53 

54 #: Maps pending remote/token combinations to the MID a response can be 

55 #: piggybacked on, and the timeout that should be cancelled if it is. 

56 self._piggyback_opportunities: Dict[Tuple[EndpointAddress, bytes], (int, asyncio.TimerHandle)] = {} 

57 

58 self.log = token_manager.log 

59 self.loop = token_manager.loop 

60 

61 #self.message_interface = … -- needs to be set post-construction, because the message_interface in its constructor already needs to get its manager 

62 

63 def __repr__(self): 

64 return '<%s for %s>' % (type(self).__name__, getattr(self, 'message_interface', '(unbound)')) 

65 

66 @property 

67 def client_credentials(self): 

68 return self.token_manager.client_credentials 

69 

70 async def shutdown(self): 

71 for messageerror_monitor, cancellable in self._active_exchanges.values(): 

72 # Not calling messageerror_monitor: This is not message specific, 

73 # and its shutdown will take care of these things 

74 cancellable.cancel() 

75 self._active_exchanges = None 

76 

77 await self.message_interface.shutdown() 

78 

79 # 

80 # implementing the MessageManager interface 

81 # 

82 

83 def dispatch_message(self, message): 

84 """Feed a message through the message-id, message-type and message-code 

85 sublayers of CoAP""" 

86 

87 self.log.debug("Incoming message %r", message) 

88 if message.code.is_request(): 

89 # Responses don't get deduplication because they "are idempotent or 

90 # can be handled in an idempotent fashion" (RFC 7252 Section 4.5). 

91 # This means that a separate response may get a RST when it is 

92 # arrives at the aiocoap client twice. Note that this does not 

93 # impede the operation of observations: Their token is still active 

94 # so they are ACK'd, and deduplication based on observation numbers 

95 # filters out the rest. 

96 # 

97 # This saves memory, and allows stateful transports to be shut down 

98 # expeditiously unless kept alive by something else (otherwise, 

99 # they'd linger for EXCHANGE_LIFETIME with no good reason). 

100 if self._deduplicate_message(message) is True: 

101 return 

102 

103 if message.mtype in (ACK, RST): 

104 self._remove_exchange(message) 

105 

106 if message.code is EMPTY and message.mtype is CON: 

107 self._process_ping(message) 

108 elif message.code is EMPTY and message.mtype in (ACK, RST): 

109 pass # empty ack has already been handled above 

110 elif message.code.is_request() and message.mtype in (CON, NON): 

111 # the request handler will have to deal with sending ACK itself, as 

112 # it might be timeout-related 

113 self._process_request(message) 

114 elif message.code.is_response() and message.mtype in (CON, NON, ACK): 

115 success = self._process_response(message) 

116 if success: 

117 if message.mtype is CON: 

118 self._send_empty_ack(message.remote, message.mid, reason="acknowledging incoming response") 

119 else: 

120 if message.remote.is_multicast_locally: 

121 self.log.info("Ignoring response incoming with multicast destination.") 

122 else: 

123 self.log.info("Response not recognized - sending RST.") 

124 rst = Message(mtype=RST, mid=message.mid, code=EMPTY, payload='') 

125 rst.remote = message.remote.as_response_address() 

126 self._send_initially(rst) 

127 else: 

128 self.log.warning("Received a message with code %s and type %s (those don't fit) from %s, ignoring it.", message.code, message.mtype, message.remote) 

129 

130 def dispatch_error(self, error, remote): 

131 if self._active_exchanges is None: 

132 # Not entirely sure where it is so far; better just raise a warning 

133 # than an exception later, nothing terminally bad should come of 

134 # this error. 

135 self.log.warning("Internal shutdown sequence msismatch: error dispatched through messagemanager after shutown") 

136 return 

137 

138 self.log.debug("Incoming error %s from %r", error, remote) 

139 

140 # cancel requests first, and then exchanges: cancelling the pending 

141 # exchange would trigger enqueued requests to be transmitted 

142 self.token_manager.dispatch_error(error, remote) 

143 

144 keys_for_removal = [] 

145 for key, (messageerror_monitor, cancellable_timeout) in self._active_exchanges.items(): 

146 (exchange_remote, message_id) = key 

147 if remote == exchange_remote: 

148 cancellable_timeout.cancel() 

149 keys_for_removal.append(key) 

150 for k in keys_for_removal: 

151 self._active_exchanges.pop(k) 

152 

153 # 

154 # coap dispatch, message-id sublayer: duplicate handling 

155 # 

156 

157 def _deduplicate_message(self, message): 

158 """Return True if a message is a duplicate, and re-send the stored 

159 response if available. 

160 

161 Duplicate is a message with the same Message ID (mid) and sender 

162 (remote), as message received within last EXCHANGE_LIFETIME seconds 

163 (usually 247 seconds).""" 

164 

165 key = (message.remote, message.mid) 

166 if key in self._recent_messages: 

167 if message.mtype is CON: 

168 if self._recent_messages[key] is not None: 

169 self.log.info('Duplicate CON received, sending old response again') 

170 # not going via send_message because that would strip the 

171 # mid and might do all other sorts of checks 

172 self._send_initially(self._recent_messages[key]) 

173 else: 

174 self.log.info('Duplicate CON received, no response to send yet') 

175 else: 

176 self.log.info('Duplicate NON, ACK or RST received') 

177 return True 

178 else: 

179 self.log.debug('New unique message received') 

180 self.loop.call_later(EXCHANGE_LIFETIME, functools.partial(self._recent_messages.pop, key)) 

181 self._recent_messages[key] = None 

182 return False 

183 

184 def _store_response_for_duplicates(self, message): 

185 """If the message is the response can be used to satisfy a future 

186 duplicate message, store it.""" 

187 

188 key = (message.remote, message.mid) 

189 if key in self._recent_messages: 

190 self._recent_messages[key] = message 

191 

192 # 

193 # coap dispatch, message-type sublayer: retransmission handling 

194 # 

195 

196 def _add_exchange(self, message, messageerror_monitor): 

197 """Add an "exchange" for outgoing CON message. 

198 

199 CON (Confirmable) messages are automatically retransmitted by protocol 

200 until ACK or RST message with the same Message ID is received from 

201 target host.""" 

202 

203 key = (message.remote, message.mid) 

204 

205 if message.remote not in self._backlogs: 

206 self._backlogs[message.remote] = [] 

207 

208 timeout = random.uniform(ACK_TIMEOUT, ACK_TIMEOUT * ACK_RANDOM_FACTOR) 

209 

210 next_retransmission = self._schedule_retransmit(message, timeout, 0) 

211 self._active_exchanges[key] = (messageerror_monitor, next_retransmission) 

212 

213 self.log.debug("Exchange added, message ID: %d.", message.mid) 

214 

215 def _remove_exchange(self, message): 

216 """Remove exchange from active exchanges and cancel the timeout to next 

217 retransmission.""" 

218 key = (message.remote, message.mid) 

219 

220 if key not in self._active_exchanges: 

221 self.log.warning("Received %s from %s, but could not match it to a running exchange.", message.mtype, message.remote) 

222 return 

223 

224 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key) 

225 next_retransmission.cancel() 

226 if message.mtype is RST: 

227 messageerror_monitor() 

228 self.log.debug("Exchange removed, message ID: %d.", message.mid) 

229 

230 self._continue_backlog(message.remote) 

231 

232 def _continue_backlog(self, remote): 

233 """After an exchange has been removed, start working off the backlog or 

234 clear it completely.""" 

235 

236 if remote not in self._backlogs: 

237 # if active exchanges were something we could do a 

238 # .register_finally() on, we could chain them like that; if we 

239 # implemented anything but NSTART=1, we'll need a more elaborate 

240 # system anyway 

241 raise AssertionError("backlogs/active_exchange relation violated (implementation error)") 

242 

243 # first iteration is sure to happen, others happen only if the enqueued 

244 # messages were NONs 

245 while not any(r == remote for r, mid in self._active_exchanges.keys()): 

246 if self._backlogs[remote] != []: 

247 next_message, messageerror_monitor = self._backlogs[remote].pop(0) 

248 self._send_initially(next_message, messageerror_monitor) 

249 else: 

250 del self._backlogs[remote] 

251 break 

252 

253 def _schedule_retransmit(self, message, timeout, retransmission_counter): 

254 """Create and return a call_later for first or subsequent 

255 retransmissions.""" 

256 

257 # while this could just as well be done in a lambda or with the 

258 # arguments passed to call_later, in this form makes the test cases 

259 # easier to debug (it's about finding where references to a Context 

260 # are kept around; contexts should be able to shut down in an orderly 

261 # way without littering references in the loop) 

262 

263 def retr(self=self, 

264 message=message, 

265 timeout=timeout, 

266 retransmission_counter=retransmission_counter, 

267 doc="If you read this, have a look at _schedule_retransmit", 

268 id=object()): 

269 self._retransmit(message, timeout, retransmission_counter) 

270 return self.loop.call_later(timeout, retr) 

271 

272 def _retransmit(self, message, timeout, retransmission_counter): 

273 """Retransmit CON message that has not been ACKed or RSTed.""" 

274 key = (message.remote, message.mid) 

275 

276 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key) 

277 # this should be a no-op, but let's be sure 

278 next_retransmission.cancel() 

279 

280 if retransmission_counter < MAX_RETRANSMIT: 

281 self.log.info("Retransmission, Message ID: %d.", message.mid) 

282 self._send_via_transport(message) 

283 retransmission_counter += 1 

284 timeout *= 2 

285 

286 next_retransmission = self._schedule_retransmit(message, timeout, retransmission_counter) 

287 self._active_exchanges[key] = (messageerror_monitor, next_retransmission) 

288 else: 

289 self.log.info("Exchange timed out trying to transmit %s", message) 

290 del self._backlogs[message.remote] 

291 self.token_manager.dispatch_error(error.ConRetransmitsExceeded("Retransmissions exceeded"), message.remote) 

292 

293 # 

294 # coap dispatch, message-code sublayer: triggering custom actions based on incoming messages 

295 # 

296 

297 def _process_ping(self, message): 

298 self.log.info('Received CoAP Ping from %s, replying with RST.', message.remote) 

299 rst = Message(mtype=RST, mid=message.mid, code=EMPTY, payload=b'') 

300 rst.remote = message.remote.as_response_address() 

301 # not going via send_message because that would strip the mid, and we 

302 # already know that it can go straight to the wire 

303 self._send_initially(rst) 

304 

305 def _process_request(self, request): 

306 """Spawn a Responder for an incoming request, or feed a long-running 

307 responder if one exists.""" 

308 

309 if request.mtype == CON: 

310 def on_timeout(self, remote, token): 

311 mid, own_timeout = self._piggyback_opportunities.pop( 

312 (remote, token)) 

313 self._send_empty_ack(request.remote, mid, 

314 "Response took too long to prepare") 

315 handle = self.loop.call_later(EMPTY_ACK_DELAY, 

316 on_timeout, self, request.remote, request.token) 

317 key = (request.remote, request.token) 

318 if key in self._piggyback_opportunities: 

319 self.log.warning("New request came in while old request not" 

320 " ACKed yet. Possible mismatch between EMPTY_ACK_DELAY" 

321 " and EXCHANGE_LIFETIME. Cancelling ACK to ward off any" 

322 " further confusion.") 

323 mid, old_handle = self._piggyback_opportunities.pop(key) 

324 old_handle.cancel() 

325 self._piggyback_opportunities[key] = (request.mid, handle) 

326 

327 self.token_manager.process_request(request) 

328 

329 def _process_response(self, response): 

330 """Feed a response back to whatever might expect it. 

331 

332 Returns True if the response was expected (and should be ACK'd 

333 depending on mtype), and False if it was not expected (and should be 

334 RST'd).""" 

335 

336 self.log.debug("Received Response: %r", response) 

337 

338 return self.token_manager.process_response(response) 

339 

340 # 

341 # outgoing messages 

342 # 

343 

344 async def fill_or_recognize_remote(self, message): 

345 if message.remote is not None: 

346 if await self.message_interface.recognize_remote(message.remote): 

347 return True 

348 remote = await self.message_interface.determine_remote(message) 

349 if remote is not None: 

350 message.remote = remote 

351 return True 

352 return False 

353 

354 def send_message(self, message, messageerror_monitor): 

355 """Encode and send message. This takes care of retransmissions (if 

356 CON), message IDs and rate limiting, but does not hook any events to 

357 responses. (Use the :class:`Request` class or responding resources 

358 instead; those are the typical callers of this function.) 

359 

360 If notification about the progress of the exchange is required, an 

361 ExchangeMonitor can be passed in, which will receive the appropriate 

362 callbacks.""" 

363 

364 if message.mid is not None: 

365 # if you can give any reason why the application should provide a 

366 # fixed mid, lower the log level on demand and provide the reason 

367 # in a comment. 

368 self.log.warning("Message ID set on to-be-sent message, this is" 

369 " probably unintended; clearing it.") 

370 message.mid = None 

371 

372 if message.code.is_response(): 

373 no_response = (message.opt.no_response or 0) & (1 << message.code.class_ - 1) != 0 

374 

375 piggyback_key = (message.remote, message.token) 

376 if piggyback_key in self._piggyback_opportunities: 

377 mid, handle = self._piggyback_opportunities.pop(piggyback_key) 

378 handle.cancel() 

379 

380 if no_response: 

381 new_message = Message(code=EMPTY, mid=mid, mtype=ACK) 

382 new_message.remote = message.remote.as_response_address() 

383 message = new_message 

384 self.log.debug("Turning to-be-sent message into an empty ACK due to no_response option.") 

385 else: 

386 message.mtype = ACK 

387 message.mid = mid 

388 else: 

389 if no_response: 

390 self.log.debug("Stopping message in message manager as it is no_response and no ACK is pending.") 

391 return 

392 

393 message.opt.no_response = None 

394 

395 if message.mtype is None: 

396 if self._active_exchanges is None: 

397 # during shutdown, this is all we can do 

398 message.mtype = NON 

399 else: 

400 if message.remote.is_multicast: 

401 message.mtype = NON 

402 else: 

403 # FIXME: on responses, this should take the request into 

404 # consideration (cf. RFC7252 Section 5.2.3, answer to NON 

405 # SHOULD be NON) 

406 message.mtype = CON 

407 else: 

408 if self._active_exchanges is None: 

409 self.log.warning("Forcing message to be sent as NON even though specified because transport is shutting down") 

410 message.mtype = NON 

411 

412 if message.mtype == CON and message.remote.is_multicast: 

413 raise ValueError("Refusing to send CON message to multicast address") 

414 

415 if message.mid is None: 

416 message.mid = self._next_message_id() 

417 

418 if message.mtype == CON and message.remote in self._backlogs: 

419 self.log.debug("Message to %s put into backlog", message.remote) 

420 self._backlogs[message.remote].append((message, messageerror_monitor)) 

421 else: 

422 self._send_initially(message, messageerror_monitor) 

423 

424 def _send_initially(self, message, messageerror_monitor=None): 

425 """Put the message on the wire for the first time, starting retransmission timeouts""" 

426 

427 self.log.debug("Sending message %r", message) 

428 

429 if message.mtype is CON: 

430 assert messageerror_monitor is not None, "messageerror_monitor needs to be set for CONs" 

431 self._add_exchange(message, messageerror_monitor) 

432 

433 self._store_response_for_duplicates(message) 

434 

435 self._send_via_transport(message) 

436 

437 def _send_via_transport(self, message): 

438 """Put the message on the wire""" 

439 

440 self.message_interface.send(message) 

441 

442 def _next_message_id(self): 

443 """Reserve and return a new message ID.""" 

444 message_id = self.message_id 

445 self.message_id = 0xFFFF & (1 + self.message_id) 

446 return message_id 

447 

448 def _send_empty_ack(self, remote, mid, reason): 

449 """Send separate empty ACK for any reason. 

450 

451 Currently, this can happen only once per Responder, that is, when the 

452 last block1 has been transferred and the first block2 is not ready 

453 yet.""" 

454 

455 self.log.debug("Sending empty ACK: %s", reason) 

456 ack = Message( 

457 mtype=ACK, 

458 code=EMPTY, 

459 payload=b"", 

460 ) 

461 ack.remote = remote.as_response_address() 

462 ack.mid = mid 

463 # not going via send_message because that would strip the mid, and we 

464 # already know that it can go straight to the wire 

465 self._send_initially(ack)