Base netlink socket and marshal¶
All the netlink providers are derived from the socket class, so they provide normal socket API, including getsockopt(), setsockopt(), they can be used in poll/select I/O loops etc.
asynchronous I/O¶
To run async reader thread, one should call NetlinkSocket.bind(async_cache=True). In that case a background thread will be launched. The thread will automatically collect all the messages and store into a userspace buffer.
Note
There is no need to turn on async I/O, if you don't plan to receive broadcast messages.
ENOBUF and async I/O¶
When Netlink messages arrive faster than a program reads then from the socket, the messages overflow the socket buffer and one gets ENOBUF on recv():
... self.recv(bufsize)
error: [Errno 105] No buffer space available
One way to avoid ENOBUF, is to use async I/O. Then the library not only reads and buffers all the messages, but also re-prioritizes threads. Suppressing the parser activity, the library increases the response delay, but spares CPU to read and enqueue arriving messages as fast, as it is possible.
With logging level DEBUG you can notice messages, that the library started to calm down the parser thread:
DEBUG:root:Packet burst: the reader thread priority
is increased, beware of delays on netlink calls
Counters: delta=25 qsize=25 delay=0.1
This state requires no immediate action, but just some more attention. When the delay between messages on the parser thread exceeds 1 second, DEBUG messages become WARNING ones:
WARNING:root:Packet burst: the reader thread priority
is increased, beware of delays on netlink calls
Counters: delta=2525 qsize=213536 delay=3
This state means, that almost all the CPU resources are dedicated to the reader thread. It doesn't mean, that the reader thread consumes 100% CPU -- it means, that the CPU is reserved for the case of more intensive bursts. The library will return to the normal state only when the broadcast storm will be over, and then the CPU will be 100% loaded with the parser for some time, when it will process all the messages queued so far.
when async I/O doesn't help¶
Sometimes, even turning async I/O doesn't fix ENOBUF. Mostly it means, that in this particular case the Python performance is not enough even to read and store the raw data from the socket. There is no workaround for such cases, except of using something not Python-based.
One can still play around with SO_RCVBUF socket option, but it doesn't help much. So keep it in mind, and if you expect massive broadcast Netlink storms, perform stress testing prior to deploy a solution in the production.
classes¶
- class pyroute2.netlink.nlsocket.Stats(qsize, delta, delay)¶
- delay¶
Alias for field number 2
- delta¶
Alias for field number 1
- qsize¶
Alias for field number 0
- class pyroute2.netlink.nlsocket.Marshal¶
Generic marshalling class
- default_message_class¶
alias of
nlmsg
- parse(data, seq=None, callback=None, skip_alien_seq=False)¶
Parse string data.
At this moment all transport, except of the native Netlink is deprecated in this library, so we should not support any defragmentation on that level
- class pyroute2.netlink.nlsocket.NetlinkSocketBaseSafe¶
Thread-safe base class for netlink sockets. It buffers all incoming messages regardless sequence numbers, and returns only messages with requested numbers. This is done using synchronization primitives in a quite complicated manner.
- put(msg, msg_type, msg_flags=1, addr=(0, 0), msg_seq=0, msg_pid=None)¶
Construct a message from a dictionary and send it to the socket. Parameters:
msg -- the message in the dictionary format
msg_type -- the message type
msg_flags -- the message flags to use in the request
addr -- sendto() addr, default (0, 0)
msg_seq -- sequence number to use
msg_pid -- pid to use, if None -- use os.getpid()
Example:
s = IPRSocket() s.bind() s.put({'index': 1}, RTM_GETLINK) s.get() s.close()
Please notice, that the return value of s.get() can be not the result of s.put(), but any broadcast message. To fix that, use msg_seq -- the response must contain the same msg['header']['sequence_number'] value.
- get(bufsize=65536, msg_seq=0, terminate=None, callback=None, noraise=False)¶
Get parsed messages list. If msg_seq is given, return only messages with that msg['header']['sequence_number'], saving all other messages into self.backlog.
The routine is thread-safe.
The bufsize parameter can be:
- -1: bufsize will be calculated from the first 4 bytes of
the network data
0: bufsize will be calculated from SO_RCVBUF sockopt
int >= 0: just a bufsize
If noraise is true, error messages will be treated as any other message.
- class pyroute2.netlink.nlsocket.NetlinkSocketBaseUnsafe¶
Thread unsafe nlsocket base class. Does not implement any locks on message processing. Discards any message if the sequence number does not match.
- class pyroute2.netlink.nlsocket.NetlinkSocketBase(family=16, port=None, pid=None, fileno=None, sndbuf=1048576, rcvbuf=1048576, all_ns=False, async_qsize=None, nlm_generator=None, target='localhost', ext_ack=False, strict_check=False, groups=0)¶
Generic netlink socket.
- register_callback(callback, predicate=<function NetlinkSocketBase.<lambda>>, args=None)¶
Register a callback to run on a message arrival.
Callback is the function that will be called with the message as the first argument. Predicate is the optional callable object, that returns True or False. Upon True, the callback will be called. Upon False it will not. Args is a list or tuple of arguments.
Simplest example, assume ipr is the IPRoute() instance:
# create a simplest callback that will print messages def cb(msg): print(msg) # register callback for any message: ipr.register_callback(cb)
More complex example, with filtering:
# Set object's attribute after the message key def cb(msg, obj): obj.some_attr = msg["some key"] # Register the callback only for the loopback device, index 1: ipr.register_callback(cb, lambda x: x.get('index', None) == 1, (self, ))
Please note: you do not need to register the default 0 queue to invoke callbacks on broadcast messages. Callbacks are iterated before messages get enqueued.
- unregister_callback(callback)¶
Remove the first reference to the function from the callback register
- register_policy(policy, msg_class=None)¶
Register netlink encoding/decoding policy. Can be specified in two ways: nlsocket.register_policy(MSG_ID, msg_class) to register one particular rule, or nlsocket.register_policy({MSG_ID1: msg_class}) to register several rules at once. E.g.:
policy = {RTM_NEWLINK: ifinfmsg, RTM_DELLINK: ifinfmsg, RTM_NEWADDR: ifaddrmsg, RTM_DELADDR: ifaddrmsg} nlsocket.register_policy(policy)
One can call register_policy() as many times, as one want to -- it will just extend the current policy scheme, not replace it.
- unregister_policy(policy)¶
Unregister policy. Policy can be:
int -- then it will just remove one policy
list or tuple of ints -- remove all given
dict -- remove policies by keys from dict
In the last case the routine will ignore dict values, it is implemented so just to make it compatible with get_policy_map() return value.
- get_policy_map(policy=None)¶
Return policy for a given message type or for all message types. Policy parameter can be either int, or a list of ints. Always return dictionary.
- nlm_request_batch(msgs, noraise=False)¶
This function is for messages which are expected to have side effects. Do not blindly retry in case of errors as this might duplicate them.
- class pyroute2.netlink.nlsocket.BatchBacklogQueue(iterable=(), /)¶
- append(*argv, **kwarg)¶
Append object to the end of the list.
- pop(*argv, **kwarg)¶
Remove and return item at index (default last).
Raises IndexError if list is empty or index is out of range.
- class pyroute2.netlink.nlsocket.BatchBacklog¶
- class pyroute2.netlink.nlsocket.BatchSocket(family=16, port=None, pid=None, fileno=None, sndbuf=1048576, rcvbuf=1048576, all_ns=False, async_qsize=None, nlm_generator=None, target='localhost', ext_ack=False, strict_check=False, groups=0)¶
- get(*argv, **kwarg)¶
Get parsed messages list. If msg_seq is given, return only messages with that msg['header']['sequence_number'], saving all other messages into self.backlog.
The routine is thread-safe.
The bufsize parameter can be:
- -1: bufsize will be calculated from the first 4 bytes of
the network data
0: bufsize will be calculated from SO_RCVBUF sockopt
int >= 0: just a bufsize
If noraise is true, error messages will be treated as any other message.
- class pyroute2.netlink.nlsocket.NetlinkSocket(family=16, port=None, pid=None, fileno=None, sndbuf=1048576, rcvbuf=1048576, all_ns=False, async_qsize=None, nlm_generator=None, target='localhost', ext_ack=False, strict_check=False, groups=0)¶
- bind(groups=0, pid=None, **kwarg)¶
Bind the socket to given multicast groups, using given pid.
If pid is None, use automatic port allocation
If pid == 0, use process' pid
If pid == <int>, use the value instead of pid
- close(code=104)¶
Correctly close the socket and free all resources.
- class pyroute2.netlink.nlsocket.ChaoticNetlinkSocket(*argv, **kwarg)¶
- get(*argv, **kwarg)¶
Get parsed messages list. If msg_seq is given, return only messages with that msg['header']['sequence_number'], saving all other messages into self.backlog.
The routine is thread-safe.
The bufsize parameter can be:
- -1: bufsize will be calculated from the first 4 bytes of
the network data
0: bufsize will be calculated from SO_RCVBUF sockopt
int >= 0: just a bufsize
If noraise is true, error messages will be treated as any other message.