Coverage for aiocoap/util/prettyprint.py: 78%
91 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""A pretty-printer for known mime types"""
7import json
8import re
10import pygments
11import pygments.lexers
12import pygments.formatters
14from aiocoap.util import linkformat, contenttype
16from aiocoap.util.linkformat_pygments import _register
# Executed once at import time. _register comes from
# aiocoap.util.linkformat_pygments; presumably it makes a link-format lexer
# known to pygments -- confirm in that module.
_register()

# Media type this module uses to label hex dump text: pretty_print returns it
# for payloads it shows as a hex dump, and lexer_for_mime maps it to pygments'
# HexdumpLexer.
MEDIATYPE_HEXDUMP = 'text/vnd.aiocoap.hexdump'
def lexer_for_mime(mime):
    """Return a pygments lexer instance for the media type string ``mime``.

    This wraps ``pygments.lexers.get_lexer_for_mimetype`` with three additions:

    * the custom hex dump type (``MEDIATYPE_HEXDUMP``) maps to pygments'
      ``HexdumpLexer``;
    * ``text/plain;charset=utf8`` is deliberately reported as having no lexer;
    * a type pygments does not know is retried reduced to its ``+suffix``,
      i.e. ``type/anything+suffix;params`` is looked up as ``type/suffix``.

    Raises ``pygments.util.ClassNotFound`` when no lexer can be found.
    """
    if mime == MEDIATYPE_HEXDUMP:
        return pygments.lexers.HexdumpLexer()

    if mime == 'text/plain;charset=utf8':
        # There are fall-throughs in place anyway, so there is no need to
        # route plain text through a no-op TextLexer.
        raise pygments.util.ClassNotFound

    find_lexer = pygments.lexers.get_lexer_for_mimetype

    try:
        return find_lexer(mime)
    except pygments.util.ClassNotFound:
        def to_suffix_type(match):
            # Group 1 is the top-level type, group 2 the part after the "+".
            return match[1] + '/' + match[2]

        # When the pattern does not match (no "+suffix"), the string is left
        # as it was and the second lookup raises ClassNotFound again.
        reduced = re.sub('^([^/]+)/.*\\+([^;]+)(;.*)?$', to_suffix_type, mime)
        return find_lexer(reduced)
def _hexdump(data):
    """Render the bytes ``data`` as hex dump text.

    Every row covers up to 16 bytes: the offset as eight hex digits, two
    groups of eight hex bytes, and the bytes as printable ASCII (anything
    else as ``.``) between ``|`` marks. A last line holding only the total
    length is appended when that length is not a multiple of 16. Empty input
    gives an empty string.
    """
    # Not the most efficient hex dumper, but we won't stream video over
    # this anyway
    rows = []
    offset = 0
    while data:
        line, data = data[:16], data[16:]

        # Bytes missing from a short final row are padded to the width of a
        # hex byte (two characters) so the ASCII column stays aligned.
        cells = ["%02x" % byte for byte in line] + ["  "] * (16 - len(line))

        rows.append("%08x " % offset +
                    " ".join(cells[:8]) + " " +
                    " ".join(cells[8:]) + " |" +
                    "".join(chr(x) if 32 <= x < 127 else '.' for x in line) +
                    "|\n")

        offset += len(line)
    if offset % 16 != 0:
        rows.append("%08x\n" % offset)
    return "".join(rows)


def pretty_print(message):
    """Given a CoAP message, reshape its payload into something human-readable.
    The return value is a triple (infos, mime, text) where text represents the
    payload, mime is a type that could be used to syntax-highlight the text
    (not necessarily related to the original mime type, eg. a report of some
    binary data that's shaped like Markdown could use a markdown mime type),
    and some line of infos that give additional data (like the reason for a hex
    dump or the original mime type).

    The content format is taken from the message's own Content-Format option,
    or, when that is absent, from the Accept option of ``message.request``.

    Link-format, JSON, CBOR and CBOR sequence payloads are re-formatted when
    they parse; other payloads are returned as plain text when they are valid
    UTF-8, and as a hex dump (mime ``MEDIATYPE_HEXDUMP``) otherwise.

    >>> from aiocoap import Message
    >>> def build(payload, request_cf, response_cf):
    ...     response = Message(payload=payload, content_format=response_cf)
    ...     request = Message(accept=request_cf)
    ...     response.request = request
    ...     return response
    >>> pretty_print(Message(payload=b"Hello", content_format=0))
    ([], 'text/plain;charset=utf8', 'Hello')
    >>> print(pretty_print(Message(payload=b'{"hello":"world"}', content_format=50))[-1])
    {
        "hello": "world"
    }
    >>> # Erroneous inputs still go to the pretty printer as long as they're
    >>> # Unicode
    >>> pretty_print(Message(payload=b'{"hello":"world', content_format=50))
    (['Invalid JSON not re-formated'], 'application/json', '{"hello":"world')
    >>> pretty_print(Message(payload=b'<>,', content_format=40))
    (['Invalid application/link-format content was not re-formatted'], 'application/link-format', '<>,')
    >>> pretty_print(Message(payload=b'a', content_format=60)) # doctest: +ELLIPSIS
    (['Showing hex dump of application/cbor payload: CBOR value is invalid'], 'text/vnd.aiocoap.hexdump', '00000000 61 ...
    """
    infos = []
    info = infos.append

    # Content format 0 (text/plain, see the first example above) is a
    # legitimate value, so absence is tested with "is None" rather than by
    # truthiness before falling back to what the request asked for.
    cf = message.opt.content_format
    if cf is None:
        cf = message.request.opt.accept

    if cf is None:
        content_type = "type unknown"
    elif cf.is_known():
        content_type = cf.media_type
        if cf.encoding != 'identity':
            info("Content format is %s in %s encoding; treating as "
                 "application/octet-stream because decompression is not "
                 "supported yet" % (cf.media_type, cf.encoding))
            # Actually do what the message says: the still-encoded bytes must
            # not be handed to the parser of the inner media type.
            content_type = 'application/octet-stream'
    else:
        content_type = "type %d" % cf
    category = contenttype.categorize(content_type)

    # Set to a reason string once a hex dump has been decided on.
    show_hex = None

    if linkformat is not None and category == 'link-format':
        try:
            decoded = message.payload.decode('utf8')
            try:
                parsed = linkformat.link_header.parse(decoded)
            except linkformat.link_header.ParseException:
                info("Invalid application/link-format content was not re-formatted")
                return (infos, 'application/link-format', decoded)
            else:
                info("application/link-format content was re-formatted")
                prettyprinted = ",\n".join(str(l) for l in parsed.links)
                return (infos, 'application/link-format', prettyprinted)
        except ValueError:
            # Handled later
            pass

    elif category in ('cbor', 'cbor-seq'):
        if category == 'cbor-seq':
            # Faking an indefinite length CBOR array is the easiest way to
            # parse an array into a list-like data structure, especially as
            # long as we don't indicate precise locations of invalid CBOR
            # anyway
            payload = b'\x9f' + message.payload + b'\xff'
        else:
            payload = message.payload

        try:
            # Imported here so that a missing cbor_diag only disables the
            # CBOR pretty-printing (see the ImportError handler below).
            import cbor_diag

            formatted = cbor_diag.cbor2diag(payload)

            if category == 'cbor-seq':
                info("CBOR sequence message shown as array in Diagnostic Notation")
            else:
                info("CBOR message shown in Diagnostic Notation")

            # It's not exactly CDDL, but it's close enough that the syntax
            # highlighting looks OK, and tolerant enough to not complain about
            # missing leading barewords and "=" signs
            return (infos, 'text/x-cddl', formatted)
        except ImportError:
            show_hex = "No CBOR pretty-printer available"
        except ValueError:
            show_hex = "CBOR value is invalid"

    elif category == 'json':
        try:
            decoded = message.payload.decode('utf8')
        except ValueError:
            # Not UTF-8; the generic handling below produces the hex dump.
            pass
        else:
            try:
                parsed = json.loads(decoded)
            except ValueError:
                info("Invalid JSON not re-formated")
                return (infos, 'application/json', decoded)
            else:
                info("JSON re-formated and indented")
                formatted = json.dumps(parsed, indent=4)
                return (infos, 'application/json', formatted)

    # That's about the formats we do for now.

    if show_hex is None:
        try:
            text = message.payload.decode('utf8')
        except UnicodeDecodeError:
            show_hex = "Message can not be parsed as UTF-8"
        else:
            return (infos, 'text/plain;charset=utf8', text)

    info("Showing hex dump of %s payload%s" % (
        content_type if cf is not None else "untyped",
        ": " + show_hex if show_hex is not None else ""))
    return (infos, MEDIATYPE_HEXDUMP, _hexdump(message.payload))