dect
/
libnl
Archived
13
0
Fork 0

Fix indentation (spaces vs tabs)

Now, python files use pseudo-tab equal 4 spaces
This commit is contained in:
Коренберг Марк (ноутбук дома) 2012-06-05 01:08:27 +06:00
parent fb890a5b5e
commit 83f06bfbb8
9 changed files with 1909 additions and 1909 deletions

File diff suppressed because it is too large Load Diff

View File

@ -10,8 +10,8 @@ from __future__ import absolute_import
__version__ = "1.0"
__all__ = [
'AddressCache',
'Address']
'AddressCache',
'Address']
import datetime
from .. import core as netlink
@ -23,391 +23,391 @@ from .. import util as util
###########################################################################
# Address Cache
class AddressCache(netlink.Cache):
"""Cache containing network addresses"""
"""Cache containing network addresses"""
def __init__(self, cache=None):
if not cache:
cache = self._alloc_cache_name("route/addr")
def __init__(self, cache=None):
if not cache:
cache = self._alloc_cache_name("route/addr")
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
def __getitem__(self, key):
# Using ifindex=0 here implies that the local address itself
# is unique, otherwise the first occurence is returned.
return self.lookup(0, key)
def __getitem__(self, key):
# Using ifindex=0 here implies that the local address itself
# is unique, otherwise the first occurence is returned.
return self.lookup(0, key)
def lookup(self, ifindex, local):
if type(local) is str:
local = netlink.AbstractAddress(local)
def lookup(self, ifindex, local):
if type(local) is str:
local = netlink.AbstractAddress(local)
addr = capi.rtnl_addr_get(self._nl_cache, ifindex,
local._nl_addr)
if addr is None:
raise KeyError()
addr = capi.rtnl_addr_get(self._nl_cache, ifindex,
local._nl_addr)
if addr is None:
raise KeyError()
return Address._from_capi(addr)
return Address._from_capi(addr)
def _new_object(self, obj):
return Address(obj)
def _new_object(self, obj):
return Address(obj)
def _new_cache(self, cache):
return AddressCache(cache=cache)
def _new_cache(self, cache):
return AddressCache(cache=cache)
###########################################################################
# Address Object
class Address(netlink.Object):
"""Network address"""
"""Network address"""
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/addr", "address", obj)
self._rtnl_addr = self._obj2type(self._nl_object)
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/addr", "address", obj)
self._rtnl_addr = self._obj2type(self._nl_object)
@classmethod
def _from_capi(cls, obj):
return cls(capi.addr2obj(obj))
@classmethod
def _from_capi(cls, obj):
return cls(capi.addr2obj(obj))
def _obj2type(self, obj):
return capi.obj2addr(obj)
def _obj2type(self, obj):
return capi.obj2addr(obj)
def __cmp__(self, other):
# sort by:
# 1. network link
# 2. address family
# 3. local address (including prefixlen)
diff = self.ifindex - other.ifindex
def __cmp__(self, other):
# sort by:
# 1. network link
# 2. address family
# 3. local address (including prefixlen)
diff = self.ifindex - other.ifindex
if diff == 0:
diff = self.family - other.family
if diff == 0:
diff = capi.nl_addr_cmp(self.local, other.local)
if diff == 0:
diff = self.family - other.family
if diff == 0:
diff = capi.nl_addr_cmp(self.local, other.local)
return diff
return diff
def _new_instance(self, obj):
return Address(obj)
def _new_instance(self, obj):
return Address(obj)
#####################################################################
# ifindex
@netlink.nlattr('address.ifindex', type=int, immutable=True,
fmt=util.num)
@property
def ifindex(self):
"""interface index"""
return capi.rtnl_addr_get_ifindex(self._rtnl_addr)
#####################################################################
# ifindex
@netlink.nlattr('address.ifindex', type=int, immutable=True,
fmt=util.num)
@property
def ifindex(self):
"""interface index"""
return capi.rtnl_addr_get_ifindex(self._rtnl_addr)
@ifindex.setter
def ifindex(self, value):
link = Link.resolve(value)
if not link:
raise ValueError()
@ifindex.setter
def ifindex(self, value):
link = Link.resolve(value)
if not link:
raise ValueError()
self.link = link
self.link = link
#####################################################################
# link
@netlink.nlattr('address.link', type=str, fmt=util.string)
@property
def link(self):
link = capi.rtnl_addr_get_link(self._rtnl_addr)
if not link:
return None
#####################################################################
# link
@netlink.nlattr('address.link', type=str, fmt=util.string)
@property
def link(self):
link = capi.rtnl_addr_get_link(self._rtnl_addr)
if not link:
return None
return Link.Link.from_capi(link)
return Link.Link.from_capi(link)
@link.setter
def link(self, value):
if type(value) is str:
try:
value = Link.resolve(value)
except KeyError:
raise ValueError()
@link.setter
def link(self, value):
if type(value) is str:
try:
value = Link.resolve(value)
except KeyError:
raise ValueError()
capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link)
capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link)
# ifindex is immutable but we assume that if _orig does not
# have an ifindex specified, it was meant to be given here
if capi.rtnl_addr_get_ifindex(self._orig) == 0:
capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)
# ifindex is immutable but we assume that if _orig does not
# have an ifindex specified, it was meant to be given here
if capi.rtnl_addr_get_ifindex(self._orig) == 0:
capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)
#####################################################################
# label
@netlink.nlattr('address.label', type=str, fmt=util.string)
@property
def label(self):
"""address label"""
return capi.rtnl_addr_get_label(self._rtnl_addr)
#####################################################################
# label
@netlink.nlattr('address.label', type=str, fmt=util.string)
@property
def label(self):
"""address label"""
return capi.rtnl_addr_get_label(self._rtnl_addr)
@label.setter
def label(self, value):
capi.rtnl_addr_set_label(self._rtnl_addr, value)
@label.setter
def label(self, value):
capi.rtnl_addr_set_label(self._rtnl_addr, value)
#####################################################################
# flags
@netlink.nlattr('address.flags', type=str, fmt=util.string)
@property
def flags(self):
"""Flags
#####################################################################
# flags
@netlink.nlattr('address.flags', type=str, fmt=util.string)
@property
def flags(self):
"""Flags
Setting this property will *Not* reset flags to value you supply in
Setting this property will *Not* reset flags to value you supply in
Examples:
addr.flags = '+xxx' # add xxx flag
addr.flags = 'xxx' # exactly the same
addr.flags = '-xxx' # remove xxx flag
addr.flags = [ '+xxx', '-yyy' ] # list operation
"""
flags = capi.rtnl_addr_get_flags(self._rtnl_addr)
return capi.rtnl_addr_flags2str(flags, 256)[0].split(',')
Examples:
addr.flags = '+xxx' # add xxx flag
addr.flags = 'xxx' # exactly the same
addr.flags = '-xxx' # remove xxx flag
addr.flags = [ '+xxx', '-yyy' ] # list operation
"""
flags = capi.rtnl_addr_get_flags(self._rtnl_addr)
return capi.rtnl_addr_flags2str(flags, 256)[0].split(',')
def _set_flag(self, flag):
if flag.startswith('-'):
i = capi.rtnl_addr_str2flags(flag[1:])
capi.rtnl_addr_unset_flags(self._rtnl_addr, i)
elif flag.startswith('+'):
i = capi.rtnl_addr_str2flags(flag[1:])
capi.rtnl_addr_set_flags(self._rtnl_addr, i)
else:
i = capi.rtnl_addr_str2flags(flag)
capi.rtnl_addr_set_flags(self._rtnl_addr, i)
def _set_flag(self, flag):
if flag.startswith('-'):
i = capi.rtnl_addr_str2flags(flag[1:])
capi.rtnl_addr_unset_flags(self._rtnl_addr, i)
elif flag.startswith('+'):
i = capi.rtnl_addr_str2flags(flag[1:])
capi.rtnl_addr_set_flags(self._rtnl_addr, i)
else:
i = capi.rtnl_addr_str2flags(flag)
capi.rtnl_addr_set_flags(self._rtnl_addr, i)
@flags.setter
def flags(self, value):
if type(value) is list:
for flag in value:
self._set_flag(flag)
else:
self._set_flag(value)
@flags.setter
def flags(self, value):
if type(value) is list:
for flag in value:
self._set_flag(flag)
else:
self._set_flag(value)
#####################################################################
# family
@netlink.nlattr('address.family', type=int, immutable=True,
fmt=util.num)
@property
def family(self):
"""Address family"""
fam = capi.rtnl_addr_get_family(self._rtnl_addr)
return netlink.AddressFamily(fam)
#####################################################################
# family
@netlink.nlattr('address.family', type=int, immutable=True,
fmt=util.num)
@property
def family(self):
"""Address family"""
fam = capi.rtnl_addr_get_family(self._rtnl_addr)
return netlink.AddressFamily(fam)
@family.setter
def family(self, value):
if not isinstance(value, AddressFamily):
value = AddressFamily(value)
@family.setter
def family(self, value):
if not isinstance(value, AddressFamily):
value = AddressFamily(value)
capi.rtnl_addr_set_family(self._rtnl_addr, int(value))
capi.rtnl_addr_set_family(self._rtnl_addr, int(value))
#####################################################################
# scope
@netlink.nlattr('address.scope', type=int, fmt=util.num)
@property
def scope(self):
"""Address scope"""
scope = capi.rtnl_addr_get_scope(self._rtnl_addr)
return capi.rtnl_scope2str(scope, 32)[0]
#####################################################################
# scope
@netlink.nlattr('address.scope', type=int, fmt=util.num)
@property
def scope(self):
"""Address scope"""
scope = capi.rtnl_addr_get_scope(self._rtnl_addr)
return capi.rtnl_scope2str(scope, 32)[0]
@scope.setter
def scope(self, value):
if type(value) is str:
value = capi.rtnl_str2scope(value)
capi.rtnl_addr_set_scope(self._rtnl_addr, value)
@scope.setter
def scope(self, value):
if type(value) is str:
value = capi.rtnl_str2scope(value)
capi.rtnl_addr_set_scope(self._rtnl_addr, value)
#####################################################################
# local address
@netlink.nlattr('address.local', type=str, immutable=True,
fmt=util.addr)
@property
def local(self):
"""Local address"""
a = capi.rtnl_addr_get_local(self._rtnl_addr)
return netlink.AbstractAddress(a)
#####################################################################
# local address
@netlink.nlattr('address.local', type=str, immutable=True,
fmt=util.addr)
@property
def local(self):
"""Local address"""
a = capi.rtnl_addr_get_local(self._rtnl_addr)
return netlink.AbstractAddress(a)
@local.setter
def local(self, value):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr)
@local.setter
def local(self, value):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr)
# local is immutable but we assume that if _orig does not
# have a local address specified, it was meant to be given here
if capi.rtnl_addr_get_local(self._orig) is None:
capi.rtnl_addr_set_local(self._orig, a._nl_addr)
# local is immutable but we assume that if _orig does not
# have a local address specified, it was meant to be given here
if capi.rtnl_addr_get_local(self._orig) is None:
capi.rtnl_addr_set_local(self._orig, a._nl_addr)
#####################################################################
# Peer address
@netlink.nlattr('address.peer', type=str, fmt=util.addr)
@property
def peer(self):
"""Peer address"""
a = capi.rtnl_addr_get_peer(self._rtnl_addr)
return netlink.AbstractAddress(a)
#####################################################################
# Peer address
@netlink.nlattr('address.peer', type=str, fmt=util.addr)
@property
def peer(self):
"""Peer address"""
a = capi.rtnl_addr_get_peer(self._rtnl_addr)
return netlink.AbstractAddress(a)
@peer.setter
def peer(self, value):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)
@peer.setter
def peer(self, value):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)
#####################################################################
# Broadcast address
@netlink.nlattr('address.broadcast', type=str, fmt=util.addr)
@property
def broadcast(self):
"""Broadcast address"""
a = capi.rtnl_addr_get_broadcast(self._rtnl_addr)
return netlink.AbstractAddress(a)
#####################################################################
# Broadcast address
@netlink.nlattr('address.broadcast', type=str, fmt=util.addr)
@property
def broadcast(self):
"""Broadcast address"""
a = capi.rtnl_addr_get_broadcast(self._rtnl_addr)
return netlink.AbstractAddress(a)
@broadcast.setter
def broadcast(self, value):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)
@broadcast.setter
def broadcast(self, value):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)
#####################################################################
# Multicast address
@netlink.nlattr('address.multicast', type=str, fmt=util.addr)
@property
def multicast(self):
"""multicast address"""
a = capi.rtnl_addr_get_multicast(self._rtnl_addr)
return netlink.AbstractAddress(a)
#####################################################################
# Multicast address
@netlink.nlattr('address.multicast', type=str, fmt=util.addr)
@property
def multicast(self):
"""multicast address"""
a = capi.rtnl_addr_get_multicast(self._rtnl_addr)
return netlink.AbstractAddress(a)
@multicast.setter
def multicast(self, value):
try:
a = netlink.AbstractAddress(value)
except ValueError as err:
raise AttributeError('multicast', err)
@multicast.setter
def multicast(self, value):
try:
a = netlink.AbstractAddress(value)
except ValueError as err:
raise AttributeError('multicast', err)
capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)
capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)
#####################################################################
# Anycast address
@netlink.nlattr('address.anycast', type=str, fmt=util.addr)
@property
def anycast(self):
"""anycast address"""
a = capi.rtnl_addr_get_anycast(self._rtnl_addr)
return netlink.AbstractAddress(a)
#####################################################################
# Anycast address
@netlink.nlattr('address.anycast', type=str, fmt=util.addr)
@property
def anycast(self):
"""anycast address"""
a = capi.rtnl_addr_get_anycast(self._rtnl_addr)
return netlink.AbstractAddress(a)
@anycast.setter
def anycast(self, value):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)
@anycast.setter
def anycast(self, value):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)
#####################################################################
# Valid lifetime
@netlink.nlattr('address.valid_lifetime', type=int, immutable=True,
fmt=util.num)
@property
def valid_lifetime(self):
"""Valid lifetime"""
msecs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr)
if msecs == 0xFFFFFFFF:
return None
else:
return datetime.timedelta(seconds=msecs)
#####################################################################
# Valid lifetime
@netlink.nlattr('address.valid_lifetime', type=int, immutable=True,
fmt=util.num)
@property
def valid_lifetime(self):
"""Valid lifetime"""
msecs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr)
if msecs == 0xFFFFFFFF:
return None
else:
return datetime.timedelta(seconds=msecs)
@valid_lifetime.setter
def valid_lifetime(self, value):
capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value))
@valid_lifetime.setter
def valid_lifetime(self, value):
capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value))
#####################################################################
# Preferred lifetime
@netlink.nlattr('address.preferred_lifetime', type=int,
immutable=True, fmt=util.num)
@property
def preferred_lifetime(self):
"""Preferred lifetime"""
msecs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr)
if msecs == 0xFFFFFFFF:
return None
else:
return datetime.timedelta(seconds=msecs)
#####################################################################
# Preferred lifetime
@netlink.nlattr('address.preferred_lifetime', type=int,
immutable=True, fmt=util.num)
@property
def preferred_lifetime(self):
"""Preferred lifetime"""
msecs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr)
if msecs == 0xFFFFFFFF:
return None
else:
return datetime.timedelta(seconds=msecs)
@preferred_lifetime.setter
def preferred_lifetime(self, value):
capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value))
@preferred_lifetime.setter
def preferred_lifetime(self, value):
capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value))
#####################################################################
# Creation Time
@netlink.nlattr('address.create_time', type=int, immutable=True,
fmt=util.num)
@property
def create_time(self):
"""Creation time"""
hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
return datetime.timedelta(milliseconds=10*hsec)
#####################################################################
# Creation Time
@netlink.nlattr('address.create_time', type=int, immutable=True,
fmt=util.num)
@property
def create_time(self):
"""Creation time"""
hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
return datetime.timedelta(milliseconds=10*hsec)
#####################################################################
# Last Update
@netlink.nlattr('address.last_update', type=int, immutable=True,
fmt=util.num)
@property
def last_update(self):
"""Last update"""
hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
return datetime.timedelta(milliseconds=10*hsec)
#####################################################################
# Last Update
@netlink.nlattr('address.last_update', type=int, immutable=True,
fmt=util.num)
@property
def last_update(self):
"""Last update"""
hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
return datetime.timedelta(milliseconds=10*hsec)
#####################################################################
# add()
def add(self, socket=None, flags=None):
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
#####################################################################
# add()
def add(self, socket=None, flags=None):
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
if not flags:
flags = netlink.NLM_F_CREATE
if not flags:
flags = netlink.NLM_F_CREATE
ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags)
if ret < 0:
raise netlink.KernelError(ret)
ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags)
if ret < 0:
raise netlink.KernelError(ret)
#####################################################################
# delete()
def delete(self, socket, flags=0):
"""Attempt to delete this address in the kernel"""
ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
if ret < 0:
raise netlink.KernelError(ret)
#####################################################################
# delete()
def delete(self, socket, flags=0):
"""Attempt to delete this address in the kernel"""
ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
if ret < 0:
raise netlink.KernelError(ret)
###################################################################
# private properties
#
# Used for formatting output. USE AT OWN RISK
@property
def _flags(self):
return ','.join(self.flags)
###################################################################
# private properties
#
# Used for formatting output. USE AT OWN RISK
@property
def _flags(self):
return ','.join(self.flags)
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, nodev=False, indent=''):
"""Return address as formatted text"""
fmt = util.MyFormatter(self, indent)
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, nodev=False, indent=''):
"""Return address as formatted text"""
fmt = util.MyFormatter(self, indent)
buf = fmt.format('{a|local!b}')
buf = fmt.format('{a|local!b}')
if not nodev:
buf += fmt.format(' {a|ifindex}')
if not nodev:
buf += fmt.format(' {a|ifindex}')
buf += fmt.format(' {a|scope}')
buf += fmt.format(' {a|scope}')
if self.label:
buf += fmt.format(' "{a|label}"')
if self.label:
buf += fmt.format(' "{a|label}"')
buf += fmt.format(' <{a|_flags}>')
buf += fmt.format(' <{a|_flags}>')
if details:
buf += fmt.nl('\t{t|broadcast} {t|multicast}') \
+ fmt.nl('\t{t|peer} {t|anycast}')
if details:
buf += fmt.nl('\t{t|broadcast} {t|multicast}') \
+ fmt.nl('\t{t|peer} {t|anycast}')
if self.valid_lifetime:
buf += fmt.nl('\t{s|valid-lifetime!k} '\
'{a|valid_lifetime}')
if self.valid_lifetime:
buf += fmt.nl('\t{s|valid-lifetime!k} '\
'{a|valid_lifetime}')
if self.preferred_lifetime:
buf += fmt.nl('\t{s|preferred-lifetime!k} '\
'{a|preferred_lifetime}')
if self.preferred_lifetime:
buf += fmt.nl('\t{s|preferred-lifetime!k} '\
'{a|preferred_lifetime}')
if stats and (self.create_time or self.last_update):
buf += self.nl('\t{s|created!k} {a|create_time}'\
' {s|last-updated!k} {a|last_update}')
if stats and (self.create_time or self.last_update):
buf += self.nl('\t{s|created!k} {a|create_time}'\
' {s|last-updated!k} {a|last_update}')
return buf
return buf

