Coverage for aiocoap/options.py: 97%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

129 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9from itertools import chain 

10 

11from .numbers.optionnumbers import OptionNumber 

12from .error import UnparsableMessage 

13 

14def _read_extended_field_value(value, rawdata): 

15 """Used to decode large values of option delta and option length 

16 from raw binary form.""" 

17 if value >= 0 and value < 13: 

18 return (value, rawdata) 

19 elif value == 13: 

20 if len(rawdata) < 1: 

21 raise UnparsableMessage("Option ended prematurely") 

22 return (rawdata[0] + 13, rawdata[1:]) 

23 elif value == 14: 

24 if len(rawdata) < 2: 

25 raise UnparsableMessage("Option ended prematurely") 

26 return (int.from_bytes(rawdata[:2], 'big') + 269, rawdata[2:]) 

27 else: 

28 raise UnparsableMessage("Option contained partial payload marker.") 

29 

30 

31def _write_extended_field_value(value): 

32 """Used to encode large values of option delta and option length 

33 into raw binary form. 

34 In CoAP option delta and length can be represented by a variable 

35 number of bytes depending on the value.""" 

36 if value >= 0 and value < 13: 

37 return (value, b'') 

38 elif value >= 13 and value < 269: 

39 return (13, (value - 13).to_bytes(1, 'big')) 

40 elif value >= 269 and value < 65804: 

41 return (14, (value - 269).to_bytes(2, 'big')) 

42 else: 

43 raise ValueError("Value out of range.") 

44 

45 

46def _single_value_view(option_number, doc=None): 

47 """Generate a property for a given option number, where the option is not 

48 repeatable. For getting, it will return the value of the first option 

49 object with matching number. For setting, it will remove all options with 

50 that number and create one with the given value. The property can be 

51 deleted, resulting in removal of the option from the header. 

52 

53 For consistency, setting the value to None also clears the option. (Note 

54 that with the currently implemented optiontypes, None is not a valid value 

55 for any of them).""" 

56 

57 def _getter(self, option_number=option_number): 

58 options = self.get_option(option_number) 

59 if not options: 

60 return None 

61 else: 

62 return options[0].value 

63 

64 def _setter(self, value, option_number=option_number): 

65 self.delete_option(option_number) 

66 if value is not None: 

67 self.add_option(option_number.create_option(value=value)) 

68 

69 def _deleter(self, option_number=option_number): 

70 self.delete_option(option_number) 

71 

72 return property(_getter, _setter, _deleter, doc or "Single-value view on the %s option."%option_number) 

73 

74def _items_view(option_number, doc=None): 

75 """Generate a property for a given option number, where the option is 

76 repeatable. For getting, it will return a tuple of the values of the option 

77 objects with matching number. For setting, it will remove all options with 

78 that number and create new ones from the given iterable.""" 

79 

80 def _getter(self, option_number=option_number): 

81 return tuple(o.value for o in self.get_option(option_number)) 

82 

83 def _setter(self, value, option_number=option_number): 

84 self.delete_option(option_number) 

85 for v in value: 

86 self.add_option(option_number.create_option(value=v)) 

87 

88 def _deleter(self, option_number=option_number): 

89 self.delete_option(option_number) 

90 

91 return property(_getter, _setter, _deleter, doc=doc or "Iterable view on the %s option."%option_number) 

92 

93def _empty_presence_view(option_number, doc=None): 

94 """Generate a property for a given option number, where the option is not 

95 repeatable and (usually) empty. The values True and False are mapped to 

96 presence and absence of the option.""" 

97 

98 def _getter(self, option_number=option_number): 

99 return bool(self.get_option(option_number)) 

100 

101 def _setter(self, value, option_number=option_number): 

102 self.delete_option(option_number) 

103 if value: 

104 self.add_option(option_number.create_option()) 

105 

106 return property(_getter, _setter, doc=doc or "Presence of the %s option."%option_number) 

107 

108class Options(object): 

109 """Represent CoAP Header Options.""" 

110 

111 # this is not so much an optimization as a safeguard -- if custom 

112 # attributes were placed here, they could be accessed but would not be 

113 # serialized 

114 __slots__ = ["_options"] 

115 

116 def __init__(self): 

117 self._options = {} 

118 

119 def __eq__(self, other): 

120 if not isinstance(other, Options): 

121 return NotImplemented 

122 # this implementation is much easier than implementing equality on 

