Coverage for aiocoap/transports/tcp.py: 90%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

229 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9import asyncio 

10import socket 

11 

12from aiocoap.transports import rfc8323common 

13from aiocoap import interfaces, error, util 

14from aiocoap import COAP_PORT, Message 

15from aiocoap import defaults 

16 

17def _extract_message_size(data: bytes): 

18 """Read out the full length of a CoAP messsage represented by data. 

19 

20 Returns None if data is too short to read the (full) length. 

21 

22 The number returned is the number of bytes that has to be read into data to 

23 start reading the next message; it consists of a constant term, the token 

24 length and the extended length of options-plus-payload.""" 

25 

26 if not data: 

27 return None 

28 

29 l = data[0] >> 4 

30 tokenoffset = 2 

31 tkl = data[0] & 0x0f 

32 

33 if l >= 13: 

34 if l == 13: 

35 extlen = 1 

36 offset = 13 

37 elif l == 14: 

38 extlen = 2 

39 offset = 269 

40 else: 

41 extlen = 4 

42 offset = 65805 

43 if len(data) < extlen + 1: 

44 return None 

45 tokenoffset = 2 + extlen 

46 l = int.from_bytes(data[1:1 + extlen], "big") + offset 

47 return tokenoffset, tkl, l 

48 

def _decode_message(data: bytes) -> Message:
    """Build a Message from the bytes of one complete CoAP-over-TCP message.

    data is expected to hold exactly one message, starting at its first
    header byte.

    Raises error.UnparsableMessage if the header is incomplete, if the
    announced token length exceeds 8, or if data ends before the code byte
    and token are complete."""

    size = _extract_message_size(data)
    if size is None:
        # Without this check the tuple unpacking below would fail with a
        # TypeError that callers catching UnparsableMessage do not expect.
        raise error.UnparsableMessage("Truncated message header")
    tokenoffset, tkl, _ = size

    if tkl > 8:
        raise error.UnparsableMessage("Overly long token")
    if len(data) < tokenoffset + tkl:
        # Otherwise indexing the code byte raises IndexError, or the token
        # slice silently comes out shorter than announced.
        raise error.UnparsableMessage("Truncated message")

    code = data[tokenoffset - 1]
    token = data[tokenoffset:tokenoffset + tkl]

    msg = Message(code=code, token=token)

    # Everything after the token is options plus payload; decoding the
    # options hands back what remains as payload.
    msg.payload = msg.opt.decode(data[tokenoffset + tkl:])

    return msg

61 

62def _encode_length(l: int): 

63 if l < 13: 

64 return (l, b"") 

65 elif l < 269: 

66 return (13, (l - 13).to_bytes(1, 'big')) 

67 elif l < 65805: 

68 return (14, (l - 269).to_bytes(2, 'big')) 

69 else: 

70 return (15, (l - 65805).to_bytes(4, 'big')) 

71 

def _serialize(msg: Message) -> bytes:
    """Encode msg into the byte sequence sent over the stream.

    The result is the header byte (length nibble and token length), any
    extended length bytes, the code, the token, and then the encoded options
    followed -- only if there is a payload -- by the 0xff marker and the
    payload.

    Raises ValueError if the token is longer than 8 bytes."""

    encoded_options = msg.opt.encode()
    if msg.payload:
        chunks = (encoded_options, b'\xff', msg.payload)
    else:
        chunks = (encoded_options,)
    body = b"".join(chunks)

    # The announced length covers options, payload marker and payload only.
    length_nibble, extended_length = _encode_length(len(body))

    token_length = len(msg.token)
    if token_length > 8:
        raise ValueError("Overly long token")

    first_byte = bytes(((length_nibble << 4) | token_length,))
    code_byte = bytes((msg.code,))

    return b"".join((first_byte, extended_length, code_byte, msg.token, body))

90 