View File

@ -8,26 +8,26 @@ This module provides an interface to view configured network links,
modify them and to add and delete virtual network links.
The following is a basic example:
import netlink.core as netlink
import netlink.route.link as link
import netlink.core as netlink
import netlink.route.link as link
sock = netlink.Socket()
sock.connect(netlink.NETLINK_ROUTE)
sock = netlink.Socket()
sock.connect(netlink.NETLINK_ROUTE)
cache = link.LinkCache() # create new empty link cache
cache.refill(sock) # fill cache with all configured links
eth0 = cache['eth0'] # lookup link "eth0"
print eth0 # print basic configuration
cache = link.LinkCache() # create new empty link cache
cache.refill(sock) # fill cache with all configured links
eth0 = cache['eth0'] # lookup link "eth0"
print eth0 # print basic configuration
The module contains the following public classes:
- Link -- Represents a network link. Instances can be created directly
via the constructor (empty link objects) or via the refill()
method of a LinkCache.
via the constructor (empty link objects) or via the refill()
method of a LinkCache.
- LinkCache -- Derived from netlink.Cache, holds any number of
network links (Link instances). Main purpose is to keep
a local list of all network links configured in the
kernel.
network links (Link instances). Main purpose is to keep
a local list of all network links configured in the
kernel.
The following public functions exist:
- get_from_kernel(socket, name)
@ -38,9 +38,9 @@ from __future__ import absolute_import
__version__ = "0.1"
__all__ = [
'LinkCache',
'Link',
'get_from_kernel']
'LinkCache',
'Link',
'get_from_kernel']
import socket
import sys
@ -113,477 +113,477 @@ ICMP6_OUTERRORS = 56
###########################################################################
# Link Cache
class LinkCache(netlink.Cache):
"""Cache of network links"""
"""Cache of network links"""
def __init__(self, family=socket.AF_UNSPEC, cache=None):
if not cache:
cache = self._alloc_cache_name("route/link")
def __init__(self, family=socket.AF_UNSPEC, cache=None):
if not cache:
cache = self._alloc_cache_name("route/link")
self._info_module = None
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
self._set_arg1(family)
self._info_module = None
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
self._set_arg1(family)
def __getitem__(self, key):
if type(key) is int:
link = capi.rtnl_link_get(self._nl_cache, key)
else:
link = capi.rtnl_link_get_by_name(self._nl_cache, key)
def __getitem__(self, key):
if type(key) is int:
link = capi.rtnl_link_get(self._nl_cache, key)
else:
link = capi.rtnl_link_get_by_name(self._nl_cache, key)
if link is None:
raise KeyError()
else:
return Link.from_capi(link)
if link is None:
raise KeyError()
else:
return Link.from_capi(link)
def _new_object(self, obj):
return Link(obj)
def _new_object(self, obj):
return Link(obj)
def _new_cache(self, cache):
return LinkCache(family=self.arg1, cache=cache)
def _new_cache(self, cache):
return LinkCache(family=self.arg1, cache=cache)
###########################################################################
# Link Object
class Link(netlink.Object):
"""Network link"""
"""Network link"""
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/link", "link", obj)
self._rtnl_link = self._obj2type(self._nl_object)
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/link", "link", obj)
self._rtnl_link = self._obj2type(self._nl_object)
if self.type:
self._module_lookup('netlink.route.links.' + self.type)
if self.type:
self._module_lookup('netlink.route.links.' + self.type)
self.inet = inet.InetLink(self)
self.af = {'inet' : self.inet }
self.inet = inet.InetLink(self)
self.af = {'inet' : self.inet }
@classmethod
def from_capi(cls, obj):
return cls(capi.link2obj(obj))
@classmethod
def from_capi(cls, obj):
return cls(capi.link2obj(obj))
def _obj2type(self, obj):
return capi.obj2link(obj)
def _obj2type(self, obj):
return capi.obj2link(obj)
def __cmp__(self, other):
return self.ifindex - other.ifindex
def __cmp__(self, other):
return self.ifindex - other.ifindex
def _new_instance(self, obj):
if not obj:
raise ValueError()
def _new_instance(self, obj):
if not obj:
raise ValueError()
return Link(obj)
return Link(obj)
#####################################################################
# ifindex
@netlink.nlattr('link.ifindex', type=int, immutable=True, fmt=util.num)
@property
def ifindex(self):
"""interface index"""
return capi.rtnl_link_get_ifindex(self._rtnl_link)
#####################################################################
# ifindex
@netlink.nlattr('link.ifindex', type=int, immutable=True, fmt=util.num)
@property
def ifindex(self):
"""interface index"""
return capi.rtnl_link_get_ifindex(self._rtnl_link)
@ifindex.setter
def ifindex(self, value):
capi.rtnl_link_set_ifindex(self._rtnl_link, int(value))
@ifindex.setter
def ifindex(self, value):
capi.rtnl_link_set_ifindex(self._rtnl_link, int(value))
# ifindex is immutable but we assume that if _orig does not
# have an ifindex specified, it was meant to be given here
if capi.rtnl_link_get_ifindex(self._orig) == 0:
capi.rtnl_link_set_ifindex(self._orig, int(value))
# ifindex is immutable but we assume that if _orig does not
# have an ifindex specified, it was meant to be given here
if capi.rtnl_link_get_ifindex(self._orig) == 0:
capi.rtnl_link_set_ifindex(self._orig, int(value))
#####################################################################
# name
@netlink.nlattr('link.name', type=str, fmt=util.bold)
@property
def name(self):
"""Name of link"""
return capi.rtnl_link_get_name(self._rtnl_link)
#####################################################################
# name
@netlink.nlattr('link.name', type=str, fmt=util.bold)
@property
def name(self):
"""Name of link"""
return capi.rtnl_link_get_name(self._rtnl_link)
@name.setter
def name(self, value):
capi.rtnl_link_set_name(self._rtnl_link, value)
@name.setter
def name(self, value):
capi.rtnl_link_set_name(self._rtnl_link, value)
# name is the secondary identifier, if _orig does not have
# the name specified yet, assume it was meant to be specified
# here. ifindex will always take priority, therefore if ifindex
# is specified as well, this will be ignored automatically.
if capi.rtnl_link_get_name(self._orig) is None:
capi.rtnl_link_set_name(self._orig, value)
# name is the secondary identifier, if _orig does not have
# the name specified yet, assume it was meant to be specified
# here. ifindex will always take priority, therefore if ifindex
# is specified as well, this will be ignored automatically.
if capi.rtnl_link_get_name(self._orig) is None:
capi.rtnl_link_set_name(self._orig, value)
#####################################################################
# flags
@netlink.nlattr('link.flags', type=str, fmt=util.string)
@property
def flags(self):
"""Flags
Setting this property will *Not* reset flags to value you supply in
Examples:
link.flags = '+xxx' # add xxx flag
link.flags = 'xxx' # exactly the same
link.flags = '-xxx' # remove xxx flag
link.flags = [ '+xxx', '-yyy' ] # list operation
"""
flags = capi.rtnl_link_get_flags(self._rtnl_link)
return capi.rtnl_link_flags2str(flags, 256)[0].split(',')
#####################################################################
# flags
@netlink.nlattr('link.flags', type=str, fmt=util.string)
@property
def flags(self):
"""Flags
Setting this property will *Not* reset flags to value you supply in
Examples:
link.flags = '+xxx' # add xxx flag
link.flags = 'xxx' # exactly the same
link.flags = '-xxx' # remove xxx flag
link.flags = [ '+xxx', '-yyy' ] # list operation
"""
flags = capi.rtnl_link_get_flags(self._rtnl_link)
return capi.rtnl_link_flags2str(flags, 256)[0].split(',')
def _set_flag(self, flag):
if flag.startswith('-'):
i = capi.rtnl_link_str2flags(flag[1:])
capi.rtnl_link_unset_flags(self._rtnl_link, i)
elif flag.startswith('+'):
i = capi.rtnl_link_str2flags(flag[1:])
capi.rtnl_link_set_flags(self._rtnl_link, i)
else:
i = capi.rtnl_link_str2flags(flag)
capi.rtnl_link_set_flags(self._rtnl_link, i)
def _set_flag(self, flag):
if flag.startswith('-'):
i = capi.rtnl_link_str2flags(flag[1:])
capi.rtnl_link_unset_flags(self._rtnl_link, i)
elif flag.startswith('+'):
i = capi.rtnl_link_str2flags(flag[1:])
capi.rtnl_link_set_flags(self._rtnl_link, i)
else:
i = capi.rtnl_link_str2flags(flag)
capi.rtnl_link_set_flags(self._rtnl_link, i)
@flags.setter
def flags(self, value):
if not (type(value) is str):
for flag in value:
self._set_flag(flag)
else:
self._set_flag(value)
@flags.setter
def flags(self, value):
if not (type(value) is str):
for flag in value:
self._set_flag(flag)
else:
self._set_flag(value)
#####################################################################
# mtu
@netlink.nlattr('link.mtu', type=int, fmt=util.num)
@property
def mtu(self):
"""Maximum Transmission Unit"""
return capi.rtnl_link_get_mtu(self._rtnl_link)
#####################################################################
# mtu
@netlink.nlattr('link.mtu', type=int, fmt=util.num)
@property
def mtu(self):
"""Maximum Transmission Unit"""
return capi.rtnl_link_get_mtu(self._rtnl_link)
@mtu.setter
def mtu(self, value):
capi.rtnl_link_set_mtu(self._rtnl_link, int(value))
@mtu.setter
def mtu(self, value):
capi.rtnl_link_set_mtu(self._rtnl_link, int(value))
#####################################################################
# family
@netlink.nlattr('link.family', type=int, immutable=True, fmt=util.num)
@property
def family(self):
"""Address family"""
return capi.rtnl_link_get_family(self._rtnl_link)
#####################################################################
# family
@netlink.nlattr('link.family', type=int, immutable=True, fmt=util.num)
@property
def family(self):
"""Address family"""
return capi.rtnl_link_get_family(self._rtnl_link)
@family.setter
def family(self, value):
capi.rtnl_link_set_family(self._rtnl_link, value)
@family.setter
def family(self, value):
capi.rtnl_link_set_family(self._rtnl_link, value)
#####################################################################
# address
@netlink.nlattr('link.address', type=str, fmt=util.addr)
@property
def address(self):
"""Hardware address (MAC address)"""
a = capi.rtnl_link_get_addr(self._rtnl_link)
return netlink.AbstractAddress(a)
#####################################################################
# address
@netlink.nlattr('link.address', type=str, fmt=util.addr)
@property
def address(self):
"""Hardware address (MAC address)"""
a = capi.rtnl_link_get_addr(self._rtnl_link)
return netlink.AbstractAddress(a)
@address.setter
def address(self, value):
capi.rtnl_link_set_addr(self._rtnl_link, value._addr)
@address.setter
def address(self, value):
capi.rtnl_link_set_addr(self._rtnl_link, value._addr)
#####################################################################
# broadcast
@netlink.nlattr('link.broadcast', type=str, fmt=util.addr)
@property
def broadcast(self):
"""Hardware broadcast address"""
a = capi.rtnl_link_get_broadcast(self._rtnl_link)
return netlink.AbstractAddress(a)
#####################################################################
# broadcast
@netlink.nlattr('link.broadcast', type=str, fmt=util.addr)
@property
def broadcast(self):
"""Hardware broadcast address"""
a = capi.rtnl_link_get_broadcast(self._rtnl_link)
return netlink.AbstractAddress(a)
@broadcast.setter
def broadcast(self, value):
capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr)
@broadcast.setter
def broadcast(self, value):
capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr)
#####################################################################
# qdisc
@netlink.nlattr('link.qdisc', type=str, immutable=True, fmt=util.string)
@property
def qdisc(self):
"""Name of qdisc (cannot be changed)"""
return capi.rtnl_link_get_qdisc(self._rtnl_link)
#####################################################################
# qdisc
@netlink.nlattr('link.qdisc', type=str, immutable=True, fmt=util.string)
@property
def qdisc(self):
"""Name of qdisc (cannot be changed)"""
return capi.rtnl_link_get_qdisc(self._rtnl_link)
@qdisc.setter
def qdisc(self, value):
capi.rtnl_link_set_qdisc(self._rtnl_link, value)
@qdisc.setter
def qdisc(self, value):
capi.rtnl_link_set_qdisc(self._rtnl_link, value)
#####################################################################
# txqlen
@netlink.nlattr('link.txqlen', type=int, fmt=util.num)
@property
def txqlen(self):
"""Length of transmit queue"""
return capi.rtnl_link_get_txqlen(self._rtnl_link)
#####################################################################
# txqlen
@netlink.nlattr('link.txqlen', type=int, fmt=util.num)
@property
def txqlen(self):
"""Length of transmit queue"""
return capi.rtnl_link_get_txqlen(self._rtnl_link)
@txqlen.setter
def txqlen(self, value):
capi.rtnl_link_set_txqlen(self._rtnl_link, int(value))
@txqlen.setter
def txqlen(self, value):
capi.rtnl_link_set_txqlen(self._rtnl_link, int(value))
#####################################################################
# weight
@netlink.nlattr('link.weight', type=str, fmt=util.string)
@property
def weight(self):
"""Weight"""
v = capi.rtnl_link_get_weight(self._rtnl_link)
if v == 4294967295:
return 'max'
else:
return str(v)
#####################################################################
# weight
@netlink.nlattr('link.weight', type=str, fmt=util.string)
@property
def weight(self):
"""Weight"""
v = capi.rtnl_link_get_weight(self._rtnl_link)
if v == 4294967295:
return 'max'
else:
return str(v)
@weight.setter
def weight(self, value):
if value == 'max':
v = 4294967295
else:
v = int(value)
capi.rtnl_link_set_weight(self._rtnl_link, v)
@weight.setter
def weight(self, value):
if value == 'max':
v = 4294967295
else:
v = int(value)
capi.rtnl_link_set_weight(self._rtnl_link, v)
#####################################################################
# arptype
@netlink.nlattr('link.arptype', type=str, immutable=True, fmt=util.string)
@property
def arptype(self):
"""Type of link (cannot be changed)"""
type = capi.rtnl_link_get_arptype(self._rtnl_link)
return core_capi.nl_llproto2str(type, 64)[0]
#####################################################################
# arptype
@netlink.nlattr('link.arptype', type=str, immutable=True, fmt=util.string)
@property
def arptype(self):
"""Type of link (cannot be changed)"""
type = capi.rtnl_link_get_arptype(self._rtnl_link)
return core_capi.nl_llproto2str(type, 64)[0]
@arptype.setter
def arptype(self, value):
i = core_capi.nl_str2llproto(value)
capi.rtnl_link_set_arptype(self._rtnl_link, i)
@arptype.setter
def arptype(self, value):
i = core_capi.nl_str2llproto(value)
capi.rtnl_link_set_arptype(self._rtnl_link, i)
#####################################################################
# operstate
@netlink.nlattr('link.operstate', type=str, immutable=True,
fmt=util.string, title='state')
@property
def operstate(self):
"""Operational status"""
operstate = capi.rtnl_link_get_operstate(self._rtnl_link)
return capi.rtnl_link_operstate2str(operstate, 32)[0]
#####################################################################
# operstate
@netlink.nlattr('link.operstate', type=str, immutable=True,
fmt=util.string, title='state')
@property
def operstate(self):
"""Operational status"""
operstate = capi.rtnl_link_get_operstate(self._rtnl_link)
return capi.rtnl_link_operstate2str(operstate, 32)[0]
@operstate.setter
def operstate(self, value):
i = capi.rtnl_link_str2operstate(flag)
capi.rtnl_link_set_operstate(self._rtnl_link, i)
@operstate.setter
def operstate(self, value):
i = capi.rtnl_link_str2operstate(flag)
capi.rtnl_link_set_operstate(self._rtnl_link, i)
#####################################################################
# mode
@netlink.nlattr('link.mode', type=str, immutable=True, fmt=util.string)
@property
def mode(self):
"""Link mode"""
mode = capi.rtnl_link_get_linkmode(self._rtnl_link)
return capi.rtnl_link_mode2str(mode, 32)[0]
#####################################################################
# mode
@netlink.nlattr('link.mode', type=str, immutable=True, fmt=util.string)
@property
def mode(self):
"""Link mode"""
mode = capi.rtnl_link_get_linkmode(self._rtnl_link)
return capi.rtnl_link_mode2str(mode, 32)[0]
@mode.setter
def mode(self, value):
i = capi.rtnl_link_str2mode(flag)
capi.rtnl_link_set_linkmode(self._rtnl_link, i)
@mode.setter
def mode(self, value):
i = capi.rtnl_link_str2mode(flag)
capi.rtnl_link_set_linkmode(self._rtnl_link, i)
#####################################################################
# alias
@netlink.nlattr('link.alias', type=str, fmt=util.string)
@property
def alias(self):
"""Interface alias (SNMP)"""
return capi.rtnl_link_get_ifalias(self._rtnl_link)
#####################################################################
# alias
@netlink.nlattr('link.alias', type=str, fmt=util.string)
@property
def alias(self):
"""Interface alias (SNMP)"""
return capi.rtnl_link_get_ifalias(self._rtnl_link)
@alias.setter
def alias(self, value):
capi.rtnl_link_set_ifalias(self._rtnl_link, value)
@alias.setter
def alias(self, value):
capi.rtnl_link_set_ifalias(self._rtnl_link, value)
#####################################################################
# type
@netlink.nlattr('link.type', type=str, fmt=util.string)
@property
def type(self):
"""Link type"""
return capi.rtnl_link_get_type(self._rtnl_link)
#####################################################################
# type
@netlink.nlattr('link.type', type=str, fmt=util.string)
@property
def type(self):
"""Link type"""
return capi.rtnl_link_get_type(self._rtnl_link)
@type.setter
def type(self, value):
if capi.rtnl_link_set_type(self._rtnl_link, value) < 0:
raise NameError("unknown info type")
@type.setter
def type(self, value):
if capi.rtnl_link_set_type(self._rtnl_link, value) < 0:
raise NameError("unknown info type")
self._module_lookup('netlink.route.links.' + value)
self._module_lookup('netlink.route.links.' + value)
#####################################################################
# get_stat()
def get_stat(self, stat):
"""Retrieve statistical information"""
if type(stat) is str:
stat = capi.rtnl_link_str2stat(stat)
if stat < 0:
raise NameError("unknown name of statistic")
#####################################################################
# get_stat()
def get_stat(self, stat):
"""Retrieve statistical information"""
if type(stat) is str:
stat = capi.rtnl_link_str2stat(stat)
if stat < 0:
raise NameError("unknown name of statistic")
return capi.rtnl_link_get_stat(self._rtnl_link, stat)
return capi.rtnl_link_get_stat(self._rtnl_link, stat)
#####################################################################
# add()
def add(self, socket=None, flags=None):
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
#####################################################################
# add()
def add(self, socket=None, flags=None):
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
if not flags:
flags = netlink.NLM_F_CREATE
if not flags:
flags = netlink.NLM_F_CREATE
ret = capi.rtnl_link_add(socket._sock, self._rtnl_link, flags)
if ret < 0:
raise netlink.KernelError(ret)
ret = capi.rtnl_link_add(socket._sock, self._rtnl_link, flags)
if ret < 0:
raise netlink.KernelError(ret)
#####################################################################
# change()
def change(self, socket=None, flags=0):
"""Commit changes made to the link object"""
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
#####################################################################
# change()
def change(self, socket=None, flags=0):
"""Commit changes made to the link object"""
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
if not self._orig:
raise NetlinkError("Original link not available")
ret = capi.rtnl_link_change(socket._sock, self._orig, self._rtnl_link, flags)
if ret < 0:
raise netlink.KernelError(ret)
if not self._orig:
raise NetlinkError("Original link not available")
ret = capi.rtnl_link_change(socket._sock, self._orig, self._rtnl_link, flags)
if ret < 0:
raise netlink.KernelError(ret)
#####################################################################
# delete()
def delete(self, socket=None):
"""Attempt to delete this link in the kernel"""
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
#####################################################################
# delete()
def delete(self, socket=None):
"""Attempt to delete this link in the kernel"""
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
ret = capi.rtnl_link_delete(socket._sock, self._rtnl_link)
if ret < 0:
raise netlink.KernelError(ret)
ret = capi.rtnl_link_delete(socket._sock, self._rtnl_link)
if ret < 0:
raise netlink.KernelError(ret)
###################################################################
# private properties
#
# Used for formatting output. USE AT OWN RISK
@property
def _state(self):
if 'up' in self.flags:
buf = util.good('up')
if 'lowerup' not in self.flags:
buf += ' ' + util.bad('no-carrier')
else:
buf = util.bad('down')
return buf
###################################################################
# private properties
#
# Used for formatting output. USE AT OWN RISK
@property
def _state(self):
if 'up' in self.flags:
buf = util.good('up')
if 'lowerup' not in self.flags:
buf += ' ' + util.bad('no-carrier')
else:
buf = util.bad('down')
return buf
@property
def _brief(self):
return self._module_brief() + self._foreach_af('brief')
@property
def _brief(self):
return self._module_brief() + self._foreach_af('brief')
@property
def _flags(self):
ignore = ['up', 'running', 'lowerup']
return ','.join([flag for flag in self.flags if flag not in ignore])
@property
def _flags(self):
ignore = ['up', 'running', 'lowerup']
return ','.join([flag for flag in self.flags if flag not in ignore])
def _foreach_af(self, name, args=None):
buf = ''
for af in self.af:
try:
func = getattr(self.af[af], name)
s = str(func(args))
if len(s) > 0:
buf += ' ' + s
except AttributeError:
pass
return buf
def _foreach_af(self, name, args=None):
buf = ''
for af in self.af:
try:
func = getattr(self.af[af], name)
s = str(func(args))
if len(s) > 0:
buf += ' ' + s
except AttributeError:
pass
return buf
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, indent=''):
"""Return link as formatted text"""
fmt = util.MyFormatter(self, indent)
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, indent=''):
"""Return link as formatted text"""
fmt = util.MyFormatter(self, indent)
buf = fmt.format('{a|ifindex} {a|name} {a|arptype} {a|address} '\
'{a|_state} <{a|_flags}> {a|_brief}')
buf = fmt.format('{a|ifindex} {a|name} {a|arptype} {a|address} '\
'{a|_state} <{a|_flags}> {a|_brief}')
if details:
buf += fmt.nl('\t{t|mtu} {t|txqlen} {t|weight} '\
'{t|qdisc} {t|operstate}')
buf += fmt.nl('\t{t|broadcast} {t|alias}')
if details:
buf += fmt.nl('\t{t|mtu} {t|txqlen} {t|weight} '\
'{t|qdisc} {t|operstate}')
buf += fmt.nl('\t{t|broadcast} {t|alias}')
buf += self._foreach_af('details', fmt)
buf += self._foreach_af('details', fmt)
if stats:
l = [['Packets', RX_PACKETS, TX_PACKETS],
['Bytes', RX_BYTES, TX_BYTES],
['Errors', RX_ERRORS, TX_ERRORS],
['Dropped', RX_DROPPED, TX_DROPPED],
['Compressed', RX_COMPRESSED, TX_COMPRESSED],
['FIFO Errors', RX_FIFO_ERR, TX_FIFO_ERR],
['Length Errors', RX_LEN_ERR, None],
['Over Errors', RX_OVER_ERR, None],
['CRC Errors', RX_CRC_ERR, None],
['Frame Errors', RX_FRAME_ERR, None],
['Missed Errors', RX_MISSED_ERR, None],
['Abort Errors', None, TX_ABORT_ERR],
['Carrier Errors', None, TX_CARRIER_ERR],
['Heartbeat Errors', None, TX_HBEAT_ERR],
['Window Errors', None, TX_WIN_ERR],
['Collisions', None, COLLISIONS],
['Multicast', None, MULTICAST],
['', None, None],
['Ipv6:', None, None],
['Packets', IP6_INPKTS, IP6_OUTPKTS],
['Bytes', IP6_INOCTETS, IP6_OUTOCTETS],
['Discards', IP6_INDISCARDS, IP6_OUTDISCARDS],
['Multicast Packets', IP6_INMCASTPKTS, IP6_OUTMCASTPKTS],
['Multicast Bytes', IP6_INMCASTOCTETS, IP6_OUTMCASTOCTETS],
['Broadcast Packets', IP6_INBCASTPKTS, IP6_OUTBCASTPKTS],
['Broadcast Bytes', IP6_INBCASTOCTETS, IP6_OUTBCASTOCTETS],
['Delivers', IP6_INDELIVERS, None],
['Forwarded', None, IP6_OUTFORWDATAGRAMS],
['No Routes', IP6_INNOROUTES, IP6_OUTNOROUTES],
['Header Errors', IP6_INHDRERRORS, None],
['Too Big Errors', IP6_INTOOBIGERRORS, None],
['Address Errors', IP6_INADDRERRORS, None],
['Unknown Protocol', IP6_INUNKNOWNPROTOS, None],
['Truncated Packets', IP6_INTRUNCATEDPKTS, None],
['Reasm Timeouts', IP6_REASMTIMEOUT, None],
['Reasm Requests', IP6_REASMREQDS, None],
['Reasm Failures', IP6_REASMFAILS, None],
['Reasm OK', IP6_REASMOKS, None],
['Frag Created', None, IP6_FRAGCREATES],
['Frag Failures', None, IP6_FRAGFAILS],
['Frag OK', None, IP6_FRAGOKS],
['', None, None],
['ICMPv6:', None, None],
['Messages', ICMP6_INMSGS, ICMP6_OUTMSGS],
['Errors', ICMP6_INERRORS, ICMP6_OUTERRORS]]
if stats:
l = [['Packets', RX_PACKETS, TX_PACKETS],
['Bytes', RX_BYTES, TX_BYTES],
['Errors', RX_ERRORS, TX_ERRORS],
['Dropped', RX_DROPPED, TX_DROPPED],
['Compressed', RX_COMPRESSED, TX_COMPRESSED],
['FIFO Errors', RX_FIFO_ERR, TX_FIFO_ERR],
['Length Errors', RX_LEN_ERR, None],
['Over Errors', RX_OVER_ERR, None],
['CRC Errors', RX_CRC_ERR, None],
['Frame Errors', RX_FRAME_ERR, None],
['Missed Errors', RX_MISSED_ERR, None],
['Abort Errors', None, TX_ABORT_ERR],
['Carrier Errors', None, TX_CARRIER_ERR],
['Heartbeat Errors', None, TX_HBEAT_ERR],
['Window Errors', None, TX_WIN_ERR],
['Collisions', None, COLLISIONS],
['Multicast', None, MULTICAST],
['', None, None],
['Ipv6:', None, None],
['Packets', IP6_INPKTS, IP6_OUTPKTS],
['Bytes', IP6_INOCTETS, IP6_OUTOCTETS],
['Discards', IP6_INDISCARDS, IP6_OUTDISCARDS],
['Multicast Packets', IP6_INMCASTPKTS, IP6_OUTMCASTPKTS],
['Multicast Bytes', IP6_INMCASTOCTETS, IP6_OUTMCASTOCTETS],
['Broadcast Packets', IP6_INBCASTPKTS, IP6_OUTBCASTPKTS],
['Broadcast Bytes', IP6_INBCASTOCTETS, IP6_OUTBCASTOCTETS],
['Delivers', IP6_INDELIVERS, None],
['Forwarded', None, IP6_OUTFORWDATAGRAMS],
['No Routes', IP6_INNOROUTES, IP6_OUTNOROUTES],
['Header Errors', IP6_INHDRERRORS, None],
['Too Big Errors', IP6_INTOOBIGERRORS, None],
['Address Errors', IP6_INADDRERRORS, None],
['Unknown Protocol', IP6_INUNKNOWNPROTOS, None],
['Truncated Packets', IP6_INTRUNCATEDPKTS, None],
['Reasm Timeouts', IP6_REASMTIMEOUT, None],
['Reasm Requests', IP6_REASMREQDS, None],
['Reasm Failures', IP6_REASMFAILS, None],
['Reasm OK', IP6_REASMOKS, None],
['Frag Created', None, IP6_FRAGCREATES],
['Frag Failures', None, IP6_FRAGFAILS],
['Frag OK', None, IP6_FRAGOKS],
['', None, None],
['ICMPv6:', None, None],
['Messages', ICMP6_INMSGS, ICMP6_OUTMSGS],
['Errors', ICMP6_INERRORS, ICMP6_OUTERRORS]]
buf += '\n\t%s%s%s%s\n' % (33 * ' ', util.title('RX'),
15 * ' ', util.title('TX'))
buf += '\n\t%s%s%s%s\n' % (33 * ' ', util.title('RX'),
15 * ' ', util.title('TX'))
for row in l:
row[0] = util.kw(row[0])
row[1] = self.get_stat(row[1]) if row[1] else ''
row[2] = self.get_stat(row[2]) if row[2] else ''
buf += '\t{0:27} {1:>16} {2:>16}\n'.format(*row)
for row in l:
row[0] = util.kw(row[0])
row[1] = self.get_stat(row[1]) if row[1] else ''
row[2] = self.get_stat(row[2]) if row[2] else ''
buf += '\t{0:27} {1:>16} {2:>16}\n'.format(*row)
buf += self._foreach_af('stats')
buf += self._foreach_af('stats')
return buf
return buf
def get(name, socket=None):
"""Lookup Link object directly from kernel"""
if not name:
raise ValueError()
"""Lookup Link object directly from kernel"""
if not name:
raise ValueError()
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
link = capi.get_from_kernel(socket._sock, 0, name)
if not link:
return None
link = capi.get_from_kernel(socket._sock, 0, name)
if not link:
return None
return Link.from_capi(link)
return Link.from_capi(link)
_link_cache = LinkCache()
def resolve(name):
_link_cache.refill()
return _link_cache[name]
_link_cache.refill()
return _link_cache[name]