123 # StringOption etc 

124 return self.encode() == other.encode() 

125 

126 def __repr__(self): 

127 text = ", ".join("%s: %s"%(OptionNumber(k), " / ".join(map(str, v))) for (k, v) in self._options.items()) 

128 return "<aiocoap.options.Options at %#x: %s>"%(id(self), text or "empty") 

129 

130 def decode(self, rawdata): 

131 """Passed a CoAP message body after the token as rawdata, fill self 

132 with the options starting at the beginning of rawdata, an return the 

133 rest of the message (the body).""" 

134 option_number = OptionNumber(0) 

135 

136 while rawdata: 

137 if rawdata[0] == 0xFF: 

138 return rawdata[1:] 

139 dllen = rawdata[0] 

140 delta = (dllen & 0xF0) >> 4 

141 length = (dllen & 0x0F) 

142 rawdata = rawdata[1:] 

143 (delta, rawdata) = _read_extended_field_value(delta, rawdata) 

144 (length, rawdata) = _read_extended_field_value(length, rawdata) 

145 option_number += delta 

146 if len(rawdata) < length: 

147 raise UnparsableMessage("Option announced but absent") 

148 option = option_number.create_option(decode=rawdata[:length]) 

149 self.add_option(option) 

150 rawdata = rawdata[length:] 

151 return b'' 

152 

153 def encode(self): 

154 """Encode all options in option header into string of bytes.""" 

155 data = [] 

156 current_opt_num = 0 

157 for option in self.option_list(): 

158 optiondata = option.encode() 

159 

160 delta, extended_delta = _write_extended_field_value(option.number - current_opt_num) 

161 length, extended_length = _write_extended_field_value(len(optiondata)) 

162 

163 data.append(bytes([((delta & 0x0F) << 4) + (length & 0x0F)])) 

164 data.append(extended_delta) 

165 data.append(extended_length) 

166 data.append(optiondata) 

167 

168 current_opt_num = option.number 

169 

170 return b''.join(data) 

171 

172 def add_option(self, option): 

173 """Add option into option header.""" 

174 self._options.setdefault(option.number, []).append(option) 

175 

176 def delete_option(self, number): 

177 """Delete option from option header.""" 

178 if number in self._options: 

179 self._options.pop(number) 

180 

181 def get_option(self, number): 

182 """Get option with specified number.""" 

183 return self._options.get(number, ()) 

184 

185 def option_list(self): 

186 return chain.from_iterable(sorted(self._options.values(), key=lambda x: x[0].number)) 

187 

188 uri_path = _items_view(OptionNumber.URI_PATH) 

189 uri_query = _items_view(OptionNumber.URI_QUERY) 

190 location_path = _items_view(OptionNumber.LOCATION_PATH) 

191 location_query = _items_view(OptionNumber.LOCATION_QUERY) 

192 block2 = _single_value_view(OptionNumber.BLOCK2) 

193 block1 = _single_value_view(OptionNumber.BLOCK1) 

194 content_format = _single_value_view(OptionNumber.CONTENT_FORMAT) 

195 etag = _single_value_view(OptionNumber.ETAG, "Single ETag as used in responses") 

196 etags = _items_view(OptionNumber.ETAG, "List of ETags as used in requests") 

197 if_none_match = _empty_presence_view(OptionNumber.IF_NONE_MATCH) 

198 observe = _single_value_view(OptionNumber.OBSERVE) 

199 accept = _single_value_view(OptionNumber.ACCEPT) 

200 uri_host = _single_value_view(OptionNumber.URI_HOST) 

201 uri_port = _single_value_view(OptionNumber.URI_PORT) 

202 proxy_uri = _single_value_view(OptionNumber.PROXY_URI) 

203 proxy_scheme = _single_value_view(OptionNumber.PROXY_SCHEME) 

204 size1 = _single_value_view(OptionNumber.SIZE1) 

205 object_security = _single_value_view(OptionNumber.OBJECT_SECURITY) 

206 max_age = _single_value_view(OptionNumber.MAX_AGE) 

207 if_match = _items_view(OptionNumber.IF_MATCH) 

208 no_response = _single_value_view(OptionNumber.NO_RESPONSE) 

209 echo = _single_value_view(OptionNumber.ECHO) 

210 request_tag = _items_view(OptionNumber.REQUEST_TAG) 

211 request_hash = _single_value_view(OptionNumber.REQUEST_HASH)