Netlink parser data flow
NetlinkRequest
To run a query and get its response, AsyncNetlinkSocket uses the NetlinkRequest class, which calculates the required message flags, allocates the sequence number, completes the query message, and encodes it.
When the message is about to be sent, NetlinkRequest first tries a proxy, if one is registered. If NetlinkRequest.proxy() returns True, NetlinkRequest stops processing; otherwise it sends the message using the underlying socket.
NetlinkRequest.response() collects the response packets buffered so far for its sequence number and returns an async iterator over the response messages that have arrived.
Marshal: get and run parsers
Marshal must choose the proper parser depending on the key, flags and sequence_number. By default it uses nlmsg->type as the key, together with nlmsg->flags. There are several ways to customize how parsers are selected.
Use custom key_format, key_offset and key_mask. The latter is used to partially match the key, while key_format and key_offset are used to struct.unpack() the key from the raw netlink data.
You can overload Marshal.get_parser() and implement your own way to get parsers. A parser should be a simple function that gets only data, offset and length as arguments, and returns one dict compatible message.
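The key extraction described above can be sketched with plain struct calls. This is a minimal illustration, not the library code: extract_key() is a hypothetical helper, the header values are arbitrary, and the mask is assumed to apply only when a custom key_format is set.

```python
import struct

# A 16-byte netlink header: length, type, flags, sequence number, pid.
# All values are arbitrary and chosen only for illustration.
header = struct.pack('IHHII', 20, 0x0118, 0x0002, 7, 0)
payload = struct.pack('BBH', 10, 0, 0)  # hypothetical 4-byte payload
data = header + payload


def extract_key(data, offset, key_format=None, key_offset=0, key_mask=None):
    # Hypothetical helper mirroring the key selection idea.
    # Default: the key is nlmsg->type taken from the standard header.
    length, key, flags, seq = struct.unpack_from('IHHI', data, offset)
    if key_format is not None:
        # Custom key: unpack it from an arbitrary position in the message.
        (key,) = struct.unpack_from(key_format, data, offset + key_offset)
        if key_mask is not None:
            # Partial match: keep only the masked bits.
            key &= key_mask
    return key


default_key = extract_key(data, 0)
masked_key = extract_key(data, 0, key_format='H', key_offset=4, key_mask=0x00FF)
payload_key = extract_key(data, 0, key_format='B', key_offset=16)
```

Here default_key is the full message type, masked_key keeps only its low byte, and payload_key is read from the first byte after the netlink header.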
pyroute2.netlink.nlsocket.Marshal
def parse(
    self,
    data: bytes,
    seq: Optional[int] = None,
    callback: Optional[Callable] = None,
    skip_alien_seq: bool = False,
) -> Generator[nlmsg, None, None]:
    '''
    Parse string data.

    At this moment all transport, except of the native
    Netlink is deprecated in this library, so we should
    not support any defragmentation on that level
    '''
    offset = 0
    # there must be at least one header in the buffer,
    # 'IHHII' == 16 bytes
    while offset <= len(data) - 16:
        # pick type and length
        (length, key, flags, sequence_number) = struct.unpack_from(
            'IHHI', data, offset
        )
        if skip_alien_seq and sequence_number != seq:
            continue
        if not 0 < length <= len(data):
            break
        # support custom parser keys
        # see also: pyroute2.netlink.diag.MarshalDiag
        if self.key_format is not None:
            (key,) = struct.unpack_from(
                self.key_format, data, offset + self.key_offset
            )
            if self.key_mask is not None:
                key &= self.key_mask
        parser = self.get_parser(key, flags, sequence_number)
        msg = parser(data, offset, length)
        offset += length
        if msg is None:
            continue
        if callable(callback) and seq == sequence_number:
            try:
                if callback(msg):
                    continue
            except Exception:
                pass
        mtype = msg['header'].get('type', None)
        if mtype in (1, 2, 3, 4) and 'event' not in msg:
            msg['event'] = mtypes.get(mtype, 'none')
        self.fix_message(msg)
        yield msg
The message parser routine must accept data, offset and length as arguments, and must return a valid nlmsg or dict with the mandatory fields (see the spec below). The parser can also return None, which tells the marshal to skip this message. The parser must parse the data for one message.
Mandatory message fields, expected by NetlinkSocketBase methods:
{
    'header': {
        'type': int,
        'flags': int,
        'error': None or NetlinkError(),
        'sequence_number': int,
    }
}
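As a rough sketch of a parser that satisfies this contract, the function below decodes only the 16-byte netlink header and returns a plain dict with the mandatory fields. The name header_only_parser is hypothetical, and skipping NLMSG_NOOP is an arbitrary choice made just to show the None return path.

```python
import struct

NLMSG_NOOP = 1  # standard netlink control message type


def header_only_parser(data, offset, length):
    # Hypothetical parser: decode just the netlink header of one message.
    msg_len, msg_type, flags, seq, pid = struct.unpack_from(
        'IHHII', data, offset
    )
    if msg_type == NLMSG_NOOP:
        # Returning None asks the marshal to skip this message.
        return None
    return {
        'header': {
            'length': msg_len,
            'type': msg_type,
            'flags': flags,
            'error': None,  # mandatory field, no error in this sketch
            'sequence_number': seq,
            'pid': pid,
        }
    }


# Usage with two synthetic 16-byte messages (arbitrary values).
raw = struct.pack('IHHII', 16, 24, 2, 5, 0)
parsed = header_only_parser(raw, 0, 16)
noop = header_only_parser(struct.pack('IHHII', 16, NLMSG_NOOP, 0, 5, 0), 0, 16)
```

The 'length' and 'pid' keys go beyond the mandatory set and are included only for completeness.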
Per-request parsers
To assign a custom parser to a request/response communication, it is enough to provide the parser function to the NetlinkRequest object.
An example is IPRoute.get_default_routes(), which could be slow on systems with a huge number of routes.
Instead of parsing every route record as rtmsg, this method assigns a specific parser to its request. The custom parser doesn't parse records blindly: it looks only for default route records in the dump, and then parses only the matching records with the standard routine:
pyroute2.iproute.linux.IPRoute
async def get_default_routes(self, family=AF_UNSPEC, table=DEFAULT_TABLE):
    '''
    Get default routes
    '''
    msg = rtmsg()
    msg['family'] = family
    dump_filter, _ = get_dump_filter(
        'route', 'dump', {'table': table} if table is not None else {}
    )
    request = NetlinkRequest(
        self,
        msg,
        msg_type=RTM_GETROUTE,
        msg_flags=NLM_F_DUMP | NLM_F_REQUEST,
        parser=default_routes,
    )
    await request.send()
    return request.response()
pyroute2.iproute.parsers
def default_routes(data, offset, length):
    '''
    Only for RTM_NEWROUTE.

    This parser returns:

    * rtmsg() -- only for default routes (no RTA_DST)
    * nlmsg() -- NLMSG_DONE
    * None for any other messages
    '''
    header = get_header(data, offset)
    if header['type'] == NLMSG_DONE:
        return msg_done(header)
    # skip to NLA: offset + nlmsg header + rtmsg data
    cursor = offset + 28
    # iterate NLA, if meet RTA_DST -- return None (not a default route)
    while cursor < offset + length:
        nla_length, nla_type = struct.unpack_from('HH', data, cursor)
        nla_length = (nla_length + 3) & ~3  # align, page size = 4
        cursor += nla_length
        if nla_type == 1:
            return
    # no RTA_DST, a default route -- spend time to decode using the
    # standard routine
    msg = rtmsg(data, offset=offset)
    msg.decode()
    msg['header']['error'] = None  # required
    return msg
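The attribute scan used by this parser can be tried on synthetic data without a netlink socket. The sketch below is an illustration only: pack_nla(), build_message() and has_attribute() are hypothetical helpers, the 28-byte prefix is filled with zeros instead of real nlmsg and rtmsg headers, and the addresses are documentation examples.

```python
import struct

RTA_DST = 1      # destination attribute, absent in default routes
RTA_OIF = 4      # output interface index
RTA_GATEWAY = 5  # gateway address


def pack_nla(nla_type, payload):
    # One attribute: 4-byte header (length, type), payload, padding to 4.
    length = 4 + len(payload)
    padding = b'\0' * ((4 - length % 4) % 4)
    return struct.pack('HH', length, nla_type) + payload + padding


def build_message(attrs):
    # 28 bytes stand in for the nlmsg header (16) plus rtmsg data (12).
    body = b''.join(pack_nla(t, p) for t, p in attrs)
    return struct.pack('I', 28 + len(body)) + b'\0' * 24 + body


def has_attribute(data, offset, length, wanted, header_size=28):
    cursor = offset + header_size
    while cursor + 4 <= offset + length:
        nla_length, nla_type = struct.unpack_from('HH', data, cursor)
        if nla_length < 4:
            break  # malformed attribute, stop instead of looping forever
        if nla_type == wanted:
            return True
        cursor += (nla_length + 3) & ~3  # advance to the next aligned NLA
    return False


oif = struct.pack('I', 2)
default_route = build_message([(RTA_GATEWAY, bytes([192, 0, 2, 1])), (RTA_OIF, oif)])
host_route = build_message([(RTA_DST, bytes([198, 51, 100, 7])), (RTA_OIF, oif)])

is_default = not has_attribute(default_route, 0, len(default_route), RTA_DST)
is_host_default = not has_attribute(host_route, 0, len(host_route), RTA_DST)
```

Only the length and type fields of each attribute are read, which is why such a scan is cheaper than decoding every record as a full rtmsg.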
NetlinkRequest: pick correct messages
The netlink protocol is asynchronous, so responses to several requests may come simultaneously. Also the kernel may send broadcast messages that are not responses, and have sequence_number == 0. As the response may contain multiple messages, and may or may not be terminated by some specific type of message, the task of returning relevant messages from the flow is a bit complicated.
Let’s look at an example:
The message flow on the diagram features sequence_number == 0 broadcasts and sequence_number == 1 request and response packets. To complicate it even further you can run a request with sequence_number == 2 before the final response with sequence_number == 1 comes.
To handle that, pyroute2 protocol objects buffer all the messages, and NetlinkRequest picks only the response messages for its own sequence number.
pyroute2.netlink.nlsocket.NetlinkRequest
async def response(self):
    async for msg in self.sock.get(
        msg_seq=self.msg_seq,
        terminate=self.terminate,
        callback=self.callback,
    ):
        if self.dump_filter is not None and not self.match_one_message(
            self.dump_filter, msg
        ):
            continue
        for cr in self.sock.callbacks:
            try:
                if cr[0](msg):
                    cr[1](msg, *cr[2])
            except Exception:
                log.warning("Callback fail: %{cr}")
        yield msg
    self.cleanup()
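The idea of buffering everything and handing each request only its own messages can be modeled with a toy buffer keyed by sequence number. This is not the pyroute2 implementation: SeqBuffer, make() and demo() are hypothetical names, and terminating on NLMSG_DONE is a simplifying assumption.

```python
import asyncio
from collections import defaultdict

NLMSG_DONE = 3  # standard netlink "end of dump" message type


class SeqBuffer:
    '''Toy per-sequence-number buffer, for illustration only.'''

    def __init__(self):
        self.queues = defaultdict(asyncio.Queue)

    def feed(self, msg):
        # Every incoming message is stored under its sequence number.
        self.queues[msg['header']['sequence_number']].put_nowait(msg)

    async def get(self, msg_seq):
        # Yield buffered messages for one sequence number until NLMSG_DONE.
        while True:
            msg = await self.queues[msg_seq].get()
            if msg['header']['type'] == NLMSG_DONE:
                return
            yield msg


def make(seq, mtype, tag):
    return {'header': {'sequence_number': seq, 'type': mtype}, 'tag': tag}


async def demo():
    buf = SeqBuffer()
    # Interleaved flow: a broadcast (seq 0) and two overlapping responses.
    flow = [
        make(0, 16, 'broadcast'),
        make(1, 24, 'r1-a'),
        make(2, 24, 'r2-a'),
        make(1, 24, 'r1-b'),
        make(1, NLMSG_DONE, 'done'),
        make(2, NLMSG_DONE, 'done'),
    ]
    for msg in flow:
        buf.feed(msg)
    first = [m['tag'] async for m in buf.get(1)]
    second = [m['tag'] async for m in buf.get(2)]
    return first, second


first, second = asyncio.run(demo())
```

Each consumer sees only the messages for its own sequence number, in arrival order, while the broadcast with sequence number 0 stays in its own queue.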