Coverage for aiocoap/cli/client.py: 54%

309 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-08-01 17:22 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers""" 

6 

7import sys 

8import asyncio 

9import argparse 

10import logging 

11import subprocess 

12from pathlib import Path 

13 

14import shlex 

15 

16# even though not used directly, this has side effects on the input() function 

17# used in interactive mode 

18try: 

19 import readline # noqa: F401 

20except ImportError: 

21 pass # that's normal on some platforms, and ok since it's just a usability enhancement 

22 

23import aiocoap 

24import aiocoap.defaults 

25import aiocoap.meta 

26import aiocoap.proxy.client 

27from aiocoap.util import contenttype 

28from aiocoap.util.cli import ActionNoYes 

29from aiocoap.numbers import ContentFormat 

30 

31 

32def build_parser(): 

33 p = argparse.ArgumentParser(description=__doc__) 

34 p.add_argument( 

35 "--non", 

36 help="Send request as non-confirmable (NON) message", 

37 action="store_true", 

38 ) 

39 p.add_argument( 

40 "-m", 

41 "--method", 

42 help="Name or number of request method to use (default: %(default)s)", 

43 default="GET", 

44 ) 

45 p.add_argument( 

46 "--observe", help="Register an observation on the resource", action="store_true" 

47 ) 

48 p.add_argument( 

49 "--observe-exec", 

50 help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin", 

51 metavar="CMD", 

52 ) 

53 p.add_argument( 

54 "--accept", 

55 help="Content format to request", 

56 metavar="MIME", 

57 ) 

58 p.add_argument( 

59 "--proxy", help="Relay the CoAP request to a proxy for execution", metavar="URI" 

60 ) 

61 p.add_argument( 

62 "--payload", 

63 help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.", 

64 metavar="X", 

65 ) 

66 p.add_argument( 

67 "--payload-initial-szx", 

68 help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)", 

69 metavar="SZX", 

70 type=int, 

71 ) 

72 p.add_argument( 

73 "--content-format", 

74 help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.", 

75 metavar="MIME", 

76 ) 

77 p.add_argument( 

78 "--no-set-hostname", 

79 help="Suppress transmission of Uri-Host even if the host name is not an IP literal", 

80 dest="set_hostname", 

81 action="store_false", 

82 default=True, 

83 ) 

84 p.add_argument( 

85 "-v", 

86 "--verbose", 

87 help="Increase the debug output", 

88 action="count", 

89 ) 

90 p.add_argument( 

91 "-q", 

92 "--quiet", 

93 help="Decrease the debug output", 

94 action="count", 

95 ) 

96 # careful: picked before parsing 

97 p.add_argument( 

98 "--interactive", 

99 help="Enter interactive mode", 

100 action="store_true", 

101 ) 

102 p.add_argument( 

103 "--credentials", 

104 help="Load credentials to use from a given file", 

105 type=Path, 

106 ) 

107 p.add_argument( 

108 "--version", action="version", version="%(prog)s " + aiocoap.meta.version 

109 ) 

110 

111 p.add_argument( 

112 "--color", 

113 help="Color output (default on TTYs if all required modules are installed)", 

114 default=None, 

115 action=ActionNoYes, 

116 ) 

117 p.add_argument( 

118 "--pretty-print", 

119 help="Pretty-print known content formats (default on TTYs if all required modules are installed)", 

120 default=None, 

121 action=ActionNoYes, 

122 ) 

123 p.add_argument( 

124 "url", 

125 help="CoAP address to fetch", 

126 ) 

127 

128 return p 

129 

130 

131def configure_logging(verbosity): 

132 logging.basicConfig() 

133 

134 if verbosity <= -2: 

135 logging.getLogger("coap").setLevel(logging.CRITICAL + 1) 

136 elif verbosity == -1: 

137 logging.getLogger("coap").setLevel(logging.ERROR) 

138 elif verbosity == 0: 

139 logging.getLogger("coap").setLevel(logging.WARNING) 

140 elif verbosity == 1: 

141 logging.getLogger("coap").setLevel(logging.INFO) 

142 elif verbosity >= 2: 

143 logging.getLogger("coap").setLevel(logging.DEBUG) 

144 

145 

146def colored(text, options, tokenlambda): 

147 """Apply pygments based coloring if options.color is set. Tokelambda is a 

148 callback to which pygments.token is passed and which returns a token type; 

149 this makes it easy to not need to conditionally react to pygments' possible 