View File

@ -14,12 +14,12 @@ __all__ = ['init']
from ... import core as netlink
from .. import capi as capi
class DummyLink(object):
def __init__(self, link):
self._rtnl_link = link
def __init__(self, link):
self._rtnl_link = link
def brief(self):
return 'dummy'
def brief(self):
return 'dummy'
def init(link):
link.dummy = DummyLink(link._rtnl_link)
return link.dummy
link.dummy = DummyLink(link._rtnl_link)
return link.dummy

View File

@ -41,98 +41,98 @@ DEVCONF_PROXY_ARP_PVLAN = 25
DEVCONF_MAX = DEVCONF_PROXY_ARP_PVLAN
def _resolve(id):
if type(id) is str:
id = capi.rtnl_link_inet_str2devconf(id)[0]
if id < 0:
raise NameError("unknown configuration id")
return id
if type(id) is str:
id = capi.rtnl_link_inet_str2devconf(id)[0]
if id < 0:
raise NameError("unknown configuration id")
return id
class InetLink(object):
def __init__(self, link):
self._link = link
def __init__(self, link):
self._link = link
def details(self, fmt):
buf = fmt.nl('\n\t{0}\n\t'.format(util.title('Configuration:')))
def details(self, fmt):
buf = fmt.nl('\n\t{0}\n\t'.format(util.title('Configuration:')))
for i in range(DEVCONF_FORWARDING,DEVCONF_MAX+1):
if i & 1 and i > 1:
buf += fmt.nl('\t')
txt = util.kw(capi.rtnl_link_inet_devconf2str(i, 32)[0])
buf += fmt.format('{0:28s} {1:12} ', txt,
self.get_conf(i))
for i in range(DEVCONF_FORWARDING,DEVCONF_MAX+1):
if i & 1 and i > 1:
buf += fmt.nl('\t')
txt = util.kw(capi.rtnl_link_inet_devconf2str(i, 32)[0])
buf += fmt.format('{0:28s} {1:12} ', txt,
self.get_conf(i))
return buf
return buf
def get_conf(self, id):
return capi.inet_get_conf(self._link._rtnl_link, _resolve(id))
def get_conf(self, id):
return capi.inet_get_conf(self._link._rtnl_link, _resolve(id))
def set_conf(self, id, value):
return capi.rtnl_link_inet_set_conf(self._link._rtnl_link,
_resolve(id), int(value))
def set_conf(self, id, value):
return capi.rtnl_link_inet_set_conf(self._link._rtnl_link,
_resolve(id), int(value))
@netlink.nlattr('link.inet.forwarding', type=bool, fmt=util.bool)
@property
def forwarding(self):
return bool(self.get_conf(DEVCONF_FORWARDING))
@netlink.nlattr('link.inet.forwarding', type=bool, fmt=util.bool)
@property
def forwarding(self):
return bool(self.get_conf(DEVCONF_FORWARDING))
@forwarding.setter
def forwarding(self, value):
self.set_conf(DEVCONF_FORWARDING, int(value))
@forwarding.setter
def forwarding(self, value):
self.set_conf(DEVCONF_FORWARDING, int(value))
@netlink.nlattr('link.inet.mc_forwarding', type=bool, fmt=util.bool)
@property
def mc_forwarding(self):
return bool(self.get_conf(DEVCONF_MC_FORWARDING))
@netlink.nlattr('link.inet.mc_forwarding', type=bool, fmt=util.bool)
@property
def mc_forwarding(self):
return bool(self.get_conf(DEVCONF_MC_FORWARDING))
@mc_forwarding.setter
def mc_forwarding(self, value):
self.set_conf(DEVCONF_MC_FORWARDING, int(value))
@mc_forwarding.setter
def mc_forwarding(self, value):
self.set_conf(DEVCONF_MC_FORWARDING, int(value))
@netlink.nlattr('link.inet.proxy_arp', type=bool, fmt=util.bool)
@property
def proxy_arp(self):
return bool(self.get_conf(DEVCONF_PROXY_ARP))
@netlink.nlattr('link.inet.proxy_arp', type=bool, fmt=util.bool)
@property
def proxy_arp(self):
return bool(self.get_conf(DEVCONF_PROXY_ARP))
@proxy_arp.setter
def proxy_arp(self, value):
self.set_conf(DEVCONF_PROXY_ARP, int(value))
@proxy_arp.setter
def proxy_arp(self, value):
self.set_conf(DEVCONF_PROXY_ARP, int(value))
@netlink.nlattr('link.inet.accept_redirects', type=bool, fmt=util.bool)
@property
def accept_redirects(self):
return bool(self.get_conf(DEVCONF_ACCEPT_REDIRECTS))
@netlink.nlattr('link.inet.accept_redirects', type=bool, fmt=util.bool)
@property
def accept_redirects(self):
return bool(self.get_conf(DEVCONF_ACCEPT_REDIRECTS))
@accept_redirects.setter
def accept_redirects(self, value):
self.set_conf(DEVCONF_ACCEPT_REDIRECTS, int(value))
@accept_redirects.setter
def accept_redirects(self, value):
self.set_conf(DEVCONF_ACCEPT_REDIRECTS, int(value))
@netlink.nlattr('link.inet.secure_redirects', type=bool, fmt=util.bool)
@property
def secure_redirects(self):
return bool(self.get_conf(DEVCONF_SECURE_REDIRECTS))
@netlink.nlattr('link.inet.secure_redirects', type=bool, fmt=util.bool)
@property
def secure_redirects(self):
return bool(self.get_conf(DEVCONF_SECURE_REDIRECTS))
@secure_redirects.setter
def secure_redirects(self, value):
self.set_conf(DEVCONF_SECURE_REDIRECTS, int(value))
@secure_redirects.setter
def secure_redirects(self, value):
self.set_conf(DEVCONF_SECURE_REDIRECTS, int(value))
@netlink.nlattr('link.inet.send_redirects', type=bool, fmt=util.bool)
@property
def send_redirects(self):
return bool(self.get_conf(DEVCONF_SEND_REDIRECTS))
@netlink.nlattr('link.inet.send_redirects', type=bool, fmt=util.bool)
@property
def send_redirects(self):
return bool(self.get_conf(DEVCONF_SEND_REDIRECTS))
@send_redirects.setter
def send_redirects(self, value):
self.set_conf(DEVCONF_SEND_REDIRECTS, int(value))
@send_redirects.setter
def send_redirects(self, value):
self.set_conf(DEVCONF_SEND_REDIRECTS, int(value))
@netlink.nlattr('link.inet.shared_media', type=bool, fmt=util.bool)
@property
def shared_media(self):
return bool(self.get_conf(DEVCONF_SHARED_MEDIA))
@netlink.nlattr('link.inet.shared_media', type=bool, fmt=util.bool)
@property
def shared_media(self):
return bool(self.get_conf(DEVCONF_SHARED_MEDIA))
@shared_media.setter
def shared_media(self, value):
self.set_conf(DEVCONF_SHARED_MEDIA, int(value))
@shared_media.setter
def shared_media(self, value):
self.set_conf(DEVCONF_SHARED_MEDIA, int(value))
# IPV4_DEVCONF_RP_FILTER,
# IPV4_DEVCONF_ACCEPT_SOURCE_ROUTE,

