Coverage for aiocoap/options.py: 95%
138 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5from itertools import chain
7from .numbers.optionnumbers import OptionNumber
8from .error import UnparsableMessage
10def _read_extended_field_value(value, rawdata):
11 """Used to decode large values of option delta and option length
12 from raw binary form."""
13 if value >= 0 and value < 13:
14 return (value, rawdata)
15 elif value == 13:
16 if len(rawdata) < 1:
17 raise UnparsableMessage("Option ended prematurely")
18 return (rawdata[0] + 13, rawdata[1:])
19 elif value == 14:
20 if len(rawdata) < 2:
21 raise UnparsableMessage("Option ended prematurely")
22 return (int.from_bytes(rawdata[:2], 'big') + 269, rawdata[2:])
23 else:
24 raise UnparsableMessage("Option contained partial payload marker.")
27def _write_extended_field_value(value):
28 """Used to encode large values of option delta and option length
29 into raw binary form.
30 In CoAP option delta and length can be represented by a variable
31 number of bytes depending on the value."""
32 if value >= 0 and value < 13:
33 return (value, b'')
34 elif value >= 13 and value < 269:
35 return (13, (value - 13).to_bytes(1, 'big'))
36 elif value >= 269 and value < 65804:
37 return (14, (value - 269).to_bytes(2, 'big'))
38 else:
39 raise ValueError("Value out of range.")
42def _single_value_view(option_number, doc=None):
43 """Generate a property for a given option number, where the option is not
44 repeatable. For getting, it will return the value of the first option
45 object with matching number. For setting, it will remove all options with
46 that number and create one with the given value. The property can be
47 deleted, resulting in removal of the option from the header.
49 For consistency, setting the value to None also clears the option. (Note
50 that with the currently implemented optiontypes, None is not a valid value
51 for any of them)."""
53 def _getter(self, option_number=option_number):
54 options = self.get_option(option_number)
55 if not options:
56 return None
57 else:
58 return options[0].value
60 def _setter(self, value, option_number=option_number):
61 self.delete_option(option_number)
62 if value is not None:
63 self.add_option(option_number.create_option(value=value))
65 def _deleter(self, option_number=option_number):
66 self.delete_option(option_number)
68 return property(_getter, _setter, _deleter, doc or "Single-value view on the %s option." % option_number)
70def _items_view(option_number, doc=None):
71 """Generate a property for a given option number, where the option is
72 repeatable. For getting, it will return a tuple of the values of the option
73 objects with matching number. For setting, it will remove all options with
74 that number and create new ones from the given iterable."""
76 def _getter(self, option_number=option_number):
77 return tuple(o.value for o in self.get_option(option_number))
79 def _setter(self, value, option_number=option_number):
80 self.delete_option(option_number)
81 for v in value:
82 self.add_option(option_number.create_option(value=v))
84 def _deleter(self, option_number=option_number):
85 self.delete_option(option_number)
87 return property(_getter, _setter, _deleter, doc=doc or "Iterable view on the %s option." % option_number)
89def _empty_presence_view(option_number, doc=None):
90 """Generate a property for a given option number, where the option is not
91 repeatable and (usually) empty. The values True and False are mapped to
92 presence and absence of the option."""
94 def _getter(self, option_number=option_number):
95 return bool(self.get_option(option_number))
97 def _setter(self, value, option_number=option_number):
98 self.delete_option(option_number)
99 if value:
100 self.add_option(option_number.create_option())
102 return property(_getter, _setter, doc=doc or "Presence of the %s option." % option_number)
104class Options(object):
105 """Represent CoAP Header Options."""
107 # this is not so much an optimization as a safeguard -- if custom
108 # attributes were placed here, they could be accessed but would not be
109 # serialized
110 __slots__ = ["_options"]
112 def __init__(self):
113 self._options = {}
115 def __eq__(self, other):
116 if not isinstance(other, Options):
117 return NotImplemented
118 # this implementation is much easier than implementing equality on
119 # StringOption etc
120 return self.encode() == other.encode()
122 def __repr__(self):
123 text = ", ".join("%s: %s" % (OptionNumber(k), " / ".join(map(str, v))) for (k, v) in self._options.items())
124 return "<aiocoap.options.Options at %#x: %s>" % (id(self), text or "empty")
126 def _repr_html_(self):
127 if self._options:
128 n_opt = sum(len(o) for o in self._options.values())
129 items = (f'<li value="{int(k)}">{OptionNumber(k)._repr_html_()}: {", ".join(vi._repr_html_() for vi in v)}' for (k, v) in sorted(self._options.items()))
130 return f"""<details><summary style="display:list-item">{n_opt} option{'s' if n_opt != 1 else ''}</summary><ol>{''.join(items)}</ol></details>"""
131 else:
132 return "<div>No options</div>"
134 def decode(self, rawdata):
135 """Passed a CoAP message body after the token as rawdata, fill self
136 with the options starting at the beginning of rawdata, an return the
137 rest of the message (the body)."""
138 option_number = OptionNumber(0)
140 while rawdata:
141 if rawdata[0] == 0xFF:
142 return rawdata[1:]
143 dllen = rawdata[0]
144 delta = (dllen & 0xF0) >> 4
145 length = (dllen & 0x0F)
146 rawdata = rawdata[1:]
147 (delta, rawdata) = _read_extended_field_value(delta, rawdata)
148 (length, rawdata) = _read_extended_field_value(length, rawdata)
149 option_number += delta
150 if len(rawdata) < length:
151 raise UnparsableMessage("Option announced but absent")
152 option = option_number.create_option(decode=rawdata[:length])
153 self.add_option(option)
154 rawdata = rawdata[length:]
155 return b''
157 def encode(self):
158 """Encode all options in option header into string of bytes."""
159 data = []
160 current_opt_num = 0
161 for option in self.option_list():
162 optiondata = option.encode()
164 delta, extended_delta = _write_extended_field_value(option.number - current_opt_num)
165 length, extended_length = _write_extended_field_value(len(optiondata))
167 data.append(bytes([((delta & 0x0F) << 4) + (length & 0x0F)]))
168 data.append(extended_delta)
169 data.append(extended_length)
170 data.append(optiondata)
172 current_opt_num = option.number
174 return b''.join(data)
176 def add_option(self, option):
177 """Add option into option header."""
178 self._options.setdefault(option.number, []).append(option)
180 def delete_option(self, number):
181 """Delete option from option header."""
182 if number in self._options:
183 self._options.pop(number)
185 def get_option(self, number):
186 """Get option with specified number."""
187 return self._options.get(number, ())
189 def option_list(self):
190 return chain.from_iterable(sorted(self._options.values(), key=lambda x: x[0].number))
192 uri_path = _items_view(OptionNumber.URI_PATH)
193 uri_query = _items_view(OptionNumber.URI_QUERY)
194 location_path = _items_view(OptionNumber.LOCATION_PATH)
195 location_query = _items_view(OptionNumber.LOCATION_QUERY)
196 block2 = _single_value_view(OptionNumber.BLOCK2)
197 block1 = _single_value_view(OptionNumber.BLOCK1)
198 content_format = _single_value_view(OptionNumber.CONTENT_FORMAT)
199 etag = _single_value_view(OptionNumber.ETAG, "Single ETag as used in responses")
200 etags = _items_view(OptionNumber.ETAG, "List of ETags as used in requests")
201 if_none_match = _empty_presence_view(OptionNumber.IF_NONE_MATCH)
202 observe = _single_value_view(OptionNumber.OBSERVE)
203 accept = _single_value_view(OptionNumber.ACCEPT)
204 uri_host = _single_value_view(OptionNumber.URI_HOST)
205 uri_port = _single_value_view(OptionNumber.URI_PORT)
206 proxy_uri = _single_value_view(OptionNumber.PROXY_URI)
207 proxy_scheme = _single_value_view(OptionNumber.PROXY_SCHEME)
208 size1 = _single_value_view(OptionNumber.SIZE1)
209 object_security = _single_value_view(OptionNumber.OBJECT_SECURITY)
210 max_age = _single_value_view(OptionNumber.MAX_AGE)
211 if_match = _items_view(OptionNumber.IF_MATCH)
212 no_response = _single_value_view(OptionNumber.NO_RESPONSE)
213 echo = _single_value_view(OptionNumber.ECHO)
214 request_tag = _items_view(OptionNumber.REQUEST_TAG)
215 hop_limit = _single_value_view(OptionNumber.HOP_LIMIT)
216 request_hash = _single_value_view(OptionNumber.REQUEST_HASH,
217 "Experimental property for draft-amsuess-core-cachable-oscore")
218 edhoc = _empty_presence_view(OptionNumber.EDHOC)
219 size2 = _single_value_view(OptionNumber.SIZE2)