class TcpConnection(asyncio.Protocol, rfc8323common.RFC8323Remote, interfaces.EndpointAddress):
    """asyncio protocol for one CoAP-over-TCP connection.

    The same object is also set as the ``.remote`` of every message it
    receives."""

    # currently, both the protocol and the EndpointAddress are the same object.
    # if, at a later point in time, the keepaliving of TCP connections should
    # depend on whether the library user still keeps a usable address around,
    # those functions could be split.

    def __init__(self, ctx, log, loop, *, is_server):
        """Set up an unconnected protocol object.

        ctx is the pool object (it is asked for ``_scheme`` and
        ``_default_port`` and receives ``_dispatch_incoming`` /
        ``_dispatch_error`` calls); is_server tells whether the local side
        accepted the connection."""
        super().__init__()
        self._ctx = ctx
        self.log = log
        self.loop = loop

        # received bytes that do not form a complete message yet
        self._spool = b""

        # checked against None before any non-signalling message is
        # dispatched; not assigned anywhere else in this class (presumably
        # filled in by the signalling handling of RFC8323Remote -- verify)
        self._remote_settings = None

        # stays None until connection_made() ran
        self._transport = None
        self._local_is_server = is_server

    @property
    def scheme(self):
        """URI scheme of this connection, as configured on the pool."""
        return self._ctx._scheme

    def _send_message(self, msg: Message):
        """Serialize msg and write it to the transport."""
        self.log.debug("Sending message: %r", msg)
        self._transport.write(_serialize(msg))

    def _abort_with(self, abort_msg):
        """Send abort_msg as the last message and close the transport."""
        if self._transport is None:
            # FIXME: find out how this happens; i've only seen it after nmap
            # runs against an aiocoap server and then shutting it down.
            # "poisoning" the object to make sure this can not be exploited to
            # bypass the server shutdown.
            self._ctx = None
            return

        self._send_message(abort_msg)
        self._transport.close()

    # implementing asyncio.Protocol

    def connection_made(self, transport):
        """Record transport and both host infos, then send the initial CSM."""
        self._transport = transport

        ssl_object = transport.get_extra_info('ssl_object')
        if ssl_object is None:
            server_name = None
        else:
            server_name = getattr(ssl_object, "indicated_server_name", None)

        def none_default_port(sockname):
            # a port equal to the pool's default port is represented as None
            port = sockname[1]
            if port == self._ctx._default_port:
                port = None
            return (sockname[0], port)

        # `host` already contains the interface identifier, so throwing away
        # scope and interface identifier
        local = transport.get_extra_info('sockname')[:2]
        remote = transport.get_extra_info('peername')[:2]
        self._local_hostinfo = none_default_port(local)
        self._remote_hostinfo = none_default_port(remote)

        # SNI information available: the indicated name replaces the host of
        # whichever side is the server
        if server_name is not None:
            if self._local_is_server:
                self._local_hostinfo = (server_name, self._local_hostinfo[1])
            else:
                self._remote_hostinfo = (server_name, self._remote_hostinfo[1])

        self._send_initial_csm()

    def connection_lost(self, exc):
        """Report the loss of the connection to the pool."""
        # FIXME react meaningfully:
        # * send event through pool so it can propagate the error to all
        #   requests on the same remote
        # * mark the address as erroneous so it won't be recognized by
        #   fill_or_recognize_remote

        self._ctx._dispatch_error(self, exc)

    def data_received(self, data):
        """Buffer data and process every complete message now available.

        Signalling messages are passed to ``_process_signaling``; all others
        go to the pool's ``_dispatch_incoming``. The connection is aborted on
        an over-long announced size, an unparsable message, or a
        non-signalling message arriving while ``_remote_settings`` is None."""
        # A rope would be more efficient here, but the expected case is that
        # _spool is b"" and spool gets emptied soon -- most messages will just
        # fit in a single TCP package and not be nagled together.
        #
        # (If this does become a bottleneck, say self._spool = SomeRope(b"")
        # and barely change anything else).

        self._spool += data

        while True:
            size_parts = _extract_message_size(self._spool)
            if size_parts is None:
                # not even the length field is complete yet
                break

            total_length = sum(size_parts)
            if total_length > self._my_max_message_size:
                self.abort("Overly large message announced")
                return

            if total_length > len(self._spool):
                # rest of the message has not arrived yet
                break

            raw = self._spool[:total_length]
            try:
                msg = _decode_message(raw)
            except error.UnparsableMessage:
                self.abort("Failed to parse message")
                return
            msg.remote = self

            self.log.debug("Received message: %r", msg)

            self._spool = self._spool[total_length:]

            if msg.code.is_signalling():
                self._process_signaling(msg)
            elif self._remote_settings is None:
                self.abort("No CSM received")
                return
            else:
                self._ctx._dispatch_incoming(self, msg)

    def eof_received(self):
        """Do nothing special when the peer closes its sending side."""
        # FIXME: as with connection_lost, but less noisy if announced
        # FIXME: return true and initiate own shutdown if that is what CoAP prescribes

        # A false return value lets the transport close itself.
        return None

    def pause_writing(self):
        """Ignore the transport's request to stop writing."""
        # FIXME: do something ;-)
        return None

    def resume_writing(self):
        """Ignore the transport's notification that writing may resume."""
        # FIXME: do something ;-)
        return None

