Coverage for aiocoap/util/asyncio/recvmsg.py: 74%
81 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-16 16:09 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5from .. import socknumbers
7from asyncio import BaseProtocol
8from asyncio.transports import BaseTransport
class RecvmsgDatagramProtocol(BaseProtocol):
    """Protocol interface for datagram endpoints that are read via recvmsg.

    The shape follows asyncio.DatagramProtocol, except that every received
    message is handed over together with its ancillary data and flags. All
    callbacks do nothing by default; implementations override the ones they
    care about.
    """

    def datagram_msg_received(self, data, ancdata, flags, address):
        """Handle a datagram that was received on the socket."""

    def datagram_errqueue_received(self, data, ancdata, flags, address):
        """Handle data that was read from the socket's error queue."""

    def error_received(self, exc):
        """Handle an OSError raised by a send or receive operation."""
def _set_result_unless_cancelled(fut, result):
    """Resolve *fut* with *result*; a cancelled future is left untouched."""
    if not fut.cancelled():
        fut.set_result(result)
29class RecvmsgSelectorDatagramTransport(BaseTransport):
30 """A simple loop-independent transport that largely mimicks
31 DatagramTransport but interfaces a RecvmsgSelectorDatagramProtocol.
33 This does not implement any flow control, based on the assumption that it's
34 not needed, for CoAP has its own flow control mechanisms."""
36 max_size = 4096 # Buffer size passed to recvmsg() -- should suffice for a full MTU package and ample ancdata
38 def __init__(self, loop, sock, protocol, waiter):
39 super().__init__(extra={'socket': sock})
40 self.__sock = sock
41 # Persisted outside of sock because when GC breaks a reference cycle,
42 # it can happen that the sock gets closed before this; we have to hope
43 # that no new file gets opened and registered in the meantime.
44 self.__sock_fileno = sock.fileno()
45 self._loop = loop
46 self._protocol = protocol
48 loop.call_soon(protocol.connection_made, self)
49 # only start reading when connection_made() has been called
50 import weakref
51 # We could add error handling in here like this:
52 # ```
53 # self = s()
54 # if self is None or self.__sock is None:
55 # # The read event happened briefly before .close() was called,
56 # # but late enough that the caller of close did not yield to let
57 # # the event out; when remove_reader was then called, the
58 # # pending event was not removed, so it fires now that the
59 # # socket is already closed. (Depending on the GC's whims, self
60 # # may or may not have been GC'd, but if it wasn't yet, the
61 # # closed state is indicated by the lack of a __sock.
62 # #
63 # # Thus, silently (preferably with an ICMP error, but really
64 # # can't do that)...
65 # return
66 # ```
67 # That was done tentatively while debugging errors flying out of
68 # _read_ready, but it turned out that this was not the actual error
69 # source. Thus, I'm not adding the handler and assuming that close's
70 # remove_reader is not racing against callbacks, and thus that s() is
71 # always valid while the transport is around (and the weakref is really
72 # only used to break up the reference cycles to ensure the GC is not
73 # needed here).
74 def rr(s=weakref.ref(self)):
75 s()._read_ready()
76 loop.call_soon(loop.add_reader, self.__sock_fileno, rr)
77 loop.call_soon(_set_result_unless_cancelled, waiter, None)
79 def close(self):
80 if self.__sock is None:
81 return
83 if not self._loop.is_closed():
84 self._loop.call_soon(self._protocol.connection_lost, None)
86 self._loop.remove_reader(self.__sock_fileno)
87 self.__sock.close()
88 self.__sock = None
89 self._protocol = None
90 self._loop = None
92 def __del__(self):
93 if self.__sock is not None:
94 self.close()
96 def _read_ready(self):
97 if socknumbers.HAS_RECVERR:
98 try:
99 data, ancdata, flags, addr = self.__sock.recvmsg(self.max_size, 1024, socknumbers.MSG_ERRQUEUE)
100 except (BlockingIOError, InterruptedError):
101 pass
102 except OSError as exc:
103 if repr(exc) == "OSError('received malformed or improperly truncated ancillary data',)":
104 pass # workaround for https://bitbucket.org/pypy/pypy/issues/2649/recvmsg-with-empty-err-queue-raises-odd
105 else:
106 self._protocol.error_received(exc)
107 except Exception as exc:
108 self._fatal_error(exc, 'Fatal read error on datagram transport')
109 else:
110 self._protocol.datagram_errqueue_received(data, ancdata, flags, addr)
112 # copied and modified from _SelectorDatagramTransport
113 try:
114 data, ancdata, flags, addr = self.__sock.recvmsg(self.max_size, 1024) # TODO: find a way for the application to tell the trensport how much data is expected
115 except (BlockingIOError, InterruptedError):
116 pass
117 except OSError as exc:
118 self._protocol.error_received(exc)
119 except Exception as exc:
120 self._fatal_error(exc, 'Fatal read error on datagram transport')
121 else:
122 self._protocol.datagram_msg_received(data, ancdata, flags, addr)
124 def sendmsg(self, data, ancdata, flags, address):
125 try:
126 self.__sock.sendmsg((data,), ancdata, flags, address)
127 return
128 except OSError as exc:
129 self._protocol.error_received(exc)
130 return
131 except Exception as exc:
132 self._fatal_error(exc,
133 'Fatal write error on datagram transport')
134 return
async def create_recvmsg_datagram_endpoint(loop, factory, sock):
    """Set up a datagram endpoint on *sock* that reads through recvmsg (not
    recvfrom) and talks to a RecvmsgDatagramProtocol built by *factory*.

    Usage mirrors the create_datagram_endpoint method of an asyncio loop. The
    implementation is generic, relying only on the loop's add_reader method,
    which is why this is a free function rather than a loop method.

    The socket argument is mandatory because of how aiocoap uses this
    function; it could become optional if this module were ever split off
    into a standalone package.

    Returns the ``(transport, protocol)`` pair once the transport is set up.
    """
    sock.setblocking(False)

    proto = factory()
    ready = loop.create_future()
    transport = RecvmsgSelectorDatagramTransport(loop, sock, proto, ready)

    try:
        await ready
    except BaseException:
        # Deliberately as broad as a bare ``except:``: whatever interrupts
        # the wait, the transport must not be left open.
        transport.close()
        raise

    return transport, proto