Coverage for aiocoap/util/asyncio/recvmsg.py: 73%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

81 statements  

1# This file is part of the Python aiocoap library project. 

2# 

3# Copyright (c) 2012-2014 Maciej Wasilak <http://sixpinetrees.blogspot.com/>, 

4# 2013-2014 Christian Amsüss <c.amsuess@energyharvesting.at> 

5# 

6# aiocoap is free software, this file is published under the MIT license as 

7# described in the accompanying LICENSE file. 

8 

9from .. import socknumbers 

10 

11from asyncio import BaseProtocol 

12from asyncio.transports import BaseTransport 

13 

class RecvmsgDatagramProtocol(BaseProtocol):
    """Protocol interface for datagram transports that read via recvmsg.

    It plays the role asyncio.DatagramProtocol plays for regular datagram
    transports, but its receive callbacks carry the complete recvmsg result
    (data, ancillary data, flags and address). All callbacks are no-ops
    here; implementations override the ones they need."""

    def datagram_msg_received(self, data, ancdata, flags, address):
        """Callback for a datagram that was received regularly."""

    def datagram_errqueue_received(self, data, ancdata, flags, address):
        """Callback for data that was read from the socket's error queue."""

    def error_received(self, exc):
        """Callback for an OSError raised by a send or receive operation."""

26 

def _set_result_unless_cancelled(fut, result):
    """Resolve *fut* with *result*, leaving a cancelled future untouched."""
    if not fut.cancelled():
        fut.set_result(result)

32 

