Coverage for aiocoap/util/asyncio/recvmsg.py: 74%

81 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-16 16:09 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5from .. import socknumbers 

6 

7from asyncio import BaseProtocol 

8from asyncio.transports import BaseTransport 

9 

10class RecvmsgDatagramProtocol(BaseProtocol): 

11 """Callback interface similar to asyncio.DatagramProtocol, but dealing with 

12 recvmsg data.""" 

13 

14 def datagram_msg_received(self, data, ancdata, flags, address): 

15 """Called when some datagram is received.""" 

16 

17 def datagram_errqueue_received(self, data, ancdata, flags, address): 

18 """Called when some data is received from the error queue""" 

19 

20 def error_received(self, exc): 

21 """Called when a send or receive operation raises an OSError.""" 

22 

23def _set_result_unless_cancelled(fut, result): 

24 """Helper setting the result only if the future was not cancelled.""" 

25 if fut.cancelled(): 

26 return 

27 fut.set_result(result) 

28 

29class RecvmsgSelectorDatagramTransport(BaseTransport): 

30 """A simple loop-independent transport that largely mimicks 

31 DatagramTransport but interfaces a RecvmsgSelectorDatagramProtocol. 

32 

33 This does not implement any flow control, based on the assumption that it's 

34 not needed, for CoAP has its own flow control mechanisms.""" 

35 

36 max_size = 4096 # Buffer size passed to recvmsg() -- should suffice for a full MTU package and ample ancdata 

37 

38 def __init__(self, loop, sock, protocol, waiter): 

39 super().__init__(extra={'socket': sock}) 

40 self.__sock = sock 

41 # Persisted outside of sock because when GC breaks a reference cycle, 

42 # it can happen that the sock gets closed before this; we have to hope 

43 # that no new file gets opened and registered in the meantime. 

44 self.__sock_fileno = sock.fileno() 

45 self._loop = loop 

46 self._protocol = protocol 

47 

48 loop.call_soon(protocol.connection_made, self) 

49 # only start reading when connection_made() has been called 

50 import weakref 

51 # We could add error handling in here like this: 

52 # ``` 

53 # self = s() 

54 # if self is None or self.__sock is None: 

55 # # The read event happened briefly before .close() was called, 

56 # # but late enough that the caller of close did not yield to let 

57 # # the event out; when remove_reader was then called, the 

58 # # pending event was not removed, so it fires now that the 

59 # # socket is already closed. (Depending on the GC's whims, self 

60 # # may or may not have been GC'd, but if it wasn't yet, the 

61 # # closed state is indicated by the lack of a __sock. 

62 # # 

63 # # Thus, silently (preferably with an ICMP error, but really 

64 # # can't do that)... 

65 # return 

66 # ``` 

67 # That was done tentatively while debugging errors flying out of 

68 # _read_ready, but it turned out that this was not the actual error 

69 # source. Thus, I'm not adding the handler and assuming that close's 

70 # remove_reader is not racing against callbacks, and thus that s() is 

71 # always valid while the transport is around (and the weakref is really 

72 # only used to break up the reference cycles to ensure the GC is not 

73 # needed here). 

74 def rr(s=weakref.ref(self)): 

75 s()._read_ready() 

76 loop.call_soon(loop.add_reader, self.__sock_fileno, rr) 

77 loop.call_soon(_set_result_unless_cancelled, waiter, None) 

78 

79 def close(self): 

80 if self.__sock is None: 

81 return 

82 

83 if not self._loop.is_closed(): 

84 self._loop.call_soon(self._protocol.connection_lost, None) 

85 

86 self._loop.remove_reader(self.__sock_fileno) 

87 self.__sock.close() 

88 self.__sock = None 

89 self._protocol = None 

90 self._loop = None 

91 

92 def __del__(self): 

93 if self.__sock is not None: 

94 self.close() 

95 

96 def _read_ready(self): 

97 if socknumbers.HAS_RECVERR: 

98 try: 

99 data, ancdata, flags, addr = self.__sock.recvmsg(self.max_size, 1024, socknumbers.MSG_ERRQUEUE) 

100 except (BlockingIOError, InterruptedError): 

101 pass 

102 except OSError as exc: 

103 if repr(exc) == "OSError('received malformed or improperly truncated ancillary data',)": 

104 pass # workaround for https://bitbucket.org/pypy/pypy/issues/2649/recvmsg-with-empty-err-queue-raises-odd 

105 else: 

106 self._protocol.error_received(exc) 

107 except Exception as exc: 

108 self._fatal_error(exc, 'Fatal read error on datagram transport') 

109 else: 

110 self._protocol.datagram_errqueue_received(data, ancdata, flags, addr) 

111 

112 # copied and modified from _SelectorDatagramTransport 

113 try: 

114 data, ancdata, flags, addr = self.__sock.recvmsg(self.max_size, 1024) # TODO: find a way for the application to tell the trensport how much data is expected 

115 except (BlockingIOError, InterruptedError): 

116 pass 

117 except OSError as exc: 

118 self._protocol.error_received(exc) 

119 except Exception as exc: 

120 self._fatal_error(exc, 'Fatal read error on datagram transport') 

121 else: 

122 self._protocol.datagram_msg_received(data, ancdata, flags, addr) 

123 

124 def sendmsg(self, data, ancdata, flags, address): 

125 try: 

126 self.__sock.sendmsg((data,), ancdata, flags, address) 

127 return 

128 except OSError as exc: 

129 self._protocol.error_received(exc) 

130 return 

131 except Exception as exc: 

132 self._fatal_error(exc, 

133 'Fatal write error on datagram transport') 

134 return 

135 

136async def create_recvmsg_datagram_endpoint(loop, factory, sock): 

137 """Create a datagram connection that uses recvmsg rather than recvfrom, and 

138 a RecvmsgDatagramProtocol protocol type. 

139 

140 This is used like the create_datagram_endpoint method of an asyncio loop, 

141 but implemented in a generic way using the loop's add_reader method; thus, 

142 it's not a method of the loop but an independent function. 

143 

144 Due to the way it is used in aiocoap, socket is not an optional argument 

145 here; it could be were this module ever split off into a standalone 

146 package. 

147 """ 

148 sock.setblocking(False) 

149 

150 protocol = factory() 

151 waiter = loop.create_future() 

152 transport = RecvmsgSelectorDatagramTransport( 

153 loop, sock, protocol, waiter) 

154 

155 try: 

156 await waiter 

157 # see https://github.com/PyCQA/pycodestyle/issues/703 

158 except: # noqa: E722 

159 transport.close() 

160 raise 

161 

162 return transport, protocol