150 absence in all color locations.""" 

151 if not options.color: 

152 return str(text) 

153 

154 from pygments.formatters import TerminalFormatter 

155 from pygments import token, format 

156 

157 return format( 

158 [(tokenlambda(token), str(text))], 

159 TerminalFormatter(), 

160 ) 

161 

162 

163def incoming_observation(options, response): 

164 if options.observe_exec: 

165 p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE) 

166 # FIXME this blocks 

167 p.communicate(response.payload) 

168 else: 

169 sys.stdout.write(colored("---", options, lambda token: token.Comment.Preproc)) 

170 if response.code.is_successful(): 

171 present(response, options, file=sys.stderr) 

172 else: 

173 sys.stdout.flush() 

174 print( 

175 colored( 

176 response.code, options, lambda token: token.Token.Generic.Error 

177 ), 

178 file=sys.stderr, 

179 ) 

180 if response.payload: 

181 present(response, options, file=sys.stderr) 

182 

183 

184def apply_credentials(context, credentials, errfn): 

185 if credentials.suffix == ".json": 

186 import json 

187 

188 context.client_credentials.load_from_dict(json.load(credentials.open("rb"))) 

189 elif credentials.suffix == ".diag": 

190 try: 

191 import cbor_diag 

192 import cbor2 

193 except ImportError: 

194 raise errfn( 

195 "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package" 

196 ) 

197 context.client_credentials.load_from_dict( 

198 cbor2.loads(cbor_diag.diag2cbor(credentials.open().read())) 

199 ) 

200 else: 

201 raise errfn( 

202 "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix) 

203 ) 

204 

205 

206def present(message, options, file=sys.stdout): 

207 """Write a message payload to the output, pretty printing and/or coloring 

