Netlink parser data flow¶
NetlinkSocketBase: receive the data¶
When NetlinkSocketBase receives the data from a netlink socket, it can do it in two ways:
get data directly with socket.recv() or socket.recv_into()
run a buffer thread that receives the data as soon as possible and leaves it in the buffer_queue to be consumed later by recv() or recv_into()
NetlinkSocketBase implements these two receive methods, that choose the data source – directly from the socket or from buffer_queue – depending on the buffer_thread property:
pyroute2.netlink.nlsocket.NetlinkSocketBase
def recv(self, *argv, **kwarg):
    '''
    Return the next chunk of incoming data.

    When `self.input_from_buffer_queue` is set, the next item is taken
    from `self.buffer_queue` (blocking until one is available); if that
    item is an exception instance it is raised instead of returned.
    Otherwise the call is forwarded to `self._sock.recv()` with the
    arguments unchanged.
    '''
    # direct mode: just proxy the call to the underlying socket
    if not self.input_from_buffer_queue:
        return self._sock.recv(*argv, **kwarg)
    # buffered mode: the queue carries either data or an exception
    item = self.buffer_queue.get()
    if isinstance(item, Exception):
        raise item
    return item
def recv_into(self, data, *argv, **kwarg):
    '''
    Receive the next chunk of incoming data into `data`.

    When `self.input_from_buffer_queue` is set, the next item is taken
    from `self.buffer_queue` (blocking until one is available). If the
    item is an exception instance, it is raised. Otherwise the whole
    content of `data` is replaced with the item and the item length is
    returned. When the flag is not set, the call is forwarded to
    `self._sock.recv_into()` with the arguments unchanged.

    :param data: a mutable buffer to place the data into
    :return: the number of bytes placed into `data`
    '''
    if self.input_from_buffer_queue:
        data_in = self.buffer_queue.get()
        # check the queued item, not the caller's buffer: the buffer
        # thread reports its failures by queueing the exception
        if isinstance(data_in, Exception):
            raise data_in
        data[:] = data_in
        return len(data_in)
    return self._sock.recv_into(data, *argv, **kwarg)
def buffer_thread_routine(self):
    '''
    Pump the data from the socket into `self.buffer_queue`.

    Polls both `self._sock` and `self._ctrl_read`. Every time the
    socket is readable, a fresh 64000 bytes long bytearray is filled
    with `recv_into()` and queued as is (it is not trimmed to the
    received size). The routine returns when any event arrives on a
    descriptor other than the socket, or when receiving/queueing fails;
    in the latter case the exception object is queued for the consumer.
    '''
    poller = select.poll()
    readable = select.POLLIN | select.POLLPRI
    poller.register(self._sock, readable)
    poller.register(self._ctrl_read, readable)
    sock_fd = self._sock.fileno()
    while True:
        for fd, _event in poller.poll():
            # anything but the data socket means "stop the thread"
            if fd != sock_fd:
                return
            try:
                chunk = bytearray(64000)
                self._sock.recv_into(chunk, 64000)
                self.buffer_queue.put_nowait(chunk)
            except Exception as exc:
                # hand the failure over to the consumer and exit
                self.buffer_queue.put(exc)
                return
Marshal: get and run parsers¶
Marshal should choose a proper parser depending on the key, flags and sequence_number. By default it uses only nlmsg->type as the key and nlmsg->flags, and there are several ways to customize getting parsers.
Use custom key_format, key_offset and key_mask. The latter is used to partially match the key, while key_format and key_offset are used to struct.unpack() the key from the raw netlink data.
You can overload Marshal.get_parser() and implement your own way to get parsers. A parser should be a simple function that gets only data, offset and length as arguments, and returns one dict compatible message.
pyroute2.netlink.nlsocket.Marshal
def parse(self, data, seq=None, callback=None, skip_alien_seq=False):
    '''
    Parse raw netlink data and yield the decoded messages.

    The buffer is walked header by header. For every message a parser
    is obtained with `self.get_parser(key, flags, sequence_number)` and
    called as `parser(data, offset, length)`. No defragmentation is
    done on this level.

    :param data: a buffer with raw netlink messages
    :param seq: the sequence number of the request being served
    :param callback: an optional callable, run only on the messages
        with `sequence_number == seq`; if it returns a true value,
        the message is not yielded
    :param skip_alien_seq: if true, step over the messages with a
        sequence number other than `seq` without parsing them
    :return: a generator of parsed messages
    '''
    offset = 0
    # there must be at least one header in the buffer,
    # 'IHHII' == 16 bytes
    while offset <= len(data) - 16:
        # pick type and length
        (length, key, flags, sequence_number) = struct.unpack_from(
            'IHHI', data, offset
        )
        # validate the length before it is used to move the offset
        if not 0 < length <= len(data):
            break
        if skip_alien_seq and sequence_number != seq:
            # step over the foreign message; without advancing the
            # offset the loop would spin on the same header forever
            offset += length
            continue
        # support custom parser keys
        # see also: pyroute2.netlink.diag.MarshalDiag
        if self.key_format is not None:
            (key,) = struct.unpack_from(
                self.key_format, data, offset + self.key_offset
            )
            if self.key_mask is not None:
                key &= self.key_mask
        parser = self.get_parser(key, flags, sequence_number)
        msg = parser(data, offset, length)
        offset += length
        # a parser may return None to skip the message
        if msg is None:
            continue
        if callable(callback) and seq == sequence_number:
            try:
                if callback(msg):
                    continue
            except Exception:
                # best effort: a failing callback must not break
                # the parsing, the message is yielded as usual
                pass
        mtype = msg['header'].get('type', None)
        if mtype in (1, 2, 3, 4):
            msg['event'] = mtypes.get(mtype, 'none')
        self.fix_message(msg)
        yield msg
