Coverage for aiocoap/util/prettyprint.py: 81%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

90 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9"""A pretty-printer for known mime types""" 

10 

11import json 

12import sys 

13import pprint 

14import re 

15 

16import cbor2 as cbor 

17import pygments 

18import pygments.lexers 

19import pygments.formatters 

20 

21from aiocoap.util import linkformat, contenttype 

22 

23from aiocoap.util.linkformat_pygments import _register 

24 

# Runs once, at import time of this module.
# NOTE(review): presumably this makes the link-format lexer from
# aiocoap.util.linkformat_pygments known to pygments -- confirm against that
# module.
_register()

# Pseudo media type that pretty_print() reports for hex dumps, and that
# lexer_for_mime() maps to pygments' HexdumpLexer.
MEDIATYPE_HEXDUMP = 'text/vnd.aiocoap.hexdump'

28 

def lexer_for_mime(mime):
    """Pick a pygments lexer for the media type string ``mime``.

    This wraps ``pygments.lexers.get_lexer_for_mimetype`` with three special
    cases:

    * The custom hexdump media type is answered with a ``HexdumpLexer``.
    * ``text/plain;charset=utf8`` is deliberately reported as not found, as
      callers fall back to plain output anyway.
    * If pygments does not know the full media type, the lookup is repeated
      with the type reduced to its top-level type and structured suffix
      (``a/b+c;params`` becomes ``a/c``).

    Raises ``pygments.util.ClassNotFound`` when no lexer is available."""

    def _type_and_suffix(match):
        # Group 1 is the top-level type, group 2 the part after the "+"
        return match.group(1) + '/' + match.group(2)

    if mime == MEDIATYPE_HEXDUMP:
        return pygments.lexers.HexdumpLexer()
    elif mime == 'text/plain;charset=utf8':
        # There are fall-throughs in place anyway, so there is no need to go
        # through a no-op TextLexer
        raise pygments.util.ClassNotFound

    find_lexer = pygments.lexers.get_lexer_for_mimetype
    try:
        lexer = find_lexer(mime)
    except pygments.util.ClassNotFound:
        # Types without a "+suffix" are left unchanged by the substitution,
        # in which case this second lookup raises ClassNotFound again.
        reduced = re.sub('^([^/]+)/.*\\+([^;]+)(;.*)?$', _type_and_suffix, mime)
        lexer = find_lexer(reduced)
    return lexer

47 