class RecvmsgSelectorDatagramTransport(BaseTransport):
    """A simple loop-independent transport that largely mimics
    DatagramTransport but interfaces a RecvmsgDatagramProtocol.

    Incoming data is read with recvmsg() from a reader callback registered
    through the loop's add_reader(); outgoing data is passed to the socket's
    sendmsg() right away.

    This does not implement any flow control, based on the assumption that it's
    not needed, for CoAP has its own flow control mechanisms."""

    max_size = 4096  # Buffer size passed to recvmsg() -- should suffice for a full MTU package and ample ancdata

    # args of the OSError that PyPy raises when recvmsg() is called on an empty
    # error queue, see
    # https://bitbucket.org/pypy/pypy/issues/2649/recvmsg-with-empty-err-queue-raises-odd
    _PYPY_EMPTY_ERRQUEUE_ARGS = ('received malformed or improperly truncated ancillary data',)

    def __init__(self, loop, sock, protocol, waiter):
        """Set up the transport on *sock* and schedule its start on *loop*.

        protocol.connection_made(), the registration of the reader callback
        and the resolution of the *waiter* future are scheduled with
        call_soon() in that order; none of them has happened yet when this
        returns."""
        super().__init__(extra={'socket': sock})
        self.__sock = sock
        # Persisted outside of sock because when GC breaks a reference cycle,
        # it can happen that the sock gets closed before this; we have to hope
        # that no new file gets opened and registered in the meantime.
        self.__sock_fileno = sock.fileno()
        self._loop = loop
        self._protocol = protocol

        loop.call_soon(protocol.connection_made, self)
        # only start reading when connection_made() has been called
        import weakref
        # We could add error handling in here like this:
        # ```
        # self = s()
        # if self is None or self.__sock is None:
        #     # The read event happened briefly before .close() was called,
        #     # but late enough that the caller of close did not yield to let
        #     # the event out; when remove_reader was then called, the
        #     # pending event was not removed, so it fires now that the
        #     # socket is already closed. (Depending on the GC's whims, self
        #     # may or may not have been GC'd, but if it wasn't yet, the
        #     # closed state is indicated by the lack of a __sock.
        #     #
        #     # Thus, silently (preferably with an ICMP error, but really
        #     # can't do that)...
        #     return
        # ```
        # That was done tentatively while debugging errors flying out of
        # _read_ready, but it turned out that this was not the actual error
        # source. Thus, I'm not adding the handler and assuming that close's
        # remove_reader is not racing against callbacks, and thus that s() is
        # always valid while the transport is around (and the weakref is really
        # only used to break up the reference cycles to ensure the GC is not
        # needed here).
        def rr(s=weakref.ref(self)):
            s()._read_ready()
        loop.call_soon(loop.add_reader, self.__sock_fileno, rr)
        loop.call_soon(_set_result_unless_cancelled, waiter, None)

    def close(self):
        """Unregister the reader, close the socket and drop all references.

        The protocol's connection_lost(None) is scheduled unless the loop is
        already closed. Calling close() on a closed transport does nothing."""
        if self.__sock is None:
            return

        if not self._loop.is_closed():
            self._loop.call_soon(self._protocol.connection_lost, None)

        self._loop.remove_reader(self.__sock_fileno)
        self.__sock.close()
        self.__sock = None
        self._protocol = None
        self._loop = None

    def __del__(self):
        if self.__sock is not None:
            self.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report an unexpected exception to the loop, then close the transport.

        This is only to be called from within an exception handler. The
        exception is passed to the loop's call_exception_handler() along with
        *message*, the transport and the protocol. If the transport was closed
        before (so there is no loop left to report to), *exc* is raised again
        rather than being dropped silently."""
        loop = self._loop
        if loop is None:
            raise exc
        loop.call_exception_handler({
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        })
        self.close()

    def _read_ready(self):
        """Reader callback: read from the error queue (where available) and
        then from the regular receive queue, handing each result to the
        protocol.

        A read that would block is skipped silently, an OSError is passed to
        the protocol's error_received(), and any other exception is treated as
        fatal."""
        if socknumbers.HAS_RECVERR:
            try:
                data, ancdata, flags, addr = self.__sock.recvmsg(self.max_size, 1024, socknumbers.MSG_ERRQUEUE)
            except (BlockingIOError, InterruptedError):
                pass
            except OSError as exc:
                # The PyPy workaround compares the exception's args rather than
                # its repr: since Python 3.7 the repr of an exception does not
                # contain the trailing comma any more, so a repr based check
                # only ever matched on older versions.
                if exc.args != self._PYPY_EMPTY_ERRQUEUE_ARGS:
                    self._protocol.error_received(exc)
            except Exception as exc:
                self._fatal_error(exc, 'Fatal read error on datagram transport')
            else:
                self._protocol.datagram_errqueue_received(data, ancdata, flags, addr)

            if self.__sock is None:
                # The transport was closed while the error queue was being
                # processed (by a protocol callback or after a fatal error);
                # there is nothing left to read from.
                return

        # copied and modified from _SelectorDatagramTransport
        try:
            data, ancdata, flags, addr = self.__sock.recvmsg(self.max_size, 1024)  # TODO: find a way for the application to tell the transport how much data is expected
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
        else:
            self._protocol.datagram_msg_received(data, ancdata, flags, addr)

    def sendmsg(self, data, ancdata, flags, address):
        """Send *data* with the given ancillary data and flags to *address*.

        Nothing is returned or raised on failure: an OSError is passed to the
        protocol's error_received(), any other exception is treated as
        fatal."""
        try:
            self.__sock.sendmsg((data,), ancdata, flags, address)
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc,
                              'Fatal write error on datagram transport')

139 

async def create_recvmsg_datagram_endpoint(loop, factory, sock):
    """Set up a datagram endpoint on `sock` that reads through recvmsg
    instead of recvfrom and talks to a RecvmsgDatagramProtocol.

    Usage follows the create_datagram_endpoint method of an asyncio loop. As
    the implementation only builds on the loop's generic add_reader method,
    this is a free-standing function and not a loop method.

    The socket is a mandatory argument because that is what aiocoap needs;
    it could become optional if this module were ever released as a
    standalone package.

    Returns a (transport, protocol) tuple, where protocol is what `factory()`
    produced.
    """
    sock.setblocking(False)

    proto = factory()
    ready = loop.create_future()
    endpoint = RecvmsgSelectorDatagramTransport(loop, sock, proto, ready)

    try:
        await ready
    except BaseException:
        # Catches everything, cancellation included, so that the transport
        # gets closed whenever startup does not complete.
        endpoint.close()
        raise

    return endpoint, proto