The message parser routine must accept data, offset, length as the arguments, and must return a valid nlmsg or dict, with the mandatory fields, see the spec below. The parser can also return None which tells the marshal to skip this message. The parser must parse data for one message.
Mandatory message fields, expected by NetlinkSocketBase methods:
{
'header': {
'type': int,
'flags': int,
'error': None or NetlinkError(),
'sequence_number': int,
}
}
Per-request parsers¶
Sometimes it may be reasonable to handle a particular response with a specific parser, rather than a generic one. An example is IPRoute.get_default_routes(), which could be slow on systems with huge amounts of routes.
Instead of parsing every route record as rtmsg, this method assigns a specific parser to its request. The custom parser doesn’t parse records blindly, but looks only for default route records in the dump, and then parses only the matched records with the standard routine:
pyroute2.iproute.linux.IPRoute
def get_default_routes(self, family=AF_UNSPEC, table=DEFAULT_TABLE):
    '''
    Get default routes.

    Runs an RTM_GETROUTE dump with the `default_routes` parser
    assigned to the request. If `table` is None, the result of
    `nlm_request()` is returned as is; otherwise it is passed through
    `filter_messages()` to match the table.
    '''
    request = rtmsg()
    request['family'] = family
    dump = self.nlm_request(
        request,
        msg_type=RTM_GETROUTE,
        msg_flags=NLM_F_DUMP | NLM_F_REQUEST,
        parser=default_routes,
    )
    if table is not None:
        return self.filter_messages({'table': table}, dump)
    return dump
pyroute2.iproute.parsers
def default_routes(data, offset, length):
    '''
    Only for RTM_NEWROUTE.

    This parser returns:

    * rtmsg() -- only for default routes (no RTA_DST)
    * nlmsg() -- NLMSG_DONE
    * None for any other messages

    :param data: a buffer with raw netlink messages
    :param offset: where the message starts in the buffer
    :param length: the message length, including the header
    '''
    # get message header
    header = dict(
        zip(
            ('length', 'type', 'flags', 'sequence_number'),
            struct.unpack_from('IHHI', data, offset),
        )
    )
    header['error'] = None
    if header['type'] == NLMSG_DONE:
        msg = nlmsg()
        msg['header'] = header
        msg.length = msg['header']['length']
        return msg
    # skip to NLA: offset + nlmsg header + rtmsg data
    cursor = offset + 28
    end = offset + length
    # iterate NLA, if meet RTA_DST -- return None (not a default route);
    # require the whole 4 bytes NLA header to fit into the message
    while cursor + 4 <= end:
        nla_length, nla_type = struct.unpack_from('HH', data, cursor)
        if nla_type == 1:  # RTA_DST
            return None
        if nla_length < 4:
            # malformed NLA, shorter than its own header: the cursor
            # would never move forward, so stop scanning and let the
            # standard routine deal with the message
            break
        cursor += (nla_length + 3) & ~3  # NLA are aligned to 4 bytes
    # no RTA_DST, a default route -- spend time to decode using the
    # standard routine
    msg = rtmsg(data, offset=offset)
    msg.decode()
    msg['header']['error'] = None  # required
    return msg
To assign a custom parser to a request/response communication, you should first know the sequence_number, whether it is allocated dynamically with NetlinkSocketBase.addr_pool.alloc() or assigned statically. Then you can create a record in NetlinkSocketBase.seq_map:
#
def my_parser(data, offset, length):
    # parse exactly one message starting at `offset`
    ...
    return parsed_message