224 

class _TCPPooling:
    """Behavior shared by the pools that own TcpConnection objects.

    Users of this class are expected to provide ``_tokenmanager``, ``log``
    and an ``_evict_from_pool(connection)`` method."""

    # implementing TokenInterface

    def send_message(self, message, messageerror_monitor):
        """Send message through the connection stored in ``message.remote``.

        A response is dropped instead of sent when its No-Response option has
        the bit for the response's code class set."""
        # Ignoring messageerror_monitor: CoAP over reliable transports has no
        # way of indicating that a particular message was bad, it always shuts
        # down the complete connection

        if message.code.is_response():
            suppressed_classes = message.opt.no_response or 0
            class_bit = 1 << (message.code.class_ - 1)
            if suppressed_classes & class_bit != 0:
                return

            # The option only steered the decision above; it is cleared so it
            # is not serialized into the response.
            message.opt.no_response = None

        message.remote._send_message(message)

    # used by the TcpConnection instances

    def _dispatch_incoming(self, connection, msg):
        """Hand a received non-signalling message to the token manager."""
        if msg.code == 0:
            # Empty messages must be ignored by the recipient (RFC 8323
            # section 3.4). Previously this branch was a no-op and the
            # message fell through to be processed as a request.
            return

        if msg.code.is_response():
            self._tokenmanager.process_response(msg)
            # ignoring the return value; unexpected responses can be the
            # asynchronous result of cancelled observations
        else:
            self._tokenmanager.process_request(msg)

    def _dispatch_error(self, connection, exc):
        """Drop connection from the pool and report exc to the token manager.

        exc is None for a regular connection loss."""
        self._evict_from_pool(connection)

        # Connections report their loss after shutdown too. Tolerate a pool
        # whose _tokenmanager was removed at that point, whether it was set
        # to None or deleted, instead of raising AttributeError here.
        tokenmanager = getattr(self, "_tokenmanager", None)
        if tokenmanager is None:
            if exc is not None:
                self.log.warning("Ignoring late error during shutdown: %s", exc)
            # otherwise it's just a regular connection loss, that's to be
            # expected during shutdown
            return

        tokenmanager.dispatch_error(exc, connection)

    # for diverting behavior of _TLSMixIn
    _scheme = 'coap+tcp'
    _default_port = COAP_PORT

271 

class TCPServer(_TCPPooling, interfaces.TokenInterface):
    """Token interface that accepts incoming CoAP-over-TCP connections.

    Usable instances are produced by create_server(); the constructor only
    prepares an empty connection pool."""

    def __init__(self):
        # every TcpConnection created for the listening socket
        self._pool = set()

    @classmethod
    async def create_server(cls, bind, tman: interfaces.TokenManager, log, loop, *, _server_context=None):
        """Create an instance and start listening.

        bind is a ``(host, port)`` pair or a false value; a false value
        means ``'::'`` with the default port. A given port is shifted by the
        difference between this class's default port and COAP_PORT.

        Raises error.ResolutionError when the host can not be resolved."""
        self = cls()
        self._tokenmanager = tman
        self.log = log
        # loop is only captured by the connection factory below; it is not
        # stored on the server object

        bind = bind or ('::', None)
        host = bind[0]
        if bind[1]:
            port = bind[1] + (self._default_port - COAP_PORT)
        else:
            port = self._default_port

        def new_connection():
            connection = TcpConnection(self, log, loop, is_server=True)
            self._pool.add(connection)
            return connection

        try:
            self.server = await loop.create_server(
                new_connection,
                host,
                port,
                ssl=_server_context,
                reuse_port=defaults.has_reuse_port(),
            )
        except socket.gaierror:
            raise error.ResolutionError("No local bindable address found for %s" % host)

        return self

    def _evict_from_pool(self, connection):
        """Forget connection; called when it reports its loss."""
        self._pool.remove(connection)

    # implementing TokenInterface

    async def fill_or_recognize_remote(self, message):
        """Tell whether message is addressed to a connection of this server."""
        remote = message.remote
        return (
            remote is not None
            and isinstance(remote, TcpConnection)
            and remote._ctx is self
        )

    async def shutdown(self):
        """Stop listening, abort all pooled connections, and detach."""
        self.server.close()
        for connection in self._pool:
            # FIXME: it would be nicer to release them
            connection.abort("Server shutdown")
        await self.server.wait_closed()
        # _dispatch_error treats a None token manager as "shutting down"
        self._tokenmanager = None