View File

@ -12,64 +12,64 @@ from __future__ import absolute_import
from ... import core as netlink
from .. import capi as capi
class VLANLink(object):
def __init__(self, link):
self._link = link
def __init__(self, link):
self._link = link
###################################################################
# id
@netlink.nlattr('link.vlan.id', type=int)
@property
def id(self):
"""vlan identifier"""
return capi.rtnl_link_vlan_get_id(self._link)
###################################################################
# id
@netlink.nlattr('link.vlan.id', type=int)
@property
def id(self):
"""vlan identifier"""
return capi.rtnl_link_vlan_get_id(self._link)
@id.setter
def id(self, value):
capi.rtnl_link_vlan_set_id(self._link, int(value))
@id.setter
def id(self, value):
capi.rtnl_link_vlan_set_id(self._link, int(value))
###################################################################
# flags
@netlink.nlattr('link.vlan.flags', type=str)
@property
def flags(self):
""" VLAN flags
Setting this property will *Not* reset flags to value you supply in
Examples:
link.flags = '+xxx' # add xxx flag
link.flags = 'xxx' # exactly the same
link.flags = '-xxx' # remove xxx flag
link.flags = [ '+xxx', '-yyy' ] # list operation
"""
flags = capi.rtnl_link_vlan_get_flags(self._link)
return capi.rtnl_link_vlan_flags2str(flags, 256)[0].split(',')
###################################################################
# flags
@netlink.nlattr('link.vlan.flags', type=str)
@property
def flags(self):
""" VLAN flags
Setting this property will *Not* reset flags to value you supply in
Examples:
link.flags = '+xxx' # add xxx flag
link.flags = 'xxx' # exactly the same
link.flags = '-xxx' # remove xxx flag
link.flags = [ '+xxx', '-yyy' ] # list operation
"""
flags = capi.rtnl_link_vlan_get_flags(self._link)
return capi.rtnl_link_vlan_flags2str(flags, 256)[0].split(',')
def _set_flag(self, flag):
if flag.startswith('-'):
i = capi.rtnl_link_vlan_str2flags(flag[1:])
capi.rtnl_link_vlan_unset_flags(self._link, i)
elif flag.startswith('+'):
i = capi.rtnl_link_vlan_str2flags(flag[1:])
capi.rtnl_link_vlan_set_flags(self._link, i)
else:
i = capi.rtnl_link_vlan_str2flags(flag)
capi.rtnl_link_vlan_set_flags(self._link, i)
def _set_flag(self, flag):
if flag.startswith('-'):
i = capi.rtnl_link_vlan_str2flags(flag[1:])
capi.rtnl_link_vlan_unset_flags(self._link, i)
elif flag.startswith('+'):
i = capi.rtnl_link_vlan_str2flags(flag[1:])
capi.rtnl_link_vlan_set_flags(self._link, i)
else:
i = capi.rtnl_link_vlan_str2flags(flag)
capi.rtnl_link_vlan_set_flags(self._link, i)
@flags.setter
def flags(self, value):
if type(value) is list:
for flag in value:
self._set_flag(flag)
else:
self._set_flag(value)
@flags.setter
def flags(self, value):
if type(value) is list:
for flag in value:
self._set_flag(flag)
else:
self._set_flag(value)
###################################################################
# TODO:
# - ingress map
# - egress map
###################################################################
# TODO:
# - ingress map
# - egress map
def brief(self):
return 'vlan-id {0}'.format(self.id)
def brief(self):
return 'vlan-id {0}'.format(self.id)
def init(link):
link.vlan = VLANLink(link._link)
return link.vlan
link.vlan = VLANLink(link._link)
return link.vlan