# allocate the sequence number for the request
msg_seq = nlsocket.addr_pool.alloc()
msg = nlmsg()
msg['header'] = {
    'type': my_type,
    'flags': NLM_F_REQUEST | NLM_F_ACK,
    'sequence_number': msg_seq,
}
msg['data'] = my_data
msg.encode()
# assign the parser to the sequence number before sending the request
nlsocket.seq_map[msg_seq] = my_parser
nlsocket.sendto(msg.data, (0, 0))
for response_message in nlsocket.get(msg_seq=msg_seq):
    handle(response_message)
NetlinkSocketBase: pick correct messages¶
The netlink protocol is asynchronous, so responses to several requests may come simultaneously. Also the kernel may send broadcast messages that are not responses, and have sequence_number == 0. As the response may contain multiple messages, and may or may not be terminated by some specific type of message, the task of returning relevant messages from the flow is a bit complicated.
Let’s look at an example:
The message flow on the diagram features sequence_number == 0 broadcasts and sequence_number == 1 request and response packets. To complicate it even further you can run a request with sequence_number == 2 before the final response with sequence_number == 1 comes.
To handle that, NetlinkSocketBase.get() buffers all the irrelevant messages, returns ones with only the requested sequence_number, and uses locks to wait on the resource.
The current implementation is relatively complicated and will be changed in the future.
pyroute2.netlink.nlsocket.NetlinkSocketBase
def get(
    self,
    bufsize=DEFAULT_RCVBUF,
    msg_seq=0,
    terminate=None,
    callback=None,
    noraise=False,
):
    '''
    Get parsed messages (a generator). If `msg_seq` is given, return
    only messages with that `msg['header']['sequence_number']`,
    saving all other messages into `self.backlog`.

    The routine is serialized per `msg_seq` with `self.lock[msg_seq]`,
    the backlog is guarded by `self.backlog_lock`, and only one thread
    at a time reads the socket (`self.read_lock`).

    The `bufsize` parameter can be:

        - -1: bufsize will be calculated from the first 4 bytes of
            the network data
        - 0: bufsize will be calculated from SO_RCVBUF sockopt
        - int > 0: just a bufsize

    If `terminate` is defined, it is called on every message of the
    response; a true return value ends the response.

    `callback` is passed to `self.marshal.parse()` together with
    `msg_seq`.

    If `noraise` is true, error messages will be treated as any
    other message.

    If no data for a non-zero `msg_seq` arrives within
    `self.get_timeout` seconds, `self.get_timeout_exception()` is
    raised, if defined; otherwise the generator just stops.
    '''
    ctime = time.time()

    with self.lock[msg_seq]:
        if bufsize == -1:
            # get bufsize from the network data
            bufsize = struct.unpack("I", self.recv(4, MSG_PEEK))[0]
        elif bufsize == 0:
            # get bufsize from SO_RCVBUF
            bufsize = self.getsockopt(SOL_SOCKET, SO_RCVBUF) // 2

        tmsg = None
        enough = False
        backlog_acquired = False
        try:
            while not enough:
                # 8<-----------------------------------------------------------
                #
                # This stage changes the backlog, so use mutex to
                # prevent side changes
                self.backlog_lock.acquire()
                backlog_acquired = True
                ##
                # Stage 1. BEGIN
                #
                # 8<-----------------------------------------------------------
                #
                # Check backlog and return already collected
                # messages.
                #
                if msg_seq == 0 and self.backlog[0]:
                    # Zero queue.
                    #
                    # Load the backlog, if there is valid
                    # content in it
                    for msg in self.backlog[0]:
                        yield msg
                    self.backlog[0] = []
                    # And just exit
                    break
                elif msg_seq != 0 and len(self.backlog.get(msg_seq, [])):
                    # Any other msg_seq.
                    #
                    # Collect messages up to the terminator.
                    # Terminator conditions:
                    #  * NLMSG_ERROR != 0
                    #  * NLMSG_DONE
                    #  * terminate() function (if defined)
                    #  * not NLM_F_MULTI
                    #
                    # Please note, that if terminator not occured,
                    # more `recv()` rounds CAN be required.
                    for msg in tuple(self.backlog[msg_seq]):

                        # Drop the message from the backlog, if any
                        self.backlog[msg_seq].remove(msg)

                        # If there is an error, raise exception
                        if (
                            msg['header']['error'] is not None
                            and not noraise
                        ):
                            # reschedule all the remaining messages,
                            # including errors and acks, into a
                            # separate deque
                            self.error_deque.extend(self.backlog[msg_seq])
                            # flush the backlog for this msg_seq
                            del self.backlog[msg_seq]
                            # The loop is done
                            raise msg['header']['error']

                        # If it is the terminator message, say "enough"
                        # and requeue all the rest into Zero queue
                        if terminate is not None:
                            tmsg = terminate(msg)
                            if isinstance(tmsg, nlmsg):
                                yield msg
                        if (msg['header']['type'] == NLMSG_DONE) or tmsg:
                            # The loop is done
                            enough = True

                        # If it is just a normal message, append it to
                        # the response
                        if not enough:
                            # finish the loop on single messages
                            if not msg['header']['flags'] & NLM_F_MULTI:
                                enough = True
                            yield msg

                        # Enough is enough, requeue the rest and delete
                        # our backlog
                        if enough:
                            self.backlog[0].extend(self.backlog[msg_seq])
                            del self.backlog[msg_seq]
                            break

                    # Next iteration
                    self.backlog_lock.release()
                    backlog_acquired = False
                else:
                    # Stage 1. END
                    #
                    # 8<-------------------------------------------------------
                    #
                    # Stage 2. BEGIN
                    #
                    # 8<-------------------------------------------------------
                    #
                    # Receive the data from the socket and put the messages
                    # into the backlog
                    #
                    self.backlog_lock.release()
                    backlog_acquired = False
                    ##
                    #
                    # Control the timeout. We should not be within the
                    # function more than TIMEOUT seconds. All the locks
                    # MUST be released here.
                    #
                    if (msg_seq != 0) and (
                        time.time() - ctime > self.get_timeout
                    ):
                        # requeue already received for that msg_seq;
                        # the record may be missing (Stage 1 looks it
                        # up with .get()), and a KeyError here would
                        # mask the timeout
                        self.backlog[0].extend(self.backlog.pop(msg_seq, []))
                        # throw an exception
                        if self.get_timeout_exception:
                            raise self.get_timeout_exception()
                        else:
                            return
                    #
                    if self.read_lock.acquire(False):
                        try:
                            self.change_master.clear()
                            # If the socket is free to read from, occupy
                            # it and wait for the data
                            #
                            # This is a time consuming process, so all the
                            # locks, except the read lock must be released
                            data = self.recv(bufsize)
                            # Parse data
                            msgs = tuple(
                                self.marshal.parse(data, msg_seq, callback)
                            )
                            # Reset ctime -- timeout should be measured
                            # for every turn separately
                            ctime = time.time()
                            #
                            # Slow down if the buffer queue grows too
                            # fast between two reads
                            current = self.buffer_queue.qsize()
                            delta = current - self.qsize
                            delay = 0
                            if delta > 10:
                                delay = min(
                                    3, max(0.01, float(current) / 60000)
                                )
                                message = (
                                    "Packet burst: "
                                    "delta=%s qsize=%s delay=%s"
                                    % (delta, current, delay)
                                )
                                if delay < 1:
                                    log.debug(message)
                                else:
                                    log.warning(message)
                                time.sleep(delay)
                            self.qsize = current

                            # We've got the data, lock the backlog again
                            with self.backlog_lock:
                                for msg in msgs:
                                    msg['header']['target'] = self.target
                                    msg['header']['stats'] = Stats(
                                        current, delta, delay
                                    )
                                    seq = msg['header']['sequence_number']
                                    if seq not in self.backlog:
                                        if (
                                            msg['header']['type']
                                            == NLMSG_ERROR
                                        ):
                                            # Drop orphaned NLMSG_ERROR
                                            # messages
                                            continue
                                        seq = 0
                                    # 8<-----------------------------------
                                    # Callbacks section
                                    for cr in self.callbacks:
                                        try:
                                            if cr[0](msg):
                                                cr[1](msg, *cr[2])
                                        except Exception:
                                            # A failing callback must
                                            # not break the receiver:
                                            # log it and go on. The
                                            # record is wrapped into a
                                            # tuple, otherwise `%` would
                                            # unpack it and fail itself.
                                            log.warning(
                                                "Callback fail: %s" % (cr,)
                                            )
                                            log.warning(
                                                traceback.format_exc()
                                            )
                                    # 8<-----------------------------------
                                    self.backlog[seq].append(msg)

                            # Now wake up other threads
                            self.change_master.set()
                        finally:
                            # Finally, release the read lock: all data
                            # processed
                            self.read_lock.release()
                    else:
                        # If the socket is occupied and there is still no
                        # data for us, wait for the next master change or
                        # for a timeout
                        self.change_master.wait(1)
                    # 8<-------------------------------------------------------
                    #
                    # Stage 2. END
                    #
                    # 8<-------------------------------------------------------
        finally:
            # the generator may be closed, or an error raised, while
            # the backlog is still locked in Stage 1
            if backlog_acquired:
                self.backlog_lock.release()