Coverage for aiocoap/cli/client.py: 58%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

283 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9"""aiocoap-client is a simple command-line tool for interacting with CoAP servers""" 

10 

11import sys 

12import asyncio 

13import argparse 

14import logging 

15import subprocess 

16from pathlib import Path 

17 

18import shlex 

19# even though not used directly, this has side effects on the input() function 

20# used in interactive mode 

21try: 

22 import readline # noqa: F401 

23except ImportError: 

24 pass # that's normal on some platforms, and ok since it's just a usability enhancement 

25 

26import aiocoap 

27import aiocoap.defaults 

28import aiocoap.meta 

29import aiocoap.proxy.client 

30from aiocoap.util import contenttype 

31from aiocoap.util.cli import ActionNoYes 

32from aiocoap.numbers import ContentFormat 

33from ..util.asyncio import py38args 

34 

35def build_parser(): 

36 p = argparse.ArgumentParser(description=__doc__) 

37 p.add_argument('--non', help="Send request as non-confirmable (NON) message", action='store_true') 

38 p.add_argument('-m', '--method', help="Name or number of request method to use (default: %(default)s)", default="GET") 

39 p.add_argument('--observe', help="Register an observation on the resource", action='store_true') 

40 p.add_argument('--observe-exec', help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin", metavar='CMD') 

41 p.add_argument('--accept', help="Content format to request", metavar="MIME") 

42 p.add_argument('--proxy', help="Relay the CoAP request to a proxy for execution", metavar="HOST[:PORT]") 

43 p.add_argument('--payload', help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.", metavar="X") 

44 p.add_argument('--payload-initial-szx', help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)", metavar="SZX", type=int) 

45 p.add_argument('--content-format', help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, conversion is attempted (currently only JSON/Python-literals to CBOR).", metavar="MIME") 

46 p.add_argument('--no-set-hostname', help="Suppress transmission of Uri-Host even if the host name is not an IP literal", dest="set_hostname", action='store_false', default=True) 

47 p.add_argument('-v', '--verbose', help="Increase the debug output", action="count") 

48 p.add_argument('-q', '--quiet', help="Decrease the debug output", action="count") 

49 p.add_argument('--interactive', help="Enter interactive mode", action="store_true") # careful: picked before parsing 

50 p.add_argument('--credentials', help="Load credentials to use from a given file", type=Path) 

51 p.add_argument('--version', action="version", version='%(prog)s ' + aiocoap.meta.version) 

52 

53 p.add_argument('--color', 

54 help="Color output (default on TTYs if all required modules are installed)", 

55 default=None, 

56 action=ActionNoYes, 

57 ) 

58 p.add_argument('--pretty-print', 

59 help="Pretty-print known content formats (default on TTYs if all required modules are installed)", 

60 default=None, 

61 action=ActionNoYes, 

62 ) 

63 p.add_argument('url', help="CoAP address to fetch") 

64 

65 return p 

66 

67def configure_logging(verbosity): 

68 logging.basicConfig() 

69 

70 if verbosity <= -2: 

71 logging.getLogger('coap').setLevel(logging.CRITICAL + 1) 

72 elif verbosity == -1: 

73 logging.getLogger('coap').setLevel(logging.ERROR) 

74 elif verbosity == 0: 

75 logging.getLogger('coap').setLevel(logging.WARNING) 

76 elif verbosity == 1: 

77 logging.getLogger('coap').setLevel(logging.INFO) 

78 elif verbosity >= 2: 

79 logging.getLogger('coap').setLevel(logging.DEBUG) 

80 

81def colored(text, options, *args, **kwargs): 

82 """Apply termcolor.colored with the given args if options.color is set""" 

83 if not options.color: 

84 return text 

85 

86 import termcolor 

87 return termcolor.colored(text, *args, **kwargs) 

88 

89def incoming_observation(options, response): 

90 if options.observe_exec: 

91 p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE) 

92 # FIXME this blocks 

93 p.communicate(response.payload) 

94 else: 

95 sys.stdout.write(colored('---', options, 'grey', attrs=['bold']) + '\n') 

96 if response.code.is_successful(): 

97 present(response, options, file=sys.stderr) 

98 else: 

99 sys.stdout.flush() 

100 print(colored(response.code, options, 'red'), file=sys.stderr) 

101 if response.payload: 

102 present(response, options, file=sys.stderr) 

103 

104def apply_credentials(context, credentials, errfn): 

105 if credentials.suffix == '.json': 

106 import json 

107 context.client_credentials.load_from_dict(json.load(credentials.open('rb'))) 

108 else: 

109 raise errfn("Unknown suffix: %s (expected: .json)" % (credentials.suffix)) 

110 

111def present(message, options, file=sys.stdout): 

112 """Write a message payload to the output, pretty printing and/or coloring 

113 it as configured in the options.""" 

114 if not message.payload: 

115 return 

116 

117 payload = None 

118 

119 cf = message.opt.content_format or message.request.opt.content_format 

120 if cf is not None and cf.is_known(): 

121 mime = cf.media_type 

122 else: 

123 mime = 'application/octet-stream' 

124 if options.pretty_print: 

125 from aiocoap.util.prettyprint import pretty_print 

126 prettyprinted = pretty_print(message) 

127 if prettyprinted is not None: 

128 (infos, mime, payload) = prettyprinted 

129 if not options.quiet: 

130 for i in infos: 

131 print(colored(i, options, 'grey', attrs=['bold']), 

132 file=sys.stderr) 

133 

134 color = options.color 

135 if color: 

136 from aiocoap.util.prettyprint import lexer_for_mime 

137 import pygments 

138 try: 

139 lexer = lexer_for_mime(mime) 

140 except pygments.util.ClassNotFound: 

141 color = False 

142 

143 if color and payload is None: 

144 # Coloring requires a unicode-string style payload, either from the 

145 # mime type or from the pretty printer. 

146 try: 

147 payload = message.payload.decode('utf8') 

148 except UnicodeDecodeError: 

149 color = False 

150 

151 if color: 

152 from pygments.formatters import TerminalFormatter 

153 from pygments import highlight 

154 highlit = highlight( 

155 payload, 

156 lexer, 

157 TerminalFormatter(), 

158 ) 

159 # The TerminalFormatter already adds an end-of-line character, not 

160 # trying to add one for any missing trailing newlines. 

161 print(highlit, file=file, end="") 

162 file.flush() 

163 else: 

164 if payload is None: 

165 file.buffer.write(message.payload) 

166 if file.isatty() and message.payload[-1:] != b'\n': 

167 file.write("\n") 

168 else: 

169 file.write(payload) 

170 if file.isatty() and payload[-1] != '\n': 

171 file.write("\n") 

172 

173async def single_request(args, context=None): 

174 parser = build_parser() 

175 options = parser.parse_args(args) 

176 

177 pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules() 

178 if pretty_print_modules and \ 

179 (options.color is True or options.pretty_print is True): 

180 parser.error("Color and pretty printing require the following" 

181 " additional module(s) to be installed: %s" % 

182 ", ".join(pretty_print_modules)) 

183 if options.color is None: 

184 options.color = sys.stdout.isatty() and not pretty_print_modules 

185 if options.pretty_print is None: 

186 options.pretty_print = sys.stdout.isatty() and not pretty_print_modules 

187 

188 configure_logging((options.verbose or 0) - (options.quiet or 0)) 

189 

190 try: 

191 code = getattr(aiocoap.numbers.codes.Code, options.method.upper()) 

192 except AttributeError: 

193 try: 

194 code = aiocoap.numbers.codes.Code(int(options.method)) 

195 except ValueError: 

196 raise parser.error("Unknown method") 

197 

198 if context is None: 

199 context = await aiocoap.Context.create_client_context() 

200 

201 if options.credentials is not None: 

202 apply_credentials(context, options.credentials, parser.error) 

203 

204 request = aiocoap.Message(code=code, mtype=aiocoap.NON if options.non else aiocoap.CON) 

205 try: 

206 request.set_request_uri(options.url, set_uri_host=options.set_hostname) 

207 except ValueError as e: 

208 raise parser.error(e) 

209 

210 if not request.opt.uri_host and not request.unresolved_remote: 

211 raise parser.error("Request URLs need to be absolute.") 

212 

213 if options.accept: 

214 try: 

215 request.opt.accept = ContentFormat(int(options.accept)) 

216 except ValueError: 

217 try: 

218 request.opt.accept = ContentFormat.by_media_type(options.accept) 

219 except KeyError: 

220 raise parser.error("Unknown accept type") 

221 

222 if options.observe: 

223 request.opt.observe = 0 

224 observation_is_over = asyncio.get_event_loop().create_future() 

225 

226 if options.content_format: 

227 try: 

228 request.opt.content_format = ContentFormat(int(options.content_format)) 

229 except ValueError: 

230 try: 

231 request.opt.content_format = ContentFormat.by_media_type(options.content_format) 

232 except KeyError: 

233 raise parser.error("Unknown content format") 

234 

235 if options.payload: 

236 if options.payload.startswith('@'): 

237 filename = options.payload[1:] 

238 if filename == "-": 

239 f = sys.stdin.buffer 

240 else: 

241 f = open(filename, 'rb') 

242 try: 

243 request.payload = f.read() 

244 except OSError as e: 

245 raise parser.error("File could not be opened: %s"%e) 

246 else: 

247 if contenttype.categorize( 

248 request.opt.content_format.media_type 

249 if request.opt.content_format is not None and 

250 request.opt.content_format.is_known() 

251 else "" 

252 ) == 'cbor': 

253 try: 

254 import cbor2 as cbor 

255 except ImportError as e: 

256 raise parser.error("CBOR recoding not available (%s)" % e) 

257 import json 

258 try: 

259 decoded = json.loads(options.payload) 

260 except json.JSONDecodeError as e: 

261 import ast 

262 try: 

263 decoded = ast.literal_eval(options.payload) 

264 except ValueError: 

265 raise parser.error("JSON and Python recoding failed. Make sure quotation marks are escaped from the shell. JSON error: %s" % e) 

266 request.payload = cbor.dumps(decoded) 

267 else: 

268 request.payload = options.payload.encode('utf8') 

269 

270 if options.payload_initial_szx is not None: 

271 request.opt.block1 = aiocoap.optiontypes.BlockOption.BlockwiseTuple( 

272 0, 

273 False, 

274 options.payload_initial_szx, 

275 ) 

276 

277 if options.proxy is None: 

278 interface = context 

279 else: 

280 interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context) 

281 

282 try: 

283 requested_uri = request.get_request_uri() 

284 

285 requester = interface.request(request) 

286 

287 if options.observe: 

288 requester.observation.register_errback(observation_is_over.set_result) 

289 requester.observation.register_callback(lambda data, options=options: incoming_observation(options, data)) 

290 

291 try: 

292 response_data = await requester.response 

293 except aiocoap.error.ResolutionError as e: 

294 print("Name resolution error:", e, file=sys.stderr) 

295 sys.exit(1) 

296 except aiocoap.error.NetworkError as e: 

297 print("Network error:", e, file=sys.stderr) 

298 sys.exit(1) 

299 # Fallback while not all backends raise NetworkErrors 

300 except OSError as e: 

301 text = str(e) 

302 if not text: 

303 text = repr(e) 

304 if not text: 

305 # eg ConnectionResetError flying out of a misconfigured SSL server 

306 text = type(e) 

307 print("Error:", text, file=sys.stderr) 

308 sys.exit(1) 

309 

310 response_uri = response_data.get_request_uri() 

311 if requested_uri != response_uri: 

312 print("Response arrived from different address; base URI is", 

313 response_uri, file=sys.stderr) 

314 if response_data.code.is_successful(): 

315 present(response_data, options) 

316 else: 

317 print(colored(response_data.code, options, 'red'), file=sys.stderr) 

318 present(response_data, options, file=sys.stderr) 

319 sys.exit(1) 

320 

321 if options.observe: 

322 exit_reason = await observation_is_over 

323 print("Observation is over: %r"%(exit_reason,), file=sys.stderr) 

324 finally: 

325 if not requester.response.done(): 

326 requester.response.cancel() 

327 if options.observe and not requester.observation.cancelled: 

328 requester.observation.cancel() 

329 

330interactive_expecting_keyboard_interrupt = None 

331 

332async def interactive(): 

333 global interactive_expecting_keyboard_interrupt 

334 interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future() 

335 

336 context = await aiocoap.Context.create_client_context() 

337 

338 while True: 

339 try: 

340 # when http://bugs.python.org/issue22412 is resolved, use that instead 

341 line = await asyncio.get_event_loop().run_in_executor(None, lambda: input("aiocoap> ")) 

342 except EOFError: 

343 line = "exit" 

344 line = shlex.split(line) 

345 if not line: 

346 continue 

347 if line in (["help"], ["?"]): 

348 line = ["--help"] 

349 if line in (["quit"], ["q"], ["exit"]): 

350 return 

351 

352 current_task = asyncio.create_task( 

353 single_request(line, context=context), 

354 **py38args(name="Interactive prompt command %r" % line) 

355 ) 

356 interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future() 

357 

358 done, pending = await asyncio.wait([current_task, interactive_expecting_keyboard_interrupt], return_when=asyncio.FIRST_COMPLETED) 

359 

360 if current_task not in done: 

361 current_task.cancel() 

362 else: 

363 try: 

364 await current_task 

365 except SystemExit as e: 

366 if e.code != 0: 

367 print("Exit code: %d"%e.code, file=sys.stderr) 

368 continue 

369 except Exception as e: 

370 print("Unhandled exception raised: %s"%(e,)) 

371 

372def sync_main(args=None): 

373 # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet 

374 if args is None: 

375 args = sys.argv[1:] 

376 

377 if '--interactive' not in args: 

378 try: 

379 asyncio.run(single_request(args)) 

380 except KeyboardInterrupt: 

381 sys.exit(3) 

382 else: 

383 if len(args) != 1: 

384 print("No other arguments must be specified when entering interactive mode", file=sys.stderr) 

385 sys.exit(1) 

386 

387 loop = asyncio.get_event_loop() 

388 task = loop.create_task( 

389 interactive(), 

390 **py38args(name="Interactive prompt") 

391 ) 

392 

393 while not task.done(): 

394 try: 

395 loop.run_until_complete(task) 

396 except KeyboardInterrupt: 

397 if not interactive_expecting_keyboard_interrupt.done(): 

398 interactive_expecting_keyboard_interrupt.set_result(None) 

399 except SystemExit: 

400 continue # asyncio/tasks.py(242) raises those after setting them as results, but we particularly want them back in the loop 

401 

402if __name__ == "__main__": 

403 sync_main()