Authorization plugins
AAA concept
AAA refers to Authentication, Authorization and Accounting. NDB provides a minimalistic API to integrate Authorization routines, leaving the rest (Authentication and Accounting) to the user.
Some NDB routines and RTNL object methods are guarded with a parametrized decorator. The decorator takes a single parameter, the tag:
    @check_auth('obj:read')
    def __getitem__(self, key):
        ...

    @check_auth('obj:modify')
    def __setitem__(self, key, value):
        ...
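To illustrate how a parametrized decorator of this kind can work, here is a minimal sketch. It is not the pyroute2 implementation: the `auth_managers` attribute, the `Record` class and the `AllowReads` manager are hypothetical names introduced only for this example.

```python
import functools


def check_auth(tag):
    # Hypothetical re-implementation for illustration only; it assumes the
    # guarded object keeps its managers in ``self.auth_managers``.
    def decorator(method):
        @functools.wraps(method)
        def guarded(self, *argv, **kwarg):
            # Every manager must grant the tag before the method runs.
            for manager in self.auth_managers:
                if not manager.check(self, tag):
                    raise PermissionError('access rejected: %s' % tag)
            return method(self, *argv, **kwarg)

        return guarded

    return decorator


class AllowReads:
    # A toy manager: grants only the 'obj:read' tag.
    def check(self, obj, tag):
        return tag == 'obj:read'


class Record:
    def __init__(self, auth_managers):
        self.auth_managers = auth_managers
        self.data = {}

    @check_auth('obj:read')
    def __getitem__(self, key):
        return self.data.get(key)

    @check_auth('obj:modify')
    def __setitem__(self, key, value):
        self.data[key] = value
```

With `Record([AllowReads()])`, reading a key succeeds while assigning one raises `PermissionError`, because the tag is bound to each method at decoration time and checked on every call.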
AuthManager
The tag is checked by the AuthManager.check(...) routine. It is the only method that AuthManager-compatible objects must provide, and it must be defined as:
    def check(self, obj, tag):
        # -> True: grant access to the tag
        # -> False: reject access
        # -> raise Exception(): reject access with a specific exception
        ...
The NDB module provides an example AuthManager:
    from pyroute2 import NDB
    from pyroute2.ndb.auth_manager import AuthManager

    ndb = NDB(log='debug')

    am = AuthManager({'obj:list': False,    # deny dump(), summary()
                      'obj:read': True,     # permit reading RTNL attributes
                      'obj:modify': True},  # permit add_ip(), commit() etc.
                     ndb.log.channel('auth'))

    ap = ndb.auth_proxy(am)
    ap.interfaces.summary()  # <-- fails with PermissionError
You can implement custom AuthManager classes. The only requirement is that they provide a .check(self, obj, tag) routine that returns True or False, or raises an exception.
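As a minimal sketch of such a custom class, the following manager grants the listing and reading tags and rejects everything else. The class name `ReadOnlyAuthManager` and its optional `log` argument are assumptions made for this example; only the `check(self, obj, tag)` signature and the tag names come from the text above.

```python
class ReadOnlyAuthManager:
    '''Grant 'obj:list' and 'obj:read', reject any other tag.'''

    allowed = frozenset(('obj:list', 'obj:read'))

    def __init__(self, log=None):
        # Any object with an .info() method will do, e.g. a log channel.
        self.log = log

    def check(self, obj, tag):
        granted = tag in self.allowed
        if self.log is not None:
            state = 'granted' if granted else 'rejected'
            self.log.info('%s access %s' % (tag, state))
        return granted
```

Such an object could then be passed to `ndb.auth_proxy(...)` in the same way as the example AuthManager above.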
Usecase: OpenStack Keystone auth
Say we have a public service that provides access to an NDB instance via HTTP and authenticates users via Keystone. Then the auth flow could be:
Accept a connection from a client
Create custom auth manager object A
A.__init__() validates X-Auth-Token against Keystone (Authentication)
A.check() checks that X-Auth-Token is not expired (Authorization)
The auth result is logged (Accounting)
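The steps above can be sketched without a running Keystone. In this hedged example the identity service is replaced by `fake_validate`, a hypothetical stand-in that returns an expiration timestamp for a known token; `TokenAuthManager` is likewise an illustrative name, not part of pyroute2.

```python
import logging
import time


class TokenAuthManager:
    def __init__(self, token, validate, log):
        # Authentication: ``validate`` stands in for the identity service.
        # It returns the expiration time as a Unix timestamp, or raises
        # an exception for an unknown token.
        self.expire = validate(token)
        self.log = log

    def check(self, obj, tag):
        # Authorization: the token must not be expired.
        granted = time.time() <= self.expire
        # Accounting: log the decision.
        state = 'granted' if granted else 'denied'
        self.log.info('%s permission %s', tag, state)
        if not granted:
            raise PermissionError('token has expired')
        return True


def fake_validate(token):
    # Hypothetical token store: token -> lifetime in seconds from now.
    known = {'valid-token': 3600, 'stale-token': -1}
    if token not in known:
        raise PermissionError('unknown token')
    return time.time() + known[token]


log = logging.getLogger('auth')
fresh = TokenAuthManager('valid-token', fake_validate, log)
stale = TokenAuthManager('stale-token', fake_validate, log)
```

Here `fresh.check(...)` returns True, `stale.check(...)` raises `PermissionError`, and constructing a manager with an unknown token fails already in `__init__`, which mirrors the split between authentication and authorization in the list above.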
An example AuthManager with OpenStack APIv3 support can be found in the /examples/ndb/ directory.
'''
:test:argv:14080769fe05e1f8b837fb43ca0f0ba4

The simplest example of a custom AuthManager and its usage
with `AuthProxy` objects.

Here we authenticate the auth token against Keystone and
allow any NDB operations until it expires.

One can get such a token with a curl request::

    $ cat request.json
    { "auth": {
        "identity": {
          "methods": ["password"],
          "password": {
            "user": {
              "name": "admin",
              "domain": { "name": "admin_domain" },
              "password": "secret"
            }
          }
        },
        "scope": {
          "project": {
            "id": "f0af12d451fb4bccbb38217e7f9afe9a"
          }
        }
      }
    }

    $ curl -i \
      -H "Content-Type: application/json" \
      -d "@request.json" \
      http://keystone:5000/v3/auth/tokens

The `X-Subject-Token` header in the response is the token we need. Say we
get `14080769fe05e1f8b837fb43ca0f0ba4` as `X-Subject-Token`. Then you can
run::

    $ . openstack.rc  # <-- your OpenStack APIv3 RC file
    $ export PYTHONPATH=`pwd`
    $ python examples/ndb/keystone_auth.py 14080769fe05e1f8b837fb43ca0f0ba4

Using this example you can implement services that export NDB via any RPC,
e.g. HTTP, and use Keystone integration. The same scheme may be used for any
other auth API, such as RADIUS.

An example of a simple HTTP service can be found in /cli/pyroute2-cli.
'''
import os
import sys
import time

from dateutil.parser import parse as isodate
from keystoneauth1.identity import v3
from keystoneauth1 import session
from keystoneclient.v3 import client as ksclient
from keystoneclient.v3.tokens import TokenManager

from pyroute2 import NDB


class OSAuthManager(object):
    def __init__(self, token, log):
        # create a Keystone password object
        auth = v3.Password(
            auth_url=os.environ.get('OS_AUTH_URL'),
            username=os.environ.get('OS_USERNAME'),
            password=os.environ.get('OS_PASSWORD'),
            user_domain_name=(os.environ.get('OS_USER_DOMAIN_NAME')),
            project_id=os.environ.get('OS_PROJECT_ID'),
        )
        # create a session object
        sess = session.Session(auth=auth)
        # create a token manager
        tmanager = TokenManager(ksclient.Client(session=sess))
        # validate the token
        keystone_response = tmanager.validate(token)
        # init attrs
        self.log = log
        self.expire = isodate(keystone_response['expires_at']).timestamp()

    def check(self, obj, tag):
        #
        # totally ignore obj and tag, validate only token expiration
        #
        # problems to be solved before you use this code in production:
        # 1. access levels: read-only, read-write -- match tag
        # 2. how to deal with revoked tokens
        #
        if time.time() > self.expire:
            self.log.error('%s permission denied' % (tag,))
            raise PermissionError('keystone token has been expired')

        self.log.info('%s permission granted' % (tag,))
        return True


with NDB(log='debug') as ndb:
    # create a utility log channel
    log = ndb.log.channel('main')

    # create an AuthManager-compatible object
    log.info('request keystone auth')
    am = OSAuthManager(sys.argv[1], ndb.log.channel('keystone'))
    log.info('keystone auth complete, expires %s' % am.expire)

    # create an auth proxy for this particular token
    ap = ndb.auth_proxy(am)

    # validate access via that proxy
    print(ap.interfaces['lo'])
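The comments in check() name access levels as an open problem. One possible approach, sketched here under assumptions, is to map each level to the set of tags it grants and match the tag on every call. The `ACCESS_LEVELS` table and the `LevelAuthManager` class are hypothetical; how a level would be derived from a Keystone token (for instance from its roles) is deliberately left out.

```python
import time

# Hypothetical mapping from an access level to the tags it grants.
ACCESS_LEVELS = {
    'read-only': frozenset(('obj:list', 'obj:read')),
    'read-write': frozenset(('obj:list', 'obj:read', 'obj:modify')),
}


class LevelAuthManager:
    def __init__(self, level, expire):
        self.tags = ACCESS_LEVELS[level]
        self.expire = expire  # Unix timestamp

    def check(self, obj, tag):
        # An expired token rejects everything with a specific exception.
        if time.time() > self.expire:
            raise PermissionError('token has expired')
        # Otherwise the tag must belong to the granted access level.
        return tag in self.tags
```

A read-only manager built this way returns False for 'obj:modify', while a read-write one returns True. Revoked tokens are not addressed by this sketch.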
Usecase: RADIUS auth
'''
:test:argv:testing
:test:argv:secret
:test:environ:RADIUS_SERVER=127.0.0.1
:test:environ:RADIUS_SECRET=secret

An example of using RADIUS authentication with NDB.

In order to run the example you can set up a FreeRADIUS server::

    # /etc/raddb/clients
    client test {
        ipaddr = 192.168.122.101  # IP addr of your client
        secret = s3cr3t
    }

    # /etc/raddb/users
    testing Cleartext-Password := "secret"

Then set up your client::

    # download RADIUS dictionaries
    $ export GITSERVER=https://raw.githubusercontent.com
    $ export DICTPATH=pyradius/pyrad/master/example
    $ wget $GITSERVER/$DICTPATH/dictionary
    $ wget $GITSERVER/$DICTPATH/dictionary.freeradius

    # set up the environment
    $ cat radius.rc
    export RADIUS_SERVER=192.168.122.1
    export RADIUS_SECRET=s3cr3t
    export PYTHONPATH=`pwd`

    $ . radius.rc
    $ python examples/ndb/radius_auth.py testing secret
'''
import os
import sys

from pyrad.client import Client
from pyrad.dictionary import Dictionary
import pyrad.packet

from pyroute2 import NDB


class RadiusAuthManager(object):
    def __init__(self, user, password, log):
        client = Client(
            server=os.environ.get('RADIUS_SERVER'),
            secret=os.environ.get('RADIUS_SECRET').encode('ascii'),
            dict=Dictionary('dictionary'),
        )
        req = client.CreateAuthPacket(
            code=pyrad.packet.AccessRequest, User_Name=user
        )
        req['User-Password'] = req.PwCrypt(password)
        reply = client.SendPacket(req)
        self.auth = reply.code
        self.log = log

    def check(self, obj, tag):
        #
        self.log.info('%s access' % (tag,))
        return self.auth == pyrad.packet.AccessAccept


with NDB(log='debug') as ndb:
    # create a utility log channel
    log = ndb.log.channel('main')

    # create an AuthManager-compatible object
    log.info('request radius auth')
    am = RadiusAuthManager(sys.argv[1], sys.argv[2], ndb.log.channel('radius'))
    log.info('radius auth complete')

    # create an auth proxy for these credentials
    ap = ndb.auth_proxy(am)

    # validate access via that proxy
    print(ap.interfaces['lo'])