Coverage for aiocoap/util/prettyprint.py: 78%

91 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-16 16:09 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""A pretty-printer for known mime types""" 

6 

7import json 

8import re 

9 

10import pygments 

11import pygments.lexers 

12import pygments.formatters 

13 

14from aiocoap.util import linkformat, contenttype 

15 

16from aiocoap.util.linkformat_pygments import _register 

17 

18_register() 

19 

20MEDIATYPE_HEXDUMP = 'text/vnd.aiocoap.hexdump' 

21 

22def lexer_for_mime(mime): 

23 """A wrapper around pygments.lexers.get_lexer_for_mimetype that takes 

24 subtypes into consideration and catches the custom hexdump mime type.""" 

25 

26 if mime == MEDIATYPE_HEXDUMP: 

27 return pygments.lexers.HexdumpLexer() 

28 

29 if mime == 'text/plain;charset=utf8': 

30 # We have fall-throughs in place anwyay, no need to go through a no-op 

31 # TextLexer 

32 raise pygments.util.ClassNotFound 

33 

34 try: 

35 return pygments.lexers.get_lexer_for_mimetype(mime) 

36 except pygments.util.ClassNotFound: 

37 mime = re.sub('^([^/]+)/.*\\+([^;]+)(;.*)?$', 

38 lambda args: args[1] + '/' + args[2], mime) 

39 return pygments.lexers.get_lexer_for_mimetype(mime) 

40 

41def pretty_print(message): 

42 """Given a CoAP message, reshape its payload into something human-readable. 

43 The return value is a triple (infos, mime, text) where text represents the 

44 payload, mime is a type that could be used to syntax-highlight the text 

45 (not necessarily related to the original mime type, eg. a report of some 

46 binary data that's shaped like Markdown could use a markdown mime type), 

47 and some line of infos that give additional data (like the reason for a hex 

48 dump or the original mime type). 

49 

50 >>> from aiocoap import Message 

51 >>> def build(payload, request_cf, response_cf): 

52 ... response = Message(payload=payload, content_format=response_cf) 

53 ... request = Message(accept=request_cf) 

54 ... response.request = request 

55 ... return response 

56 >>> pretty_print(Message(payload=b"Hello", content_format=0)) 

57 ([], 'text/plain;charset=utf8', 'Hello') 

58 >>> print(pretty_print(Message(payload=b'{"hello":"world"}', content_format=50))[-1]) 

59 { 

60 "hello": "world" 

61 } 

62 >>> # Erroneous inputs still go to the pretty printer as long as they're 

63 >>> #Unicode 

64 >>> pretty_print(Message(payload=b'{"hello":"world', content_format=50)) 

65 (['Invalid JSON not re-formated'], 'application/json', '{"hello":"world') 

66 >>> pretty_print(Message(payload=b'<>,', content_format=40)) 

67 (['Invalid application/link-format content was not re-formatted'], 'application/link-format', '<>,') 

68 >>> pretty_print(Message(payload=b'a', content_format=60)) # doctest: +ELLIPSIS 

69 (['Showing hex dump of application/cbor payload: CBOR value is invalid'], 'text/vnd.aiocoap.hexdump', '00000000 61 ... 

70 """ 

71 infos = [] 

72 info = infos.append 

73 

74 cf = message.opt.content_format or message.request.opt.accept 

75 if cf is None: 

76 content_type = "type unknown" 

77 elif cf.is_known(): 

78 content_type = cf.media_type 

79 if cf.encoding != 'identity': 

80 info("Content format is %s in %s encoding; treating as " 

81 "application/octet-stream because decompression is not " 

82 "supported yet" % (cf.media_type, cf.encoding)) 

83 else: 

84 content_type = "type %d" % cf 

85 category = contenttype.categorize(content_type) 

86 

87 show_hex = None 

88 

89 if linkformat is not None and category == 'link-format': 

90 try: 

91 decoded = message.payload.decode('utf8') 

92 try: 

93 parsed = linkformat.link_header.parse(decoded) 

94 except linkformat.link_header.ParseException: 

95 info("Invalid application/link-format content was not re-formatted") 

96 return (infos, 'application/link-format', decoded) 

97 else: 

98 info("application/link-format content was re-formatted") 

99 prettyprinted = ",\n".join(str(l) for l in parsed.links) 

100 return (infos, 'application/link-format', prettyprinted) 

101 except ValueError: 

102 # Handled later 

103 pass 

104 

105 elif category in ('cbor', 'cbor-seq'): 

106 if category == 'cbor-seq': 

107 # Faking an indefinite length CBOR array is the easiest way to 

108 # parse an array into a list-like data structure, especially as 

109 # long as we don't indicate precise locations of invalid CBOR 

110 # anyway 

111 payload = b'\x9f' + message.payload + b'\xff' 

112 else: 

113 payload = message.payload 

114 

115 try: 

116 import cbor_diag 

117 

118 formatted = cbor_diag.cbor2diag(payload) 

119 

120 if category == 'cbor-seq': 

121 info("CBOR sequence message shown as array in Diagnostic Notation") 

122 else: 

123 info("CBOR message shown in Diagnostic Notation") 

124 

125 # It's not exactly CDDL, but it's close enough that the syntax 

126 # highlighting looks OK, and tolerant enough to not complain about 

127 # missing leading barewords and "=" signs 

128 return (infos, 'text/x-cddl', formatted) 

129 except ImportError: 

130 show_hex = "No CBOR pretty-printer available" 

131 except ValueError: 

132 show_hex = "CBOR value is invalid" 

133 

134 elif category == 'json': 

135 try: 

136 decoded = message.payload.decode('utf8') 

137 except ValueError: 

138 pass 

139 else: 

140 try: 

141 parsed = json.loads(decoded) 

142 except ValueError: 

143 info("Invalid JSON not re-formated") 

144 return (infos, 'application/json', decoded) 

145 else: 

146 info("JSON re-formated and indented") 

147 formatted = json.dumps(parsed, indent=4) 

148 return (infos, 'application/json', formatted) 

149 

150 # That's about the formats we do for now. 

151 

152 if show_hex is None: 

153 try: 

154 text = message.payload.decode('utf8') 

155 except UnicodeDecodeError: 

156 show_hex = "Message can not be parsed as UTF-8" 

157 else: 

158 return (infos, 'text/plain;charset=utf8', text) 

159 

160 info("Showing hex dump of %s payload%s" % ( 

161 content_type if cf is not None else "untyped", 

162 ": " + show_hex if show_hex is not None else "")) 

163 data = message.payload 

164 # Not the most efficient hex dumper, but we won't stream video over 

165 # this anyway 

166 formatted = [] 

167 offset = 0 

168 while data: 

169 line, data = data[:16], data[16:] 

170 

171 formatted.append("%08x " % offset + 

172 " ".join("%02x" % line[i] if i < len(line) else " " for i in range(8)) + " " + 

173 " ".join("%02x" % line[i] if i < len(line) else " " for i in range(8, 16)) + " |" + 

174 "".join(chr(x) if 32 <= x < 127 else '.' for x in line) + 

175 "|\n") 

176 

177 offset += len(line) 

178 if offset % 16 != 0: 

179 formatted.append("%08x\n" % offset) 

180 return (infos, MEDIATYPE_HEXDUMP, "".join(formatted))