def pretty_print(message):
    """Given a CoAP message, reshape its payload into something human-readable.
    The return value is a triple (infos, mime, text) where text represents the
    payload, mime is a type that could be used to syntax-highlight the text
    (not necessarily related to the original mime type, eg. a report of some
    binary data that's shaped like Markdown could use a markdown mime type),
    and some line of infos that give additional data (like the reason for a hex
    dump or the original mime type).

    The content format is taken from the message's Content-Format option; only
    when that is absent, the Accept option of ``message.request`` (if there is
    a request) is used instead. Payloads whose content format has a
    non-identity encoding are not decompressed and thus not handed to any of
    the structured parsers; they are shown as text if they happen to be UTF-8,
    and as a hex dump otherwise.

    >>> from aiocoap import Message
    >>> def build(payload, request_cf, response_cf):
    ...     response = Message(payload=payload, content_format=response_cf)
    ...     request = Message(accept=request_cf)
    ...     response.request = request
    ...     return response
    >>> pretty_print(Message(payload=b"Hello", content_format=0))
    ([], 'text/plain;charset=utf8', 'Hello')
    >>> print(pretty_print(Message(payload=b'{"hello":"world"}', content_format=50))[-1])
    {
        "hello": "world"
    }
    >>> # Erroneous inputs still go to the pretty printer as long as they're
    >>> #Unicode
    >>> pretty_print(Message(payload=b'{"hello":"world', content_format=50))
    (['Invalid JSON not re-formated'], 'application/json', '{"hello":"world')
    >>> pretty_print(Message(payload=b'<>,', content_format=40))
    (['Invalid application/link-format content was not re-formatted'], 'application/link-format', '<>,')
    >>> pretty_print(Message(payload=b'a', content_format=60)) # doctest: +ELLIPSIS
    (['Showing hex dump of application/cbor payload: CBOR value is invalid'], 'text/vnd.aiocoap.hexdump', '00000000  61 ...
    """
    infos = []
    info = infos.append

    # Test for None explicitly rather than by truthiness: a content format
    # that behaves like the integer 0 is falsy, but is a perfectly valid
    # value that must not be replaced by the request's Accept option.
    cf = message.opt.content_format
    if cf is None:
        # Not every message has a request attached to it
        request = getattr(message, 'request', None)
        if request is not None:
            cf = request.opt.accept

    # Set when the payload is in some transfer encoding we can not undo
    still_encoded = False
    if cf is None:
        content_type = "type unknown"
    elif cf.is_known():
        content_type = cf.media_type
        if cf.encoding != 'identity':
            info("Content format is %s in %s encoding; treating as "
                 "application/octet-stream because decompression is not "
                 "supported yet" % (cf.media_type, cf.encoding))
            still_encoded = True
    else:
        content_type = "type %d" % cf

    # An encoded payload is opaque binary data as far as we are concerned;
    # feeding it to the parsers of its media type would only produce garbage.
    category = None if still_encoded else contenttype.categorize(content_type)

    show_hex = None

    if linkformat is not None and category == 'link-format':
        try:
            decoded = message.payload.decode('utf8')
            try:
                parsed = linkformat.link_header.parse(decoded)
            except linkformat.link_header.ParseException:
                info("Invalid application/link-format content was not re-formatted")
                return (infos, 'application/link-format', decoded)
            else:
                info("application/link-format content was re-formatted")
                prettyprinted = ",\n".join(str(l) for l in parsed.links)
                return (infos, 'application/link-format', prettyprinted)
        except ValueError:
            # Handled later
            pass

    elif category == 'cbor':
        try:
            parsed = cbor.loads(message.payload)
        except cbor.CBORDecodeError:
            show_hex = "CBOR value is invalid"
        else:
            info("CBOR message shown in naïve Python decoding")
            # Formatting it via Python b/c that's reliably available (as
            # opposed to JSON which might not round-trip well). The repr for
            # tags might still not be parsable, but I think chances of good
            # highlighting are best this way
            #
            # Not sorting dicts to give a more faithful representation of the
            # original CBOR message
            if sys.version_info >= (3, 8):
                printer = pprint.PrettyPrinter(sort_dicts=False)
            else:
                printer = pprint.PrettyPrinter()
            formatted = printer.pformat(parsed)
            return (infos, 'text/x-python3', formatted)

    elif category == 'json':
        try:
            decoded = message.payload.decode('utf8')
        except ValueError:
            # Not even text; the generic handling below reports that
            pass
        else:
            try:
                parsed = json.loads(decoded)
            except ValueError:
                info("Invalid JSON not re-formated")
                return (infos, 'application/json', decoded)
            else:
                info("JSON re-formated and indented")
                formatted = json.dumps(parsed, indent=4)
                return (infos, 'application/json', formatted)

    # That's about the formats we do for now.

    if show_hex is None:
        # No parser has already given a reason for a hex dump, so anything
        # that is valid UTF-8 is shown as plain text.
        try:
            text = message.payload.decode('utf8')
        except UnicodeDecodeError:
            show_hex = "Message can not be parsed as UTF-8"
        else:
            return (infos, 'text/plain;charset=utf8', text)

    info("Showing hex dump of %s payload%s" % (
        content_type if cf is not None else "untyped",
        ": " + show_hex if show_hex is not None else ""))
    return (infos, MEDIATYPE_HEXDUMP, _format_hexdump(message.payload))


def _format_hexdump(data):
    """Render the bytes ``data`` as text in the layout of ``hexdump -C``.

    Every row shows an 8-digit hexadecimal offset, up to 16 bytes in two
    groups of eight, and the same bytes as ASCII between pipes (with a dot for
    anything outside the printable range 32..126). A row with fewer than 16
    bytes is followed by a closing line that holds the total length; empty
    input produces an empty string."""
    # Not the most efficient hex dumper, but we won't stream video over
    # this anyway
    rows = []
    for offset in range(0, len(data), 16):
        chunk = data[offset:offset + 16]
        # Absent bytes take up the same two characters as present ones, so
        # that the ASCII column stays aligned on a short last row.
        cells = ["%02x" % byte for byte in chunk]
        cells.extend(["  "] * (16 - len(chunk)))
        printable = "".join(chr(byte) if 32 <= byte < 127 else '.' for byte in chunk)
        rows.append("%08x  %s  %s  |%s|\n" % (
            offset, " ".join(cells[:8]), " ".join(cells[8:]), printable))
    if len(data) % 16 != 0:
        rows.append("%08x\n" % len(data))
    return "".join(rows)

179 return (infos, MEDIATYPE_HEXDUMP, "".join(formatted))