View File

@ -14,150 +14,150 @@ from .. import capi as capi
from .. import tc as tc
class HTBQdisc(object):
def __init__(self, qdisc):
self._qdisc = qdisc
def __init__(self, qdisc):
self._qdisc = qdisc
###################################################################
# default class
@netlink.nlattr('qdisc.htb.default_class', type=int)
@property
def default_class(self):
return tc.Handle(capi.rtnl_htb_get_defcls(self._qdisc._rtnl_qdisc))
###################################################################
# default class
@netlink.nlattr('qdisc.htb.default_class', type=int)
@property
def default_class(self):
return tc.Handle(capi.rtnl_htb_get_defcls(self._qdisc._rtnl_qdisc))
@default_class.setter
def default_class(self, value):
capi.rtnl_htb_set_defcls(self._qdisc._rtnl_qdisc, int(value))
@default_class.setter
def default_class(self, value):
capi.rtnl_htb_set_defcls(self._qdisc._rtnl_qdisc, int(value))
#####################################################################
# r2q
@netlink.nlattr('qdisc.htb.r2q', type=int)
@property
def r2q(self):
return capi.rtnl_htb_get_rate2quantum(self._qdisc._rtnl_qdisc)
#####################################################################
# r2q
@netlink.nlattr('qdisc.htb.r2q', type=int)
@property
def r2q(self):
return capi.rtnl_htb_get_rate2quantum(self._qdisc._rtnl_qdisc)
@r2q.setter
def r2q(self, value):
capi.rtnl_htb_get_rate2quantum(self._qdisc._rtnl_qdisc,
int(value))
@r2q.setter
def r2q(self, value):
capi.rtnl_htb_get_rate2quantum(self._qdisc._rtnl_qdisc,
int(value))
def brief(self):
fmt = util.MyFormatter(self)
def brief(self):
fmt = util.MyFormatter(self)
ret = ' {s|default-class!k} {a|default_class}'
ret = ' {s|default-class!k} {a|default_class}'
if self.r2q:
ret += ' {s|r2q!k} {a|r2q}'
if self.r2q:
ret += ' {s|r2q!k} {a|r2q}'
return fmt.format(ret)
return fmt.format(ret)
class HTBClass(object):
def __init__(self, cl):
self._class = cl
def __init__(self, cl):
self._class = cl
#####################################################################
# rate
@netlink.nlattr('class.htb.rate', type=str)
@property
def rate(self):
rate = capi.rtnl_htb_get_rate(self._class._rtnl_class)
return util.Rate(rate)
#####################################################################
# rate
@netlink.nlattr('class.htb.rate', type=str)
@property
def rate(self):
rate = capi.rtnl_htb_get_rate(self._class._rtnl_class)
return util.Rate(rate)
@rate.setter
def rate(self, value):
capi.rtnl_htb_set_rate(self._class._rtnl_class, int(value))
@rate.setter
def rate(self, value):
capi.rtnl_htb_set_rate(self._class._rtnl_class, int(value))
#####################################################################
# ceil
@netlink.nlattr('class.htb.ceil', type=str)
@property
def ceil(self):
ceil = capi.rtnl_htb_get_ceil(self._class._rtnl_class)
return util.Rate(ceil)
#####################################################################
# ceil
@netlink.nlattr('class.htb.ceil', type=str)
@property
def ceil(self):
ceil = capi.rtnl_htb_get_ceil(self._class._rtnl_class)
return util.Rate(ceil)
@ceil.setter
def ceil(self, value):
capi.rtnl_htb_set_ceil(self._class._rtnl_class, int(value))
@ceil.setter
def ceil(self, value):
capi.rtnl_htb_set_ceil(self._class._rtnl_class, int(value))
#####################################################################
# burst
@netlink.nlattr('class.htb.burst', type=str)
@property
def burst(self):
burst = capi.rtnl_htb_get_rbuffer(self._class._rtnl_class)
return util.Size(burst)
#####################################################################
# burst
@netlink.nlattr('class.htb.burst', type=str)
@property
def burst(self):
burst = capi.rtnl_htb_get_rbuffer(self._class._rtnl_class)
return util.Size(burst)
@burst.setter
def burst(self, value):
capi.rtnl_htb_set_rbuffer(self._class._rtnl_class, int(value))
@burst.setter
def burst(self, value):
capi.rtnl_htb_set_rbuffer(self._class._rtnl_class, int(value))
#####################################################################
# ceil burst
@netlink.nlattr('class.htb.ceil_burst', type=str)
@property
def ceil_burst(self):
burst = capi.rtnl_htb_get_cbuffer(self._class._rtnl_class)
return util.Size(burst)
#####################################################################
# ceil burst
@netlink.nlattr('class.htb.ceil_burst', type=str)
@property
def ceil_burst(self):
burst = capi.rtnl_htb_get_cbuffer(self._class._rtnl_class)
return util.Size(burst)
@ceil_burst.setter
def ceil_burst(self, value):
capi.rtnl_htb_set_cbuffer(self._class._rtnl_class, int(value))
@ceil_burst.setter
def ceil_burst(self, value):
capi.rtnl_htb_set_cbuffer(self._class._rtnl_class, int(value))
#####################################################################
# priority
@netlink.nlattr('class.htb.prio', type=int)
@property
def prio(self):
return capi.rtnl_htb_get_prio(self._class._rtnl_class)
#####################################################################
# priority
@netlink.nlattr('class.htb.prio', type=int)
@property
def prio(self):
return capi.rtnl_htb_get_prio(self._class._rtnl_class)
@prio.setter
def prio(self, value):
capi.rtnl_htb_set_prio(self._class._rtnl_class, int(value))
@prio.setter
def prio(self, value):
capi.rtnl_htb_set_prio(self._class._rtnl_class, int(value))
#####################################################################
# quantum
@netlink.nlattr('class.htb.quantum', type=int)
@property
def quantum(self):
return capi.rtnl_htb_get_quantum(self._class._rtnl_class)
#####################################################################
# quantum
@netlink.nlattr('class.htb.quantum', type=int)
@property
def quantum(self):
return capi.rtnl_htb_get_quantum(self._class._rtnl_class)
@quantum.setter
def quantum(self, value):
capi.rtnl_htb_set_quantum(self._class._rtnl_class, int(value))
@quantum.setter
def quantum(self, value):
capi.rtnl_htb_set_quantum(self._class._rtnl_class, int(value))
#####################################################################
# level
@netlink.nlattr('class.htb.level', type=int)
@property
def level(self):
level = capi.rtnl_htb_get_level(self._class._rtnl_class)
#####################################################################
# level
@netlink.nlattr('class.htb.level', type=int)
@property
def level(self):
level = capi.rtnl_htb_get_level(self._class._rtnl_class)
@level.setter
def level(self, value):
capi.rtnl_htb_set_level(self._class._rtnl_class, int(value))
@level.setter
def level(self, value):
capi.rtnl_htb_set_level(self._class._rtnl_class, int(value))
def brief(self):
fmt = util.MyFormatter(self)
def brief(self):
fmt = util.MyFormatter(self)
ret = ' {t|prio} {t|rate}'
ret = ' {t|prio} {t|rate}'
if self.rate != self.ceil:
ret += ' {s|borrow-up-to!k} {a|ceil}'
if self.rate != self.ceil:
ret += ' {s|borrow-up-to!k} {a|ceil}'
ret += ' {t|burst}'
ret += ' {t|burst}'
return fmt.format(ret)
return fmt.format(ret)
def details(self):
fmt = util.MyFormatter(self)
def details(self):
fmt = util.MyFormatter(self)
return fmt.nl('\t{t|level} {t|quantum}')
return fmt.nl('\t{t|level} {t|quantum}')
def init_qdisc(qdisc):
qdisc.htb = HTBQdisc(qdisc)
return qdisc.htb
qdisc.htb = HTBQdisc(qdisc)
return qdisc.htb
def init_class(cl):
cl.htb = HTBClass(cl)
return cl.htb
cl.htb = HTBClass(cl)
return cl.htb
#extern void rtnl_htb_set_quantum(struct rtnl_class *, uint32_t quantum);