320 

class TCPClient(_TCPPooling, interfaces.TokenInterface):
    """Token interface that opens outgoing CoAP-over-TCP connections.

    Usable instances are produced by create_client_transport(); the
    constructor only prepares an empty connection pool."""

    def __init__(self):
        self._pool = {}  # (host, port) -> connection
        # note that connections are filed by host name, so different names for
        # the same address might end up with different connections, which is
        # probably okay for TCP, and crucial for later work with TLS.

    async def _spawn_protocol(self, message):
        """Return a connection for message's destination, creating it if needed.

        The destination is taken from ``message.unresolved_remote`` if set,
        otherwise from the Uri-Host / Uri-Port options; a missing port
        falls back to the default port.

        Raises ValueError if no host can be determined,
        error.ResolutionError if the name does not resolve, and
        error.NetworkError if connecting fails."""
        if message.unresolved_remote is None:
            host = message.opt.uri_host
            port = message.opt.uri_port or self._default_port
            if host is None:
                raise ValueError("No location found to send message to (neither in .opt.uri_host nor in .remote)")
        else:
            host, port = util.hostportsplit(message.unresolved_remote)
            port = port or self._default_port

        key = (host, port)
        if key in self._pool:
            return self._pool[key]

        try:
            _, protocol = await self.loop.create_connection(
                lambda: TcpConnection(self, self.log, self.loop,
                                      is_server=False),
                host, port,
                ssl=self._ssl_context_factory(message.unresolved_remote))
        except socket.gaierror:
            raise error.ResolutionError("No address information found for requests to %r" % host)
        except OSError:
            raise error.NetworkError("Connection failed to %r" % host)

        self._pool[key] = protocol

        return protocol

    # for diverting behavior of TLSClient
    def _ssl_context_factory(self, hostinfo):
        """Return the ``ssl`` argument for new connections; None here."""
        return None

    def _evict_from_pool(self, connection):
        """Remove every pool entry that refers to connection."""
        # collect first: the dict must not change size while iterating it
        stale_keys = [key for key, pooled in self._pool.items() if pooled is connection]
        # should really be zero or one
        for key in stale_keys:
            self._pool.pop(key)

    @classmethod
    async def create_client_transport(cls, tman: interfaces.TokenManager, log, loop, credentials=None):
        """Create an instance wired to the token manager, log and loop."""
        # this is not actually asynchronous, and even though the interface
        # between the context and the creation of interfaces is not fully
        # standardized, this stays in the other interfaces' style.
        self = cls()
        self._tokenmanager = tman
        self.log = log
        self.loop = loop
        # used by the TLS variant; FIXME not well thought through
        self.credentials = credentials

        return self

    # implementing TokenInterface

    async def fill_or_recognize_remote(self, message):
        """Claim message if it belongs to, or can be sent by, this client.

        Returns True when ``message.remote`` already is one of this
        client's connections, or when the requested scheme matches and a
        connection was assigned to ``message.remote``; False otherwise."""
        if message.remote is not None \
                and isinstance(message.remote, TcpConnection) \
                and message.remote._ctx is self:
            return True

        if message.requested_scheme == self._scheme:
            # FIXME: This could pool outgoing connections.
            # (Checking if an incoming connection is a pool candidate is
            # probably overkill because even if a URI can be constructed from a
            # ephemeral client port, nobody but us can use it, and we can just
            # set .remote).
            message.remote = await self._spawn_protocol(message)
            return True

        return False

    async def shutdown(self):
        """Abort all pooled connections and detach from the token manager."""
        for c in self._pool.values():
            # FIXME: it would be nicer to release them
            c.abort("Server shutdown")
        # Set to None rather than deleting the attribute: the connections
        # aborted above still report their loss through _dispatch_error,
        # which tests the attribute against None to detect shutdown. With
        # the attribute deleted that lookup raised AttributeError.
        self._tokenmanager = None

406 del self._tokenmanager