208 it as configured in the options.""" 

209 if not options.quiet and (message.opt.location_path or message.opt.location_query): 

210 # FIXME: Percent encoding is completely missing; this would be done 

211 # most easily with a CRI library 

212 location_ref = "/" + "/".join(message.opt.location_path) 

213 if message.opt.location_query: 

214 location_ref += "?" + "&".join(message.opt.location_query) 

215 print( 

216 colored( 

217 f"Location options indicate new resource: {location_ref}", 

218 options, 

219 lambda token: token.Token.Generic.Inserted, 

220 ), 

221 file=sys.stderr, 

222 ) 

223 

224 if not message.payload: 

225 return 

226 

227 payload = None 

228 

229 cf = message.opt.content_format or message.request.opt.content_format 

230 if cf is not None and cf.is_known(): 

231 mime = cf.media_type 

232 else: 

233 mime = "application/octet-stream" 

234 if options.pretty_print: 

235 from aiocoap.util.prettyprint import pretty_print 

236 

237 prettyprinted = pretty_print(message) 

238 if prettyprinted is not None: 

239 (infos, mime, payload) = prettyprinted 

240 if not options.quiet: 

241 for i in infos: 

242 print( 

243 colored("# " + i, options, lambda token: token.Comment), 

244 file=sys.stderr, 

245 ) 

246 

247 color = options.color 

248 if color: 

249 from aiocoap.util.prettyprint import lexer_for_mime 

250 import pygments 

251 

252 try: 

253 lexer = lexer_for_mime(mime) 

254 except pygments.util.ClassNotFound: 

255 color = False 

256 

257 if color and payload is None: 

258 # Coloring requires a unicode-string style payload, either from the 

259 # mime type or from the pretty printer. 

260 try: 

261 payload = message.payload.decode("utf8") 

262 except UnicodeDecodeError: 

263 color = False 

264 

265 if color: 

266 from pygments.formatters import TerminalFormatter 

267 from pygments import highlight 

268 

269 highlit = highlight( 

270 payload, 

271 lexer, 

272 TerminalFormatter(), 

273 ) 

274 # The TerminalFormatter already adds an end-of-line character, not 

275 # trying to add one for any missing trailing newlines. 

276 print(highlit, file=file, end="") 

277 file.flush() 

278 else: 

279 if payload is None: 

280 file.buffer.write(message.payload) 

281 if file.isatty() and message.payload[-1:] != b"\n": 

282 file.write("\n") 

283 else: 

284 file.write(payload) 

285 if file.isatty() and payload[-1] != "\n": 

286 file.write("\n") 

287 

288 

289async def single_request(args, context): 

290 parser = build_parser() 

291 options = parser.parse_args(args) 

292 

293 pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules() 

294 if pretty_print_modules and (options.color is True or options.pretty_print is True): 

295 parser.error( 

296 "Color and pretty printing require the following" 

297 " additional module(s) to be installed: %s" 

298 % ", ".join(pretty_print_modules) 

299 ) 

300 if options.color is None: 

301 options.color = sys.stdout.isatty() and not pretty_print_modules 

302 if options.pretty_print is None: 

303 options.pretty_print = sys.stdout.isatty() and not pretty_print_modules 

304 

305 configure_logging((options.verbose or 0) - (options.quiet or 0)) 

306 

307 try: 

308 code = getattr( 

309 aiocoap.numbers.codes.Code, 

310 options.method.upper().replace("IPATCH", "iPATCH"), 

311 ) 

312 except AttributeError: 

313 try: 

314 code = aiocoap.numbers.codes.Code(int(options.method)) 

315 except ValueError: 

316 raise parser.error("Unknown method") 

317 

318 if options.credentials is not None: 

319 apply_credentials(context, options.credentials, parser.error) 

320 

321 request = aiocoap.Message( 

322 code=code, mtype=aiocoap.NON if options.non else aiocoap.CON 

323 ) 

324 try: 

325 request.set_request_uri(options.url, set_uri_host=options.set_hostname) 

326 except ValueError as e: 

327 raise parser.error(e) 

328 

329 if not request.opt.uri_host and not request.unresolved_remote: 

330 raise parser.error("Request URLs need to be absolute.") 

331 

332 if options.accept: 

333 try: 

334 request.opt.accept = ContentFormat(int(options.accept)) 

335 except ValueError: 

336 try: 

337 request.opt.accept = ContentFormat.by_media_type(options.accept) 

338 except KeyError: 

339 raise parser.error("Unknown accept type") 

340 

341 if options.observe: 

342 request.opt.observe = 0 

343 observation_is_over = asyncio.get_event_loop().create_future() 

344 

345 if options.content_format: 

346 try: 

347 request.opt.content_format = ContentFormat(int(options.content_format)) 

348 except ValueError: 

349 try: 

350 request.opt.content_format = ContentFormat.by_media_type( 

351 options.content_format 

352 ) 

353 except KeyError: 

354 raise parser.error("Unknown content format") 

355 

356 if options.payload: 

357 if options.payload.startswith("@"): 

358 filename = options.payload[1:] 

359 if filename == "-": 

360 f = sys.stdin.buffer 

361 else: 

362 f = open(filename, "rb") 

363 try: 

364 request.payload = f.read() 

365 except OSError as e: 

366 raise parser.error("File could not be opened: %s" % e) 

367 else: 

368 request_classification = contenttype.categorize( 

369 request.opt.content_format.media_type 

370 if request.opt.content_format is not None 

371 and request.opt.content_format.is_known() 

372 else "" 

373 ) 

374 if request_classification in ("cbor", "cbor-seq"): 

375 try: 

376 import cbor_diag 

377 except ImportError as e: 

378 raise parser.error(f"CBOR recoding not available ({e})") 

379 

380 try: 

381 encoded = cbor_diag.diag2cbor(options.payload) 

382 except ValueError as e: 

383 raise parser.error( 

384 f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}" 

385 ) 

386 

387 if request_classification == "cbor-seq": 

388 try: 

389 import cbor2 

390 except ImportError as e: 

391 raise parser.error( 

392 f"CBOR sequence recoding not available ({e})" 

393 ) 

394 decoded = cbor2.loads(encoded) 

395 if not isinstance(decoded, list): 

396 raise parser.error( 

397 "CBOR sequence recoding requires an array as the top-level element." 

398 ) 

399 request.payload = b"".join(cbor2.dumps(d) for d in decoded) 

400 else: 

401 request.payload = encoded 

402 else: 

403 request.payload = options.payload.encode("utf8") 

404 

405 if options.payload_initial_szx is not None: 

406 request.remote.maximum_block_size_exp = options.payload_initial_szx 

407 

408 if options.proxy is None: 

409 interface = context 

410 else: 

411 interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context) 

412 

413 try: 

414 requested_uri = request.get_request_uri() 

415 

416 requester = interface.request(request) 

417 

418 if options.observe: 

419 requester.observation.register_errback(observation_is_over.set_result) 

420 requester.observation.register_callback( 

421 lambda data, options=options: incoming_observation(options, data) 

422 ) 

423 

424 try: 

425 response_data = await requester.response 

426 except aiocoap.error.ResolutionError as e: 

427 print("Name resolution error:", e, file=sys.stderr) 

428 sys.exit(1) 

429 except aiocoap.error.NetworkError as e: 

430 print("Network error:", e, file=sys.stderr) 

431 extra_help = e.extra_help() 

432 if extra_help: 

433 print("Debugging hint:", extra_help, file=sys.stderr) 

434 sys.exit(1) 

435 # Fallback while not all backends raise NetworkErrors 

436 except OSError as e: 

437 text = str(e) 

438 if not text: 

439 text = repr(e) 

440 if not text: 

441 # eg ConnectionResetError flying out of a misconfigured SSL server 

442 text = type(e) 

443 print( 

444 "Warning: OS errors should not be raised this way any more.", 

445 file=sys.stderr, 

446 ) 

447 # not telling what to do precisely: the form already tells users to 

448 # include `aiocoap.cli.defaults` output, which is exactly what we 

449 # need. 

450 print( 

451 f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.", 

452 file=sys.stderr, 

453 ) 

454 print("Error:", text, file=sys.stderr) 

455 sys.exit(1) 

456 

457 response_uri = response_data.get_request_uri() 

458 if requested_uri != response_uri: 

459 print( 

460 colored( 

461 f"Response arrived from different address; base URI is {response_uri}", 

462 options, 

463 lambda token: token.Generic.Inserted, 

464 ), 

465 file=sys.stderr, 

466 ) 

467 if response_data.code.is_successful(): 

468 present(response_data, options) 

469 else: 

470 print( 

471 colored(response_data.code, options, lambda token: token.Generic.Error), 

472 file=sys.stderr, 

473 ) 

474 present(response_data, options, file=sys.stderr) 

475 sys.exit(1) 

476 

477 if options.observe: 

478 exit_reason = await observation_is_over 

479 print("Observation is over: %r" % (exit_reason,), file=sys.stderr) 

480 finally: 

481 if not requester.response.done(): 

482 requester.response.cancel() 

483 if options.observe and not requester.observation.cancelled: 

484 requester.observation.cancel() 

485 

486 

487async def single_request_with_context(args): 

488 """Wrapper around single_request until sync_main gets made fully async, and 