View File

@ -4,12 +4,12 @@
from __future__ import absolute_import
__all__ = [
'TcCache',
'Tc',
'QdiscCache',
'Qdisc',
'TcClassCache',
'TcClass']
'TcCache',
'Tc',
'QdiscCache',
'Qdisc',
'TcClassCache',
'TcClass']
import socket
import sys
@ -47,218 +47,218 @@ STAT_MAX = STAT_OVERLIMITS
###########################################################################
# Handle
class Handle(object):
""" Traffic control handle
""" Traffic control handle
Representation of a traffic control handle which uniquely identifies
each traffic control object in its link namespace.
Representation of a traffic control handle which uniquely identifies
each traffic control object in its link namespace.
handle = tc.Handle('10:20')
handle = tc.handle('root')
print int(handle)
print str(handle)
"""
def __init__(self, val=None):
if type(val) is str:
val = capi.tc_str2handle(val)
elif not val:
val = 0
handle = tc.Handle('10:20')
handle = tc.handle('root')
print int(handle)
print str(handle)
"""
def __init__(self, val=None):
if type(val) is str:
val = capi.tc_str2handle(val)
elif not val:
val = 0
self._val = int(val)
self._val = int(val)
def __cmp__(self, other):
if other is None:
other = 0
def __cmp__(self, other):
if other is None:
other = 0
if isinstance(other, Handle):
return int(self) - int(other)
elif isinstance(other, int):
return int(self) - other
else:
raise TypeError()
if isinstance(other, Handle):
return int(self) - int(other)
elif isinstance(other, int):
return int(self) - other
else:
raise TypeError()
def __int__(self):
return self._val
def __int__(self):
return self._val
def __str__(self):
return capi.rtnl_tc_handle2str(self._val, 64)[0]
def __str__(self):
return capi.rtnl_tc_handle2str(self._val, 64)[0]
def isroot(self):
return self._val == TC_H_ROOT or self._val == TC_H_INGRESS
def isroot(self):
return self._val == TC_H_ROOT or self._val == TC_H_INGRESS
###########################################################################
# TC Cache
class TcCache(netlink.Cache):
"""Cache of traffic control object"""
"""Cache of traffic control object"""
def __getitem__(self, key):
raise NotImplementedError()
def __getitem__(self, key):
raise NotImplementedError()
###########################################################################
# Tc Object
class Tc(netlink.Object):
def __cmp__(self, other):
diff = self.ifindex - other.ifindex
if diff == 0:
diff = int(self.handle) - int(other.handle)
return diff
def __cmp__(self, other):
diff = self.ifindex - other.ifindex
if diff == 0:
diff = int(self.handle) - int(other.handle)
return diff
def _tc_module_lookup(self):
self._module_lookup(self._module_path + self.kind,
'init_' + self._name)
def _tc_module_lookup(self):
self._module_lookup(self._module_path + self.kind,
'init_' + self._name)
@property
def root(self):
"""True if tc object is a root object"""
return self.parent.isroot()
@property
def root(self):
"""True if tc object is a root object"""
return self.parent.isroot()
#####################################################################
# ifindex
@property
def ifindex(self):
"""interface index"""
return capi.rtnl_tc_get_ifindex(self._rtnl_tc)
#####################################################################
# ifindex
@property
def ifindex(self):
"""interface index"""
return capi.rtnl_tc_get_ifindex(self._rtnl_tc)
@ifindex.setter
def ifindex(self, value):
capi.rtnl_tc_set_ifindex(self._rtnl_tc, int(value))
@ifindex.setter
def ifindex(self, value):
capi.rtnl_tc_set_ifindex(self._rtnl_tc, int(value))
#####################################################################
# link
@property
def link(self):
link = capi.rtnl_tc_get_link(self._rtnl_tc)
if not link:
return None
#####################################################################
# link
@property
def link(self):
link = capi.rtnl_tc_get_link(self._rtnl_tc)
if not link:
return None
return Link.Link.from_capi(link)
return Link.Link.from_capi(link)
@link.setter
def link(self, value):
capi.rtnl_tc_set_link(self._rtnl_tc, value._link)
@link.setter
def link(self, value):
capi.rtnl_tc_set_link(self._rtnl_tc, value._link)
#####################################################################
# mtu
@property
def mtu(self):
return capi.rtnl_tc_get_mtu(self._rtnl_tc)
#####################################################################
# mtu
@property
def mtu(self):
return capi.rtnl_tc_get_mtu(self._rtnl_tc)
@mtu.setter
def mtu(self, value):
capi.rtnl_tc_set_mtu(self._rtnl_tc, int(value))
@mtu.setter
def mtu(self, value):
capi.rtnl_tc_set_mtu(self._rtnl_tc, int(value))
#####################################################################
# mpu
@property
def mpu(self):
return capi.rtnl_tc_get_mpu(self._rtnl_tc)
#####################################################################
# mpu
@property
def mpu(self):
return capi.rtnl_tc_get_mpu(self._rtnl_tc)
@mpu.setter
def mpu(self, value):
capi.rtnl_tc_set_mpu(self._rtnl_tc, int(value))
@mpu.setter
def mpu(self, value):
capi.rtnl_tc_set_mpu(self._rtnl_tc, int(value))
#####################################################################
# overhead
@property
def overhead(self):
return capi.rtnl_tc_get_overhead(self._rtnl_tc)
#####################################################################
# overhead
@property
def overhead(self):
return capi.rtnl_tc_get_overhead(self._rtnl_tc)
@overhead.setter
def overhead(self, value):
capi.rtnl_tc_set_overhead(self._rtnl_tc, int(value))
@overhead.setter
def overhead(self, value):
capi.rtnl_tc_set_overhead(self._rtnl_tc, int(value))
#####################################################################
# linktype
@property
def linktype(self):
return capi.rtnl_tc_get_linktype(self._rtnl_tc)
#####################################################################
# linktype
@property
def linktype(self):
return capi.rtnl_tc_get_linktype(self._rtnl_tc)
@linktype.setter
def linktype(self, value):
capi.rtnl_tc_set_linktype(self._rtnl_tc, int(value))
@linktype.setter
def linktype(self, value):
capi.rtnl_tc_set_linktype(self._rtnl_tc, int(value))
#####################################################################
# handle
@property
def handle(self):
return Handle(capi.rtnl_tc_get_handle(self._rtnl_tc))
#####################################################################
# handle
@property
def handle(self):
return Handle(capi.rtnl_tc_get_handle(self._rtnl_tc))
@handle.setter
def handle(self, value):
capi.rtnl_tc_set_handle(self._rtnl_tc, int(value))
@handle.setter
def handle(self, value):
capi.rtnl_tc_set_handle(self._rtnl_tc, int(value))
#####################################################################
# parent
@property
def parent(self):
return Handle(capi.rtnl_tc_get_parent(self._rtnl_tc))
#####################################################################
# parent
@property
def parent(self):
return Handle(capi.rtnl_tc_get_parent(self._rtnl_tc))
@parent.setter
def parent(self, value):
capi.rtnl_tc_set_parent(self._rtnl_tc, int(value))
@parent.setter
def parent(self, value):
capi.rtnl_tc_set_parent(self._rtnl_tc, int(value))
#####################################################################
# kind
@property
def kind(self):
return capi.rtnl_tc_get_kind(self._rtnl_tc)
#####################################################################
# kind
@property
def kind(self):
return capi.rtnl_tc_get_kind(self._rtnl_tc)
@kind.setter
def kind(self, value):
capi.rtnl_tc_set_kind(self._rtnl_tc, value)
self._tc_module_lookup()
@kind.setter
def kind(self, value):
capi.rtnl_tc_set_kind(self._rtnl_tc, value)
self._tc_module_lookup()
def get_stat(self, id):
return capi.rtnl_tc_get_stat(self._rtnl_tc, id)
def get_stat(self, id):
return capi.rtnl_tc_get_stat(self._rtnl_tc, id)
@property
def _dev(self):
buf = util.kw('dev') + ' '
@property
def _dev(self):
buf = util.kw('dev') + ' '
if self.link:
return buf + util.string(self.link.name)
else:
return buf + util.num(self.ifindex)
if self.link:
return buf + util.string(self.link.name)
else:
return buf + util.num(self.ifindex)
def brief(self, title, nodev=False, noparent=False):
ret = title + ' {a|kind} {a|handle}'
def brief(self, title, nodev=False, noparent=False):
ret = title + ' {a|kind} {a|handle}'
if not nodev:
ret += ' {a|_dev}'
if not nodev:
ret += ' {a|_dev}'
if not noparent:
ret += ' {t|parent}'
if not noparent:
ret += ' {t|parent}'
return ret + self._module_brief()
return ret + self._module_brief()
def details(self):
return '{t|mtu} {t|mpu} {t|overhead} {t|linktype}'
def details(self):
return '{t|mtu} {t|mpu} {t|overhead} {t|linktype}'
@property
def packets(self):
return self.get_stat(STAT_PACKETS)
@property
def packets(self):
return self.get_stat(STAT_PACKETS)
@property
def bytes(self):
return self.get_stat(STAT_BYTES)
@property
def bytes(self):
return self.get_stat(STAT_BYTES)
@property
def qlen(self):
return self.get_stat(STAT_QLEN)
@property
def qlen(self):
return self.get_stat(STAT_QLEN)
def stats(self, fmt):
return fmt.nl('{t|packets} {t|bytes} {t|qlen}')
def stats(self, fmt):
return fmt.nl('{t|packets} {t|bytes} {t|qlen}')
###########################################################################
# Queueing discipline cache
class QdiscCache(netlink.Cache):
"""Cache of qdiscs"""
"""Cache of qdiscs"""
def __init__(self, cache=None):
if not cache:
cache = self._alloc_cache_name("route/qdisc")
def __init__(self, cache=None):
if not cache:
cache = self._alloc_cache_name("route/qdisc")
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
# def __getitem__(self, key):
# if type(key) is int:
@ -271,56 +271,56 @@ class QdiscCache(netlink.Cache):
# else:
# return Qdisc._from_capi(capi.qdisc2obj(qdisc))
def _new_object(self, obj):
return Qdisc(obj)
def _new_object(self, obj):
return Qdisc(obj)
def _new_cache(self, cache):
return QdiscCache(cache=cache)
def _new_cache(self, cache):
return QdiscCache(cache=cache)
###########################################################################
# Qdisc Object
class Qdisc(Tc):
"""Queueing discipline"""
"""Queueing discipline"""
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/qdisc", "qdisc", obj)
self._module_path = 'netlink.route.qdisc.'
self._rtnl_qdisc = self._obj2type(self._nl_object)
self._rtnl_tc = capi.obj2tc(self._nl_object)
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/qdisc", "qdisc", obj)
self._module_path = 'netlink.route.qdisc.'
self._rtnl_qdisc = self._obj2type(self._nl_object)
self._rtnl_tc = capi.obj2tc(self._nl_object)
netlink.add_attr('qdisc.handle', fmt=util.handle)
netlink.add_attr('qdisc.parent', fmt=util.handle)
netlink.add_attr('qdisc.kind', fmt=util.bold)
netlink.add_attr('qdisc.handle', fmt=util.handle)
netlink.add_attr('qdisc.parent', fmt=util.handle)
netlink.add_attr('qdisc.kind', fmt=util.bold)
if self.kind:
self._tc_module_lookup()
if self.kind:
self._tc_module_lookup()
@classmethod
def from_capi(cls, obj):
return cls(capi.qdisc2obj(obj))
@classmethod
def from_capi(cls, obj):
return cls(capi.qdisc2obj(obj))
def _obj2type(self, obj):
return capi.obj2qdisc(obj)
def _obj2type(self, obj):
return capi.obj2qdisc(obj)
def _new_instance(self, obj):
if not obj:
raise ValueError()
def _new_instance(self, obj):
if not obj:
raise ValueError()
return Qdisc(obj)
return Qdisc(obj)
@property
def childs(self):
ret = []
@property
def childs(self):
ret = []
if int(self.handle):
ret += get_cls(self.ifindex, parent=self.handle)
if int(self.handle):
ret += get_cls(self.ifindex, parent=self.handle)
if self.root:
ret += get_class(self.ifindex, parent=TC_H_ROOT)
if self.root:
ret += get_class(self.ifindex, parent=TC_H_ROOT)
ret += get_class(self.ifindex, parent=self.handle)
ret += get_class(self.ifindex, parent=self.handle)
return ret
return ret
# #####################################################################
# # add()
@ -350,22 +350,22 @@ class Qdisc(Tc):
# if ret < 0:
# raise netlink.KernelError(ret)
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, nodev=False,
noparent=False, indent=''):
"""Return qdisc as formatted text"""
fmt = util.MyFormatter(self, indent)
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, nodev=False,
noparent=False, indent=''):
"""Return qdisc as formatted text"""
fmt = util.MyFormatter(self, indent)
buf = fmt.format(self.brief('qdisc', nodev, noparent))
buf = fmt.format(self.brief('qdisc', nodev, noparent))
if details:
buf += fmt.nl('\t' + self.details())
if details:
buf += fmt.nl('\t' + self.details())
if stats:
buf += self.stats(fmt)
if stats:
buf += self.stats(fmt)
# if stats:
# l = [['Packets', RX_PACKETS, TX_PACKETS],
@ -423,230 +423,230 @@ class Qdisc(Tc):
# row[2] = self.get_stat(row[2]) if row[2] else ''
# buf += '\t{0:27} {1:>16} {2:>16}\n'.format(*row)
return buf
return buf
###########################################################################
# Traffic class cache
class TcClassCache(netlink.Cache):
"""Cache of traffic classes"""
"""Cache of traffic classes"""
def __init__(self, ifindex, cache=None):
if not cache:
cache = self._alloc_cache_name("route/class")
def __init__(self, ifindex, cache=None):
if not cache:
cache = self._alloc_cache_name("route/class")
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
self._set_arg1(ifindex)
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
self._set_arg1(ifindex)
def _new_object(self, obj):
return TcClass(obj)
def _new_object(self, obj):
return TcClass(obj)
def _new_cache(self, cache):
return TcClassCache(self.arg1, cache=cache)
def _new_cache(self, cache):
return TcClassCache(self.arg1, cache=cache)
###########################################################################
# Traffic Class Object
class TcClass(Tc):
"""Traffic Class"""
"""Traffic Class"""
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/class", "class", obj)
self._module_path = 'netlink.route.qdisc.'
self._rtnl_class = self._obj2type(self._nl_object)
self._rtnl_tc = capi.obj2tc(self._nl_object)
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/class", "class", obj)
self._module_path = 'netlink.route.qdisc.'
self._rtnl_class = self._obj2type(self._nl_object)
self._rtnl_tc = capi.obj2tc(self._nl_object)
netlink.add_attr('class.handle', fmt=util.handle)
netlink.add_attr('class.parent', fmt=util.handle)
netlink.add_attr('class.kind', fmt=util.bold)
netlink.add_attr('class.handle', fmt=util.handle)
netlink.add_attr('class.parent', fmt=util.handle)
netlink.add_attr('class.kind', fmt=util.bold)
if self.kind:
self._tc_module_lookup()
if self.kind:
self._tc_module_lookup()
@classmethod
def from_capi(cls, obj):
return cls(capi.class2obj(obj))
@classmethod
def from_capi(cls, obj):
return cls(capi.class2obj(obj))
def _obj2type(self, obj):
return capi.obj2class(obj)
def _obj2type(self, obj):
return capi.obj2class(obj)
def _new_instance(self, obj):
if not obj:
raise ValueError()
def _new_instance(self, obj):
if not obj:
raise ValueError()
return TcClass(obj)
return TcClass(obj)
@property
def childs(self):
ret = []
@property
def childs(self):
ret = []
# classes can have classifiers, child classes and leaf
# qdiscs
ret += get_cls(self.ifindex, parent=self.handle)
ret += get_class(self.ifindex, parent=self.handle)
ret += get_qdisc(self.ifindex, parent=self.handle)
# classes can have classifiers, child classes and leaf
# qdiscs
ret += get_cls(self.ifindex, parent=self.handle)
ret += get_class(self.ifindex, parent=self.handle)
ret += get_qdisc(self.ifindex, parent=self.handle)
return ret
return ret
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, nodev=False,
noparent=False, indent=''):
"""Return class as formatted text"""
fmt = util.MyFormatter(self, indent)
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, nodev=False,
noparent=False, indent=''):
"""Return class as formatted text"""
fmt = util.MyFormatter(self, indent)
buf = fmt.format(self.brief('class', nodev, noparent))
buf = fmt.format(self.brief('class', nodev, noparent))
if details:
buf += fmt.nl('\t' + self.details())
if details:
buf += fmt.nl('\t' + self.details())
return buf
return buf
###########################################################################
# Classifier Cache
class ClassifierCache(netlink.Cache):
"""Cache of traffic classifiers objects"""
"""Cache of traffic classifiers objects"""
def __init__(self, ifindex, parent, cache=None):
if not cache:
cache = self._alloc_cache_name("route/cls")
def __init__(self, ifindex, parent, cache=None):
if not cache:
cache = self._alloc_cache_name("route/cls")
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
self._set_arg1(ifindex)
self._set_arg2(int(parent))
self._protocol = netlink.NETLINK_ROUTE
self._nl_cache = cache
self._set_arg1(ifindex)
self._set_arg2(int(parent))
def _new_object(self, obj):
return Classifier(obj)
def _new_object(self, obj):
return Classifier(obj)
def _new_cache(self, cache):
return ClassifierCache(self.arg1, self.arg2, cache=cache)
def _new_cache(self, cache):
return ClassifierCache(self.arg1, self.arg2, cache=cache)
###########################################################################
# Classifier Object
class Classifier(Tc):
"""Classifier"""
"""Classifier"""
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/cls", "cls", obj)
self._module_path = 'netlink.route.cls.'
self._rtnl_cls = self._obj2type(self._nl_object)
self._rtnl_tc = capi.obj2tc(self._nl_object)
def __init__(self, obj=None):
netlink.Object.__init__(self, "route/cls", "cls", obj)
self._module_path = 'netlink.route.cls.'
self._rtnl_cls = self._obj2type(self._nl_object)
self._rtnl_tc = capi.obj2tc(self._nl_object)
netlink.add_attr('cls.handle', fmt=util.handle)
netlink.add_attr('cls.parent', fmt=util.handle)
netlink.add_attr('cls.kind', fmt=util.bold)
netlink.add_attr('cls.handle', fmt=util.handle)
netlink.add_attr('cls.parent', fmt=util.handle)
netlink.add_attr('cls.kind', fmt=util.bold)
@classmethod
def from_capi(cls, obj):
return cls(capi.cls2obj(obj))
@classmethod
def from_capi(cls, obj):
return cls(capi.cls2obj(obj))
def _obj2type(self, obj):
return capi.obj2cls(obj)
def _obj2type(self, obj):
return capi.obj2cls(obj)
def _new_instance(self, obj):
if not obj:
raise ValueError()
def _new_instance(self, obj):
if not obj:
raise ValueError()
return Classifier(obj)
return Classifier(obj)
#####################################################################
# priority
@property
def priority(self):
return capi.rtnl_cls_get_prio(self._rtnl_cls)
#####################################################################
# priority
@property
def priority(self):
return capi.rtnl_cls_get_prio(self._rtnl_cls)
@priority.setter
def priority(self, value):
capi.rtnl_cls_set_prio(self._rtnl_cls, int(value))
@priority.setter
def priority(self, value):
capi.rtnl_cls_set_prio(self._rtnl_cls, int(value))
#####################################################################
# protocol
@property
def protocol(self):
return capi.rtnl_cls_get_protocol(self._rtnl_cls)
#####################################################################
# protocol
@property
def protocol(self):
return capi.rtnl_cls_get_protocol(self._rtnl_cls)
@protocol.setter
def protocol(self, value):
capi.rtnl_cls_set_protocol(self._rtnl_cls, int(value))
@protocol.setter
def protocol(self, value):
capi.rtnl_cls_set_protocol(self._rtnl_cls, int(value))
@property
def childs(self):
return []
@property
def childs(self):
return []
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, nodev=False,
noparent=False, indent=''):
"""Return class as formatted text"""
fmt = util.MyFormatter(self, indent)
###################################################################
#
# format(details=False, stats=False)
#
def format(self, details=False, stats=False, nodev=False,
noparent=False, indent=''):
"""Return class as formatted text"""
fmt = util.MyFormatter(self, indent)
buf = fmt.format(self.brief('classifier', nodev, noparent))
buf += fmt.format(' {t|priority} {t|protocol}')
buf = fmt.format(self.brief('classifier', nodev, noparent))
buf += fmt.format(' {t|priority} {t|protocol}')
if details:
buf += fmt.nl('\t' + self.details())
if details:
buf += fmt.nl('\t' + self.details())
return buf
return buf
_qdisc_cache = QdiscCache()
def get_qdisc(ifindex, handle=None, parent=None):
l = []
l = []
_qdisc_cache.refill()
_qdisc_cache.refill()
for qdisc in _qdisc_cache:
if qdisc.ifindex == ifindex and \
(handle == None or qdisc.handle == handle) and \
(parent == None or qdisc.parent == parent):
l.append(qdisc)
for qdisc in _qdisc_cache:
if qdisc.ifindex == ifindex and \
(handle == None or qdisc.handle == handle) and \
(parent == None or qdisc.parent == parent):
l.append(qdisc)
return l
return l
_class_cache = {}
def get_class(ifindex, parent, handle=None):
l = []
l = []
try:
cache = _class_cache[ifindex]
except KeyError:
cache = TcClassCache(ifindex)
_class_cache[ifindex] = cache
try:
cache = _class_cache[ifindex]
except KeyError:
cache = TcClassCache(ifindex)
_class_cache[ifindex] = cache
cache.refill()
cache.refill()
for cl in cache:
if (parent == None or cl.parent == parent) and \
(handle == None or cl.handle == handle):
l.append(cl)
for cl in cache:
if (parent == None or cl.parent == parent) and \
(handle == None or cl.handle == handle):
l.append(cl)
return l
return l
_cls_cache = {}
def get_cls(ifindex, parent, handle=None):
l = []
l = []
try:
chain = _cls_cache[ifindex]
except KeyError:
_cls_cache[ifindex] = {}
try:
chain = _cls_cache[ifindex]
except KeyError:
_cls_cache[ifindex] = {}
try:
cache = _cls_cache[ifindex][parent]
except KeyError:
cache = ClassifierCache(ifindex, parent)
_cls_cache[ifindex][parent] = cache
try:
cache = _cls_cache[ifindex][parent]
except KeyError:
cache = ClassifierCache(ifindex, parent)
_cls_cache[ifindex][parent] = cache
cache.refill()
cache.refill()
for cls in cache:
if handle == None or cls.handle == handle:
l.append(cls)
for cls in cache:
if handle == None or cls.handle == handle:
l.append(cls)
return l
return l

