  • pyroute2 0.7.3.post2 documentation »

Netlink parser data flow

NetlinkSocketBase: receive the data

NetlinkSocketBase can receive data from a netlink socket in two ways:

  1. get the data directly with socket.recv() or socket.recv_into()

  2. run a buffer thread that receives the data as soon as possible and leaves it in the buffer_queue, to be consumed later by recv() or recv_into()

NetlinkSocketBase implements both receive methods. Each of them chooses the data source (the socket itself or the buffer_queue) depending on the input_from_buffer_queue property:

pyroute2.netlink.nlsocket.NetlinkSocketBase

def recv(self, *argv, **kwarg):
    if self.input_from_buffer_queue:
        data_in = self.buffer_queue.get()
        if isinstance(data_in, Exception):
            raise data_in
        return data_in
    return self._sock.recv(*argv, **kwarg)

def recv_into(self, data, *argv, **kwarg):
    if self.input_from_buffer_queue:
        data_in = self.buffer_queue.get()
        # check the item taken from the queue, not the caller's buffer
        if isinstance(data_in, Exception):
            raise data_in
        data[:] = data_in
        return len(data_in)
    return self._sock.recv_into(data, *argv, **kwarg)

def buffer_thread_routine(self):
    poll = select.poll()
    poll.register(self._sock, select.POLLIN | select.POLLPRI)
    poll.register(self._ctrl_read, select.POLLIN | select.POLLPRI)
    sockfd = self._sock.fileno()
    while True:
        events = poll.poll()
        for (fd, event) in events:
            if fd == sockfd:
                try:
                    data = bytearray(64000)
                    self._sock.recv_into(data, 64000)
                    self.buffer_queue.put_nowait(data)
                except Exception as e:
                    self.buffer_queue.put(e)
                    return
            else:
                return
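
The two receive paths can be tried outside of pyroute2 with a small stand-alone sketch. `BufferedReceiver` below is a hypothetical helper written for illustration, not a pyroute2 class, and a `socket.socketpair()` stands in for the netlink socket. It only shows the idea: either read the socket directly, or let a thread drain it into a queue that `recv()` consumes later.

```python
import queue
import socket
import threading


class BufferedReceiver:
    """Hypothetical helper for illustration, not part of pyroute2."""

    def __init__(self, sock, use_thread=False):
        self._sock = sock
        self.buffer_queue = queue.Queue()
        self.input_from_buffer_queue = use_thread
        self.thread = None
        if use_thread:
            self.thread = threading.Thread(target=self._pump, daemon=True)
            self.thread.start()

    def _pump(self):
        # Drain the socket as soon as data arrives; pass errors to the
        # consumer through the same queue.
        while True:
            try:
                data = self._sock.recv(64000)
            except Exception as e:
                self.buffer_queue.put(e)
                return
            if not data:
                return  # the peer closed the connection
            self.buffer_queue.put(data)

    def recv(self, bufsize=64000):
        if self.input_from_buffer_queue:
            data_in = self.buffer_queue.get()
            if isinstance(data_in, Exception):
                raise data_in
            return data_in
        return self._sock.recv(bufsize)


left, right = socket.socketpair()
receiver = BufferedReceiver(right, use_thread=True)
left.send(b'payload')
received = receiver.recv()  # comes from buffer_queue, not from the socket
left.close()
receiver.thread.join(timeout=1)
right.close()
print(received)
```

Putting exceptions into the same queue keeps the consumer's error handling identical in both modes: the caller of `recv()` sees the exception wherever it was raised.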

Marshal: get and run parsers

Marshal chooses a parser depending on the key, flags and sequence_number. By default it uses only nlmsg->type as the key, together with nlmsg->flags. There are several ways to customize how parsers are selected:

  1. Use a custom key_format, key_offset and key_mask. key_format and key_offset are used to struct.unpack() the key from the raw netlink data, while key_mask allows a partial match on the key.

  2. Overload Marshal.get_parser() and implement your own way to get parsers. A parser should be a simple function that takes only data, offset and length as arguments, and returns one dict-compatible message.
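
The effect of key_format, key_offset and key_mask can be seen on a hand-made buffer. The sketch below mirrors the key extraction step of the parse() listing in this section; `extract_key` and the sample message are made up for illustration and are not part of pyroute2.

```python
import struct

# A 16-byte netlink header ('IHHII') followed by a 4-byte payload.
# The first payload byte (0x2A) plays the role of a custom key.
payload = bytes([0x2A, 0, 0, 0])
data = struct.pack('IHHII', 16 + len(payload), 20, 1, 7, 0) + payload


def extract_key(data, offset, key_format=None, key_offset=None, key_mask=None):
    # Default: the key is nlmsg->type taken from the standard header.
    length, key, flags, sequence_number = struct.unpack_from(
        'IHHI', data, offset
    )
    if key_format is not None:
        # Custom key: unpack it from another place in the message.
        (key,) = struct.unpack_from(key_format, data, offset + key_offset)
        if key_mask is not None:
            # Partial match: keep only the masked bits.
            key &= key_mask
    return key


default_key = extract_key(data, 0)                # nlmsg->type
payload_key = extract_key(data, 0, 'B', 16)       # first payload byte
masked_key = extract_key(data, 0, 'B', 16, 0x0F)  # low nibble only
print(default_key, payload_key, masked_key)
```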


pyroute2.netlink.nlsocket.Marshal

def parse(self, data, seq=None, callback=None, skip_alien_seq=False):
    '''
    Parse string data.

    At this moment all transports except the native
    Netlink are deprecated in this library, so we should
    not support any defragmentation at that level
    '''
    offset = 0

    # there must be at least one header in the buffer,
    # 'IHHII' == 16 bytes
    while offset <= len(data) - 16:
        # pick type and length
        (length, key, flags, sequence_number) = struct.unpack_from(
            'IHHI', data, offset
        )
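        # validate the length first, so it is safe to advance by it
        if not 0 < length <= len(data):
            break
        if skip_alien_seq and sequence_number != seq:
            # step over the foreign message; without this the loop
            # would stay on the same offset forever
            offset += length
            continue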
        # support custom parser keys
        # see also: pyroute2.netlink.diag.MarshalDiag
        if self.key_format is not None:
            (key,) = struct.unpack_from(
                self.key_format, data, offset + self.key_offset
            )
            if self.key_mask is not None:
                key &= self.key_mask

        parser = self.get_parser(key, flags, sequence_number)
        msg = parser(data, offset, length)
        offset += length
        if msg is None:
            continue

        if callable(callback) and seq == sequence_number:
            try:
                if callback(msg):
                    continue
            except Exception:
                pass

        mtype = msg['header'].get('type', None)
        if mtype in (1, 2, 3, 4):
            msg['event'] = mtypes.get(mtype, 'none')
        self.fix_message(msg)
        yield msg

The message parser routine must accept data, offset and length as arguments, and must return a valid nlmsg or dict containing the mandatory fields listed in the spec below. The parser can also return None, which tells the marshal to skip this message. Each call must parse the data for exactly one message.

Mandatory message fields, expected by NetlinkSocketBase methods:

{
    'header': {
        'type': int,
        'flags': int,
        'error': None or NetlinkError(),
        'sequence_number': int,
    }
}
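
A minimal sketch of a parser that satisfies this contract is shown here. It decodes only the 16-byte netlink header and returns a plain dict. The function name and the extra `length`, `pid` and `raw` keys are choices made for this example, and it is an assumption that a plain dict like this is sufficient for every NetlinkSocketBase method.

```python
import struct


def minimal_parser(data, offset, length):
    """Hypothetical parser: decode the netlink header, keep the rest raw."""
    if length < 16:
        # Not even a full header: ask the marshal to skip the message.
        return None
    msg_length, msg_type, flags, sequence_number, pid = struct.unpack_from(
        'IHHII', data, offset
    )
    return {
        'header': {
            # mandatory fields
            'type': msg_type,
            'flags': flags,
            'error': None,
            'sequence_number': sequence_number,
            # extra fields, added for this example only
            'length': msg_length,
            'pid': pid,
        },
        'raw': bytes(data[offset + 16 : offset + length]),
    }


# One hand-made message: header plus 4 bytes of payload.
sample = struct.pack('IHHII', 20, 3, 0, 42, 0) + b'\x01\x02\x03\x04'
msg = minimal_parser(sample, 0, 20)
print(msg['header']['type'], msg['header']['sequence_number'])
```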

Per-request parsers

Sometimes it is reasonable to handle a particular response with a specific parser rather than a generic one. An example is IPRoute.get_default_routes(), which could be slow on systems with a huge number of routes.

Instead of parsing every route record as rtmsg, this method assigns a specific parser to its request. The custom parser doesn’t parse records blindly: it looks only for default route records in the dump, and then parses only the matching records with the standard routine:

pyroute2.iproute.linux.IPRoute

def get_default_routes(self, family=AF_UNSPEC, table=DEFAULT_TABLE):
    '''
    Get default routes
    '''
    msg = rtmsg()
    msg['family'] = family

    routes = self.nlm_request(
        msg,
        msg_type=RTM_GETROUTE,
        msg_flags=NLM_F_DUMP | NLM_F_REQUEST,
        parser=default_routes,
    )

    if table is None:
        return routes
    else:
        return self.filter_messages({'table': table}, routes)

pyroute2.iproute.parsers

def default_routes(data, offset, length):
    '''
    Only for RTM_NEWROUTE.

    This parser returns:

    * rtmsg() -- only for default routes (no RTA_DST)
    * nlmsg() -- NLMSG_DONE
    * None for any other messages
    '''
    # get message header
    header = dict(
        zip(
            ('length', 'type', 'flags', 'sequence_number'),
            struct.unpack_from('IHHI', data, offset),
        )
    )
    header['error'] = None
    if header['type'] == NLMSG_DONE:
        msg = nlmsg()
        msg['header'] = header
        msg.length = msg['header']['length']
        return msg

    # skip to NLA: offset + nlmsg header + rtmsg data
    cursor = offset + 28

    # iterate NLA, if meet RTA_DST -- return None (not a default route)
    while cursor < offset + length:
        nla_length, nla_type = struct.unpack_from('HH', data, cursor)
        nla_length = (nla_length + 3) & ~3  # align to a 4-byte boundary
        cursor += nla_length
        if nla_type == 1:  # RTA_DST
            return

    # no RTA_DST, a default route -- spend time to decode using the
    # standard routine
    msg = rtmsg(data, offset=offset)
    msg.decode()
    msg['header']['error'] = None  # required
    return msg
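
The attribute scan used by this parser can be tried on synthetic data without a netlink socket. In the sketch below the helpers `nla()`, `route_message()` and `has_dst()` are written for illustration only. The rtmsg body is replaced by 12 zero bytes, and a guard against zero-length attributes is added that the listing above does not have.

```python
import struct

RTM_NEWROUTE = 24
RTA_DST = 1
RTA_OIF = 4


def nla(nla_type, payload):
    # NLA header: total length (including the 4-byte header) and type,
    # then the payload padded to a 4-byte boundary.
    length = 4 + len(payload)
    padding = (4 - length % 4) % 4
    return struct.pack('HH', length, nla_type) + payload + b'\x00' * padding


def route_message(attrs):
    # 12 zero bytes stand in for the rtmsg fields.
    body = bytes(12) + b''.join(attrs)
    return struct.pack('IHHII', 16 + len(body), RTM_NEWROUTE, 0, 1, 0) + body


def has_dst(data, offset, length):
    cursor = offset + 28  # nlmsg header (16) + rtmsg (12)
    while cursor < offset + length:
        nla_length, nla_type = struct.unpack_from('HH', data, cursor)
        if nla_type == RTA_DST:
            return True
        if nla_length < 4:
            break  # malformed attribute: stop instead of looping forever
        cursor += (nla_length + 3) & ~3
    return False


default_route = route_message([nla(RTA_OIF, struct.pack('I', 2))])
specific_route = route_message(
    [nla(RTA_OIF, struct.pack('I', 2)), nla(RTA_DST, bytes([10, 0, 0, 0]))]
)
print(has_dst(default_route, 0, len(default_route)))
print(has_dst(specific_route, 0, len(specific_route)))
```

Only the 4-byte attribute headers are read during the scan, which is why this is cheaper than decoding every record in full.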

To assign a custom parser to a request/response exchange, you must first know its sequence_number, whether it is allocated dynamically with NetlinkSocketBase.addr_pool.alloc() or assigned statically. Then you can create a record in NetlinkSocketBase.seq_map:

# the parser must return a dict-compatible message, or None to skip it
def my_parser(data, offset, length):
    ...
    return parsed_message

msg_seq = nlsocket.addr_pool.alloc()
msg = nlmsg()
msg['header'] = {
    'type': my_type,
    'flags': NLM_F_REQUEST | NLM_F_ACK,
    'sequence_number': msg_seq,
}
msg['data'] = my_data
msg.encode()
nlsocket.seq_map[msg_seq] = my_parser
nlsocket.sendto(msg.data, (0, 0))
for response_message in nlsocket.get(msg_seq=msg_seq):
    handle(response_message)
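
One way to picture the role of seq_map is a lookup that prefers a per-request parser and falls back to a per-type one. The class below is a hypothetical illustration of that idea only: `TinyDispatcher`, `msg_map` and the parser functions are invented names, and the actual Marshal.get_parser() may work differently.

```python
def parse_generic(data, offset, length):
    return {'header': {'error': None}, 'parsed_by': 'generic'}


def parse_route(data, offset, length):
    return {'header': {'error': None}, 'parsed_by': 'route'}


def parse_custom(data, offset, length):
    return {'header': {'error': None}, 'parsed_by': 'custom'}


class TinyDispatcher:
    """Hypothetical dispatcher, not the pyroute2 Marshal."""

    def __init__(self):
        self.msg_map = {24: parse_route}  # parsers by message type
        self.seq_map = {}                 # parsers by sequence number
        self.default = parse_generic

    def get_parser(self, key, flags, sequence_number):
        # A per-request parser wins over a per-type parser.
        return self.seq_map.get(
            sequence_number, self.msg_map.get(key, self.default)
        )


dispatcher = TinyDispatcher()
dispatcher.seq_map[255] = parse_custom

by_type = dispatcher.get_parser(24, 0, 1)
by_seq = dispatcher.get_parser(24, 0, 255)
fallback = dispatcher.get_parser(99, 0, 1)
print(by_type.__name__, by_seq.__name__, fallback.__name__)
```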

NetlinkSocketBase: pick correct messages

The netlink protocol is asynchronous, so responses to several requests may come simultaneously. Also the kernel may send broadcast messages that are not responses, and have sequence_number == 0. As the response may contain multiple messages, and may or may not be terminated by some specific type of message, the task of returning relevant messages from the flow is a bit complicated.

Let’s look at an example:

[Diagram: message flow mixing sequence_number == 0 broadcasts with sequence_number == 1 request and response packets]

The message flow on the diagram features sequence_number == 0 broadcasts and sequence_number == 1 request and response packets. To complicate it even further, you can run a request with sequence_number == 2 before the final response with sequence_number == 1 arrives.

To handle that, NetlinkSocketBase.get() buffers all the irrelevant messages, returns only those with the requested sequence_number, and uses locks to wait on the resource.

The current implementation is relatively complicated and will be changed in the future.

pyroute2.netlink.nlsocket.NetlinkSocketBase

def get(
    self,
    bufsize=DEFAULT_RCVBUF,
    msg_seq=0,
    terminate=None,
    callback=None,
    noraise=False,
):
    '''
    Get parsed messages list. If `msg_seq` is given, return
    only messages with that `msg['header']['sequence_number']`,
    saving all other messages into `self.backlog`.

    The routine is thread-safe.

    The `bufsize` parameter can be:

        - -1: bufsize will be calculated from the first 4 bytes of
            the network data
        - 0: bufsize will be calculated from SO_RCVBUF sockopt
        - int > 0: just a bufsize

    If `noraise` is true, error messages will be treated as any
    other message.
    '''
    ctime = time.time()

    with self.lock[msg_seq]:
        if bufsize == -1:
            # get bufsize from the network data
            bufsize = struct.unpack("I", self.recv(4, MSG_PEEK))[0]
        elif bufsize == 0:
            # get bufsize from SO_RCVBUF
            bufsize = self.getsockopt(SOL_SOCKET, SO_RCVBUF) // 2

        tmsg = None
        enough = False
        backlog_acquired = False
        try:
            while not enough:
                # 8<-----------------------------------------------------------
                #
                # This stage changes the backlog, so use mutex to
                # prevent side changes
                self.backlog_lock.acquire()
                backlog_acquired = True
                ##
                # Stage 1. BEGIN
                #
                # 8<-----------------------------------------------------------
                #
                # Check backlog and return already collected
                # messages.
                #
                if msg_seq == 0 and self.backlog[0]:
                    # Zero queue.
                    #
                    # Load the backlog, if there is valid
                    # content in it
                    for msg in self.backlog[0]:
                        yield msg
                    self.backlog[0] = []
                    # And just exit
                    break
                elif msg_seq != 0 and len(self.backlog.get(msg_seq, [])):
                    # Any other msg_seq.
                    #
                    # Collect messages up to the terminator.
                    # Terminator conditions:
                    #  * NLMSG_ERROR != 0
                    #  * NLMSG_DONE
                    #  * terminate() function (if defined)
                    #  * not NLM_F_MULTI
                    #
                    # Please note that if the terminator has not
                    # occurred yet, more `recv()` rounds CAN be required.
                    for msg in tuple(self.backlog[msg_seq]):

                        # Drop the message from the backlog, if any
                        self.backlog[msg_seq].remove(msg)

                        # If there is an error, raise exception
                        if (
                            msg['header']['error'] is not None
                            and not noraise
                        ):
                            # reschedule all the remaining messages,
                            # including errors and acks, into a
                            # separate deque
                            self.error_deque.extend(self.backlog[msg_seq])
                            # flush the backlog for this msg_seq
                            del self.backlog[msg_seq]
                            # The loop is done
                            raise msg['header']['error']

                        # If it is the terminator message, say "enough"
                        # and requeue all the rest into Zero queue
                        if terminate is not None:
                            tmsg = terminate(msg)
                            if isinstance(tmsg, nlmsg):
                                yield msg
                        if (msg['header']['type'] == NLMSG_DONE) or tmsg:
                            # The loop is done
                            enough = True

                        # If it is just a normal message, append it to
                        # the response
                        if not enough:
                            # finish the loop on single messages
                            if not msg['header']['flags'] & NLM_F_MULTI:
                                enough = True
                            yield msg

                        # Enough is enough, requeue the rest and delete
                        # our backlog
                        if enough:
                            self.backlog[0].extend(self.backlog[msg_seq])
                            del self.backlog[msg_seq]
                            break

                    # Next iteration
                    self.backlog_lock.release()
                    backlog_acquired = False
                else:
                    # Stage 1. END
                    #
                    # 8<-------------------------------------------------------
                    #
                    # Stage 2. BEGIN
                    #
                    # 8<-------------------------------------------------------
                    #
                    # Receive the data from the socket and put the messages
                    # into the backlog
                    #
                    self.backlog_lock.release()
                    backlog_acquired = False
                    ##
                    #
                    # Control the timeout. We should not be within the
                    # function more than TIMEOUT seconds. All the locks
                    # MUST be released here.
                    #
                    if (msg_seq != 0) and (
                        time.time() - ctime > self.get_timeout
                    ):
                        # requeue already received for that msg_seq
                        self.backlog[0].extend(self.backlog[msg_seq])
                        del self.backlog[msg_seq]
                        # throw an exception
                        if self.get_timeout_exception:
                            raise self.get_timeout_exception()
                        else:
                            return
                    #
                    if self.read_lock.acquire(False):
                        try:
                            self.change_master.clear()
                            # If the socket is free to read from, occupy
                            # it and wait for the data
                            #
                            # This is a time consuming process, so all the
                            # locks, except the read lock must be released
                            data = self.recv(bufsize)
                            # Parse data
                            msgs = tuple(
                                self.marshal.parse(data, msg_seq, callback)
                            )
                            # Reset ctime -- timeout should be measured
                            # for every turn separately
                            ctime = time.time()
                            #
                            current = self.buffer_queue.qsize()
                            delta = current - self.qsize
                            delay = 0
                            if delta > 10:
                                delay = min(
                                    3, max(0.01, float(current) / 60000)
                                )
                                message = (
                                    "Packet burst: "
                                    "delta=%s qsize=%s delay=%s"
                                    % (delta, current, delay)
                                )
                                if delay < 1:
                                    log.debug(message)
                                else:
                                    log.warning(message)
                                time.sleep(delay)
                            self.qsize = current

                            # We've got the data, lock the backlog again
                            with self.backlog_lock:
                                for msg in msgs:
                                    msg['header']['target'] = self.target
                                    msg['header']['stats'] = Stats(
                                        current, delta, delay
                                    )
                                    seq = msg['header']['sequence_number']
                                    if seq not in self.backlog:
                                        if (
                                            msg['header']['type']
                                            == NLMSG_ERROR
                                        ):
                                            # Drop orphaned NLMSG_ERROR
                                            # messages
                                            continue
                                        seq = 0
                                    # 8<-----------------------------------
                                    # Callbacks section
                                    for cr in self.callbacks:
                                        try:
                                            if cr[0](msg):
                                                cr[1](msg, *cr[2])
                                        except:
                                            # FIXME
                                            #
                                            # Usually such code formatting
                                            # means that the method should
                                            # be refactored to avoid such
                                            # indentation.
                                            #
                                            # Plz do something with it.
                                            #
                                            lw = log.warning
                                            lw("Callback fail: %s" % (cr))
                                            lw(traceback.format_exc())
                                    # 8<-----------------------------------
                                    self.backlog[seq].append(msg)

                            # Now wake up other threads
                            self.change_master.set()
                        finally:
                            # Finally, release the read lock: all data
                            # processed
                            self.read_lock.release()
                    else:
                        # If the socket is occupied and there is still no
                        # data for us, wait for the next master change or
                        # for a timeout
                        self.change_master.wait(1)
                    # 8<-------------------------------------------------------
                    #
                    # Stage 2. END
                    #
                    # 8<-------------------------------------------------------
        finally:
            if backlog_acquired:
                self.backlog_lock.release()
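
Stripped of locking, timeouts and callbacks, the selection logic of get() comes down to the following sketch. It is a simplified illustration, not the library code: `collect()` and `make_msg()` are invented for this example, and error handling and the terminate() hook are left out. Messages with a foreign sequence_number go to the backlog, and the response ends on NLMSG_DONE or on the first message without NLM_F_MULTI.

```python
from collections import defaultdict

NLMSG_DONE = 3
NLM_F_MULTI = 2


def make_msg(seq, mtype, flags=0):
    return {'header': {'sequence_number': seq, 'type': mtype,
                       'flags': flags, 'error': None}}


def collect(stream, msg_seq, backlog):
    """Yield messages for msg_seq; park all the others in the backlog."""
    for msg in stream:
        seq = msg['header']['sequence_number']
        if seq != msg_seq:
            backlog[seq].append(msg)
            continue
        if msg['header']['type'] == NLMSG_DONE:
            return  # end of a multipart response
        yield msg
        if not msg['header']['flags'] & NLM_F_MULTI:
            return  # a single-message response


stream = iter([
    make_msg(0, 16),               # broadcast
    make_msg(1, 24, NLM_F_MULTI),  # response, part 1
    make_msg(0, 16),               # broadcast
    make_msg(1, 24, NLM_F_MULTI),  # response, part 2
    make_msg(1, NLMSG_DONE, NLM_F_MULTI),
    make_msg(0, 16),               # arrives after the response is complete
])
backlog = defaultdict(list)
response = list(collect(stream, 1, backlog))
print(len(response), len(backlog[0]))
```

The last broadcast stays unread in the stream, the same way data stays in the socket until the next get() call.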

© Copyright Peter Saveliev and PyRoute2 team. Created using Sphinx 5.1.1.