489 async context managers are used to manage contexts.""" 

490 context = await aiocoap.Context.create_client_context() 

491 try: 

492 await single_request(args, context) 

493 finally: 

494 await context.shutdown() 

495 

496 

497interactive_expecting_keyboard_interrupt = None 

498 

499 

500async def interactive(): 

501 global interactive_expecting_keyboard_interrupt 

502 interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future() 

503 

504 context = await aiocoap.Context.create_client_context() 

505 

506 while True: 

507 try: 

508 # when http://bugs.python.org/issue22412 is resolved, use that instead 

509 line = await asyncio.get_event_loop().run_in_executor( 

510 None, lambda: input("aiocoap> ") 

511 ) 

512 except EOFError: 

513 line = "exit" 

514 line = shlex.split(line) 

515 if not line: 

516 continue 

517 if line in (["help"], ["?"]): 

518 line = ["--help"] 

519 if line in (["quit"], ["q"], ["exit"]): 

520 break 

521 

522 current_task = asyncio.create_task( 

523 single_request(line, context=context), 

524 name="Interactive prompt command %r" % line, 

525 ) 

526 interactive_expecting_keyboard_interrupt = ( 

527 asyncio.get_event_loop().create_future() 

528 ) 

529 

530 done, pending = await asyncio.wait( 

531 [current_task, interactive_expecting_keyboard_interrupt], 

532 return_when=asyncio.FIRST_COMPLETED, 

533 ) 

534 

535 if current_task not in done: 

536 current_task.cancel() 

537 else: 

538 try: 

539 await current_task 

540 except SystemExit as e: 

541 if e.code != 0: 

542 print("Exit code: %d" % e.code, file=sys.stderr) 

543 continue 

544 except Exception as e: 

545 print("Unhandled exception raised: %s" % (e,)) 

546 

547 await context.shutdown() 

548 

549 

550def sync_main(args=None): 

551 # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet 

552 if args is None: 

553 args = sys.argv[1:] 

554 

555 if "--interactive" not in args: 

556 try: 

557 asyncio.run(single_request_with_context(args)) 

558 except KeyboardInterrupt: 

559 sys.exit(3) 

560 else: 

561 if len(args) != 1: 

562 print( 

563 "No other arguments must be specified when entering interactive mode", 

564 file=sys.stderr, 

565 ) 

566 sys.exit(1) 

567 

568 loop = asyncio.get_event_loop() 

569 task = loop.create_task( 

570 interactive(), 

571 name="Interactive prompt", 

572 ) 

573 

574 while not task.done(): 

575 try: 

576 loop.run_until_complete(task) 

577 except KeyboardInterrupt: 

578 if not interactive_expecting_keyboard_interrupt.done(): 

579 interactive_expecting_keyboard_interrupt.set_result(None) 

580 except SystemExit: 

581 continue # asyncio/tasks.py(242) raises those after setting them as results, but we particularly want them back in the loop 

582 

583 

584if __name__ == "__main__": 

585 sync_main()