View File

@ -18,162 +18,162 @@ import types
__version__ = "1.0"
def _color(t, c):
return b'{esc}[{color}m{text}{esc}[0m'.format(esc=b'\x1b', color=c, text=t)
return b'{esc}[{color}m{text}{esc}[0m'.format(esc=b'\x1b', color=c, text=t)
def black(t):
return _color(t, 30)
return _color(t, 30)
def red(t):
return _color(t, 31)
return _color(t, 31)
def green(t):
return _color(t, 32)
return _color(t, 32)
def yellow(t):
return _color(t, 33)
return _color(t, 33)
def blue(t):
return _color(t, 34)
return _color(t, 34)
def magenta(t):
return _color(t, 35)
return _color(t, 35)
def cyan(t):
return _color(t, 36)
return _color(t, 36)
def white(t):
return _color(t, 37)
return _color(t, 37)
def bold(t):
return _color(t, 1)
return _color(t, 1)
def kw(t):
return yellow(t)
return yellow(t)
def num(t):
return str(t)
return str(t)
def string(t):
return t
return t
def addr(t):
return str(t)
return str(t)
def bad(t):
return red(t)
return red(t)
def good(t):
return green(t)
return green(t)
def title(t):
return t
return t
def bool(t):
return str(t)
return str(t)
def handle(t):
return str(t)
return str(t)
class MyFormatter(Formatter):
def __init__(self, obj, indent=''):
self._obj = obj
self._indent = indent
def __init__(self, obj, indent=''):
self._obj = obj
self._indent = indent
def _nlattr(self, key):
value = getattr(self._obj, key)
title = None
def _nlattr(self, key):
value = getattr(self._obj, key)
title = None
if isinstance(value, types.MethodType):
value = value()
if isinstance(value, types.MethodType):
value = value()
try:
d = netlink.attrs[self._obj._name + '.' + key]
try:
d = netlink.attrs[self._obj._name + '.' + key]
if 'fmt' in d:
value = d['fmt'](value)
if 'fmt' in d:
value = d['fmt'](value)
if 'title' in d:
title = d['title']
except KeyError:
pass
except AttributeError:
pass
if 'title' in d:
title = d['title']
except KeyError:
pass
except AttributeError:
pass
return title, str(value)
return title, str(value)
def get_value(self, key, args, kwds):
# Let default get_value() handle ints
if not isinstance(key, str):
return Formatter.get_value(self, key, args, kwds)
def get_value(self, key, args, kwds):
# Let default get_value() handle ints
if not isinstance(key, str):
return Formatter.get_value(self, key, args, kwds)
# HACK, we allow defining strings via fields to allow
# conversions
if key[:2] == 's|':
return key[2:]
# HACK, we allow defining strings via fields to allow
# conversions
if key[:2] == 's|':
return key[2:]
if key[:2] == 't|':
# title mode ("TITLE ATTR")
include_title = True
elif key[:2] == 'a|':
# plain attribute mode ("ATTR")
include_title = False
else:
# No special field, have default get_value() get it
return Formatter.get_value(self, key, args, kwds)
if key[:2] == 't|':
# title mode ("TITLE ATTR")
include_title = True
elif key[:2] == 'a|':
# plain attribute mode ("ATTR")
include_title = False
else:
# No special field, have default get_value() get it
return Formatter.get_value(self, key, args, kwds)
key = key[2:]
(title, value) = self._nlattr(key)
key = key[2:]
(title, value) = self._nlattr(key)
if include_title:
if not title:
title = key # fall back to key as title
value = '{0} {1}'.format(kw(title), value)
if include_title:
if not title:
title = key # fall back to key as title
value = '{0} {1}'.format(kw(title), value)
return value
return value
def convert_field(self, value, conversion):
if conversion == 'r':
return repr(value)
elif conversion == 's':
return str(value)
elif conversion == 'k':
return kw(value)
elif conversion == 'b':
return bold(value)
elif conversion is None:
return value
def convert_field(self, value, conversion):
if conversion == 'r':
return repr(value)
elif conversion == 's':
return str(value)
elif conversion == 'k':
return kw(value)
elif conversion == 'b':
return bold(value)
elif conversion is None:
return value
raise ValueError("Unknown converion specifier {0!s}".format(conversion))
raise ValueError("Unknown converion specifier {0!s}".format(conversion))
def nl(self, format_string=''):
return '\n' + self._indent + self.format(format_string)
def nl(self, format_string=''):
return '\n' + self._indent + self.format(format_string)
NL_BYTE_RATE = 0
NL_BIT_RATE = 1
class Rate(object):
def __init__(self, rate, mode=NL_BYTE_RATE):
self._rate = rate
self._mode = mode
def __init__(self, rate, mode=NL_BYTE_RATE):
self._rate = rate
self._mode = mode
def __str__(self):
return capi.nl_rate2str(self._rate, self._mode, 32)[1]
def __str__(self):
return capi.nl_rate2str(self._rate, self._mode, 32)[1]
def __int__(self):
return self._rate
def __int__(self):
return self._rate
def __cmp__(self, other):
return int(self) - int(other)
def __cmp__(self, other):
return int(self) - int(other)
class Size(object):
def __init__(self, size):
self._size = size
def __init__(self, size):
self._size = size
def __str__(self):
return capi.nl_size2str(self._size, 32)[0]
def __str__(self):
return capi.nl_size2str(self._size, 32)[0]
def __int__(self):
return self._size
def __int__(self):
return self._size
def __cmp__(self, other):
return int(self) - int(other)
def __cmp__(self, other):
return int(self) - int(other)