Authorization plugins

AAA concept

AAA refers to Authentication, Authorization and Accounting. NDB provides a minimalistic API to integrate Authorization routines, leaving the rest – Authentication and Accounting – to the user.

Some NDB routines and RTNL object methods are guarded by a parametrized decorator. The decorator takes a single parameter, the tag:

@check_auth('obj:read')
def __getitem__(self, key):
    ...

@check_auth('obj:modify')
def __setitem__(self, key, value):
    ...

The tag is checked by the AuthManager.check(…) routine. This routine is the only method that AuthManager-compatible objects must provide, and it must be defined as:

def check(self, obj, tag):
    # -> True: grant access to the tag
    # -> False: reject access
    # -> raise Exception(): reject access with a specific exception
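
As a minimal sketch of this contract, the hypothetical `TagAuthManager` below grants or rejects access from a lookup table and raises an exception for tags it does not know. The class names, the policy dictionary and the `guarded` decorator are illustrative assumptions, not pyroute2's implementation; the decorator only mimics how a guarded method might consult `check()`.

```python
import functools


class TagAuthManager:
    """Hypothetical AuthManager-compatible object: only check() is required."""

    def __init__(self, policy):
        # policy maps a tag to True (grant) or False (reject)
        self.policy = dict(policy)

    def check(self, obj, tag):
        if tag not in self.policy:
            # reject access with a specific exception
            raise PermissionError('unknown tag: %s' % tag)
        return self.policy[tag]


def guarded(tag):
    """Illustrative stand-in for a parametrized auth decorator (assumption)."""
    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, *argv, **kwarg):
            # ask the auth manager before running the guarded method
            if not self.auth_manager.check(self, tag):
                raise PermissionError('access rejected: %s' % tag)
            return method(self, *argv, **kwarg)
        return wrapper
    return decorator


class Record:
    """Toy object with guarded item access (not an RTNL object)."""

    def __init__(self, auth_manager):
        self.auth_manager = auth_manager
        self.data = {'ifname': 'lo'}

    @guarded('obj:read')
    def __getitem__(self, key):
        return self.data[key]

    @guarded('obj:modify')
    def __setitem__(self, key, value):
        self.data[key] = value


record = Record(TagAuthManager({'obj:read': True, 'obj:modify': False}))
print(record['ifname'])  # reading is granted by the policy
try:
    record['ifname'] = 'eth0'  # modification is rejected by the policy
except PermissionError as e:
    print(e)
```

Keeping the policy inside the auth manager means the guarded objects only need to know their own tags, so policies can be swapped per client without touching the objects.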

The NDB module provides an example AuthManager:

from pyroute2 import NDB
from pyroute2.ndb.auth_manager import AuthManager

ndb = NDB(log='debug')

am = AuthManager({'obj:list': False,    # deny dump(), summary()
                  'obj:read': True,     # permit reading RTNL attributes
                  'obj:modify': True},  # permit add_ip(), commit() etc.
                 ndb.log.channel('auth'))

ap = ndb.auth_proxy(am)
ap.interfaces.summary()  # <-- fails with PermissionError

You can implement custom AuthManager classes. The only requirement is that they provide a .check(self, obj, tag) routine that returns True or False, or raises an exception.

Usecase: OpenStack Keystone auth

Say we have a public service that provides access to an NDB instance via HTTP and authenticates users via Keystone. Then the auth flow could be:

  1. Accept a connection from a client

  2. Create custom auth manager object A

  3. A.__init__() validates X-Auth-Token against Keystone (Authentication)

  4. A.check() checks that X-Auth-Token is not expired (Authorization)

  5. The auth result is logged (Accounting)
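
The steps above can be sketched without a real Keystone server. In the block below the in-memory `KNOWN_TOKENS` table, the `TokenAuthManager` class and the `handle_request` helper are all hypothetical stand-ins chosen for illustration; a real service would validate the token against Keystone instead.

```python
import logging
import time

logging.basicConfig(level=logging.INFO)

# Hypothetical token store standing in for Keystone: token -> expiration time
KNOWN_TOKENS = {'valid-token': time.time() + 3600}


class TokenAuthManager:

    def __init__(self, token, log):
        # step 3, Authentication: validate the token on object creation
        if token not in KNOWN_TOKENS:
            raise PermissionError('invalid token')
        self.expire = KNOWN_TOKENS[token]
        self.log = log

    def check(self, obj, tag):
        # step 4, Authorization: the token must not be expired
        granted = time.time() <= self.expire
        # step 5, Accounting: log the result
        self.log.info('%s permission %s',
                      tag, 'granted' if granted else 'denied')
        return granted


def handle_request(headers):
    # steps 1 and 2: a request arrives, create an auth manager for its token
    log = logging.getLogger('auth')
    return TokenAuthManager(headers.get('X-Auth-Token'), log)


am = handle_request({'X-Auth-Token': 'valid-token'})
print(am.check(None, 'obj:read'))
```

Authenticating once in the constructor keeps `check()` cheap, since it is called for every guarded operation.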

You can find an example AuthManager with OpenStack APIv3 support in the /examples/ndb/ directory.

The simplest example of a custom AuthManager and its usage
with `AuthProxy` objects.

Here we authenticate the auth token against Keystone and
allow any NDB operations until it expires.

One can get such a token with a curl request::

    $ cat request.json
    { "auth": {
        "identity": {
          "methods": ["password"],
          "password": {
            "user": {
              "name": "admin",
              "domain": { "name": "admin_domain" },
              "password": "secret"
            }
          }
        },
        "scope": {
          "project": {
            "id": "f0af12d451fb4bccbb38217e7f9afe9a"
          }
        }
      }
    }

    $ curl -i \
            -H "Content-Type: application/json" \
            -d "@request.json" \

The `X-Subject-Token` header in the response will be the token we need. Say we
get `14080769fe05e1f8b837fb43ca0f0ba4` as `X-Subject-Token`. Then you can
run the example::

    $ . openstack.rc  # <-- your OpenStack APIv3 RC file
    $ export PYTHONPATH=`pwd`
    $ python examples/ndb/ 14080769fe05e1f8b837fb43ca0f0ba4

Using this example you can implement services that export NDB via any RPC,
e.g. HTTP, and use Keystone integration. The same scheme may be used with any
other auth API, such as RADIUS.

You can find an example of a simple HTTP service in /cli/pyroute2-cli.

import os
import sys
import time
from dateutil.parser import parse as isodate
from keystoneauth1.identity import v3
from keystoneauth1 import session
from keystoneclient.v3 import client as ksclient
from keystoneclient.v3.tokens import TokenManager
from pyroute2 import NDB

class OSAuthManager(object):

    def __init__(self, token, log):
        # create a Keystone password object
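        auth = v3.Password(auth_url=os.environ.get('OS_AUTH_URL'),
                           username=os.environ.get('OS_USERNAME'),
                           password=os.environ.get('OS_PASSWORD'),
                           user_domain_name=os.environ.get('OS_USER_DOMAIN_NAME'),
                           project_id=os.environ.get('OS_PROJECT_ID'))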
        # create a session object
        sess = session.Session(auth=auth)
        # create a token manager
        tmanager = TokenManager(ksclient.Client(session=sess))
        # validate the token
        keystone_response = tmanager.validate(token)
        # init attrs
        self.log = log
        self.expire = isodate(keystone_response['expires_at']).timestamp()

    def check(self, obj, tag):
        # totally ignore obj and tag, validate only token expiration
        # problems to be solved before you use this code in production:
        # 1. access levels: read-only, read-write -- match tag
        # 2. how to deal with revoked tokens
        if time.time() > self.expire:
            self.log.error('%s permission denied' % (tag, ))
            raise PermissionError('keystone token has been expired')

        self.log.info('%s permission granted' % (tag, ))
        return True

with NDB(log='debug') as ndb:
    # create a utility log channel
    log = ndb.log.channel('main')

    # create an AuthManager-compatible object
    log.info('request keystone auth')
    am = OSAuthManager(sys.argv[1], ndb.log.channel('keystone'))
    log.info('keystone auth complete, expires %s' % am.expire)

    # create an auth proxy for this particular token
    ap = ndb.auth_proxy(am)

    # validate access via that proxy
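
The example ignores the tag, and its comments name access levels as an open problem. One possible approach, sketched here under assumptions, is to map each access level to the set of tags it permits. The `ACCESS_LEVELS` table and the `LevelAuthManager` class are hypothetical; only the tag names come from the AuthManager example earlier on this page.

```python
import time

# Hypothetical mapping from an access level to the tags it permits
ACCESS_LEVELS = {
    'read-only': {'obj:list', 'obj:read'},
    'read-write': {'obj:list', 'obj:read', 'obj:modify'},
}


class LevelAuthManager:

    def __init__(self, level, expire):
        # level would normally be derived from the validated token
        self.allowed = ACCESS_LEVELS[level]
        self.expire = expire

    def check(self, obj, tag):
        # expired credentials are rejected with a specific exception
        if time.time() > self.expire:
            raise PermissionError('token has expired')
        # otherwise grant access only to the tags of this level
        return tag in self.allowed


am = LevelAuthManager('read-only', time.time() + 60)
print(am.check(None, 'obj:read'))
print(am.check(None, 'obj:modify'))
```

Revoked tokens are not covered by this sketch; handling them would require revalidating the token against the auth service.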

Usecase: RADIUS auth

An example of using RADIUS authentication with NDB.

In order to run the example you can set up a FreeRADIUS server::

    # /etc/raddb/clients
    client test {
        ipaddr =  # IP addr of your client
        secret = s3cr3t
    }

    # /etc/raddb/users
    testing Cleartext-Password := "secret"

Then set up your client::

    # download RADIUS dictionaries
    $ export GITSERVER=
    $ export DICTPATH=pyradius/pyrad/master/example
    $ wget $GITSERVER/$DICTPATH/dictionary
    $ wget $GITSERVER/$DICTPATH/dictionary.freeradius

    # setup the environment
    $ cat radius.rc
    export RADIUS_SERVER=
    export RADIUS_SECRET=s3cr3t
    export PYTHONPATH=`pwd`

    $ . radius.rc
    $ python examples/ndb/ testing secret


import os
import sys
from pyrad.client import Client
from pyrad.dictionary import Dictionary
import pyrad.packet
from pyroute2 import NDB

class RadiusAuthManager(object):

    def __init__(self, user, password, log):
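        client = Client(server=os.environ.get('RADIUS_SERVER'),
                        secret=os.environ.get('RADIUS_SECRET').encode('ascii'),
                        dict=Dictionary('dictionary'))
        req = client.CreateAuthPacket(code=pyrad.packet.AccessRequest,
                                      User_Name=user)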
        req['User-Password'] = req.PwCrypt(password)
        reply = client.SendPacket(req)
        self.auth = reply.code
        self.log = log

    def check(self, obj, tag):
        self.log.info('%s access' % (tag, ))
        return self.auth == pyrad.packet.AccessAccept

with NDB(log='debug') as ndb:
    # create a utility log channel
    log = ndb.log.channel('main')

    # create an AuthManager-compatible object
    log.info('request radius auth')
    am = RadiusAuthManager(sys.argv[1],
                           sys.argv[2],
                           ndb.log.channel('radius'))
    log.info('radius auth complete')

    # create an auth proxy for these credentials
    ap = ndb.auth_proxy(am)

    # validate access via that proxy