Merge pull request #20 from FreifunkBremen/RRD-module

RRD module
rewrite
Nils Schneider 2014-02-21 15:06:04 +01:00
commit d6bd31f8d0
4 changed files with 446 additions and 93 deletions

35
GlobalRRD.py Normal file
View File

@ -0,0 +1,35 @@
import os
import subprocess
from RRD import RRD, DS, RRA
class GlobalRRD(RRD):
ds_list = [
# Number of nodes available
DS('nodes', 'GAUGE', 120, 0, float('NaN')),
# Number of client available
DS('clients', 'GAUGE', 120, 0, float('NaN')),
]
rra_list = [
RRA('AVERAGE', 0.5, 1, 120), # 2 hours of 1 minute samples
RRA('AVERAGE', 0.5, 60, 744), # 31 days of 1 hour samples
RRA('AVERAGE', 0.5, 1440, 1780),# ~5 years of 1 day samples
]
def __init__(self, directory):
super().__init__(os.path.join(directory, "nodes.rrd"))
self.ensureSanity(self.ds_list, self.rra_list, step=60)
def update(self, nodeCount, clientCount):
super().update({'nodes': nodeCount, 'clients': clientCount})
def graph(self, filename, timeframe):
args = ["rrdtool", 'graph', filename,
'-s', '-' + timeframe,
'-w', '800',
'-h' '400',
'DEF:nodes=' + self.filename + ':nodes:AVERAGE',
'LINE1:nodes#F00:nodes\\l',
'DEF:clients=' + self.filename + ':clients:AVERAGE',
'LINE2:clients#00F:clients',
]
subprocess.check_output(args)

54
NodeRRD.py Normal file
View File

@ -0,0 +1,54 @@
import os
import subprocess
from node import Node
from RRD import RRD, DS, RRA
class NodeRRD(RRD):
ds_list = [
DS('upstate', 'GAUGE', 120, 0, 1),
DS('clients', 'GAUGE', 120, 0, float('NaN')),
]
rra_list = [
RRA('AVERAGE', 0.5, 1, 120), # 2 hours of 1 minute samples
RRA('AVERAGE', 0.5, 5, 1440), # 5 days of 5 minute samples
RRA('AVERAGE', 0.5, 60, 720), # 30 days of 1 hour samples
RRA('AVERAGE', 0.5, 720, 730), # 1 year of 12 hour samples
]
def __init__(self, filename, node = None):
"""
Create a new RRD for a given node.
If the RRD isn't supposed to be updated, the node can be omitted.
"""
self.node = node
super().__init__(filename)
self.ensureSanity(self.ds_list, self.rra_list, step=60)
@property
def imagename(self):
return os.path.basename(self.filename).rsplit('.', 2)[0] + ".png"
def update(self):
super().update({'upstate': 1, 'clients': self.node.clients})
def graph(self, directory, timeframe):
"""
Create a graph in the given directory. The file will be named
basename.png if the RRD file is named basename.rrd
"""
args = ['rrdtool','graph', os.path.join(directory, self.imagename),
'-s', '-' + timeframe ,
'-w', '800',
'-h', '400',
'-l', '0',
'-y', '1:1',
'DEF:clients=' + self.filename + ':clients:AVERAGE',
'VDEF:maxc=clients,MAXIMUM',
'CDEF:c=0,clients,ADDNAN',
'CDEF:d=clients,UN,maxc,UN,1,maxc,IF,*',
'AREA:c#0F0:up\\l',
'AREA:d#F00:down\\l',
'LINE1:c#00F:clients connected\\l',
]
subprocess.check_output(args)

343
RRD.py Normal file
View File

@ -0,0 +1,343 @@
import subprocess
import re
import io
import os
from tempfile import TemporaryFile
from operator import xor, eq
from functools import reduce
from itertools import starmap
import math
class RRDIncompatibleException(Exception):
"""
Is raised when an RRD doesn't have the desired definition and cannot be
upgraded to it.
"""
pass
class RRDOutdatedException(Exception):
"""
Is raised when an RRD doesn't have the desired definition, but can be
upgraded to it.
"""
pass
if not hasattr(__builtins__, "FileNotFoundError"):
class FileNotFoundError(Exception):
pass
class RRD:
"""
An RRD is a Round Robin Database, a database which forgets old data and
aggregates multiple records into new ones.
It contains multiple Data Sources (DS) which can be thought of as columns,
and Round Robin Archives (RRA) which can be thought of as tables with the
DS as columns and time-dependant rows.
"""
# rra[2].cdp_prep[0].value = 1,8583033333e+03
_info_regex = re.compile("""
(?P<section>[a-z_]+)
\[ (?P<key>[a-zA-Z0-9_]+) \]
\.
|
(?P<name>[a-z_]+)
\s*=\s*
"? (?P<value>.*?) "?
$""", re.X)
_cached_info = None
def _exec_rrdtool(self, cmd, *args, **kwargs):
pargs = ["rrdtool", cmd, self.filename]
for k,v in kwargs.items():
pargs.extend(["--" + k, str(v)])
pargs.extend(args)
subprocess.check_output(pargs)
def __init__(self, filename):
self.filename = filename
def ensureSanity(self, ds_list, rra_list, **kwargs):
"""
Create or upgrade the RRD file if necessary to contain all DS in
ds_list. If it needs to be created, the RRAs in rra_list and any kwargs
will be used for creation. Note that RRAs and options of an existing
database are NOT modified!
"""
try:
self.checkSanity(ds_list)
except FileNotFoundError:
self.create(ds_list, rra_list, **kwargs)
except RRDOutdatedException:
self.upgrade(ds_list)
def checkSanity(self, ds_list=()):
"""
Check if the RRD file exists and contains (at least) the DS listed in
ds_list.
"""
if not os.path.exists(self.filename):
raise FileNotFoundError(self.filename)
info = self.info()
if set(ds_list) - set(info['ds'].values()) != set():
if set((ds.name, ds.type) for ds in ds_list) \
- set((ds.name, ds.type) for ds in info['ds'].values()) != set():
raise RRDIncompatibleException()
else:
raise RRDOutdatedException()
def upgrade(self, dss):
"""
Upgrade the DS definitions (!) of this RRD.
(To update its values, use update())
The list dss contains DSS objects to be updated or added. The
parameters of a DS can be changed, but not its type. New DS are always
added at the end in the order of their appearance in the list.
This is done internally via an rrdtool dump -> rrdtool restore and
modifying the dump on the fly.
"""
info = self.info()
new_ds = list(info['ds'].values())
new_ds.sort(key=lambda ds: ds.index)
for ds in dss:
if ds.name in info['ds']:
old_ds = info['ds'][ds.name]
if info['ds'][ds.name].type != ds.type:
raise RuntimeError('Cannot convert existing DS "%s" from type "%s" to "%s"' %
(ds.name, old_ds.type, ds.type))
ds.index = old_ds.index
new_ds[ds.index] = ds
else:
ds.index = len(new_ds)
new_ds.append(ds)
added_ds_num = len(new_ds) - len(info['ds'])
dump = subprocess.Popen(
["rrdtool", "dump", self.filename],
stdout=subprocess.PIPE
)
restore = subprocess.Popen(
["rrdtool", "restore", "-", self.filename + ".new"],
stdin=subprocess.PIPE
)
echo = True
ds_definitions = True
for line in dump.stdout:
if ds_definitions and b'<ds>' in line:
echo = False
if b'<!-- Round Robin Archives -->' in line:
ds_definitions = False
for ds in new_ds:
restore.stdin.write(bytes("""
<ds>
<name> %s </name>
<type> %s </type>
<minimal_heartbeat>%i</minimal_heartbeat>
<min>%s</min>
<max>%s</max>
<!-- PDP Status -->
<last_ds>%s</last_ds>
<value>%s</value>
<unknown_sec> %i </unknown_sec>
</ds>
""" % (
ds.name,
ds.type,
ds.args[0],
ds.args[1],
ds.args[2],
ds.last_ds,
ds.value,
ds.unknown_sec)
, "utf-8"))
if b'</cdp_prep>' in line:
restore.stdin.write(added_ds_num*b"""
<ds>
<primary_value> NaN </primary_value>
<secondary_value> NaN </secondary_value>
<value> NaN </value>
<unknown_datapoints> 0 </unknown_datapoints>
</ds>
""")
# echoing of input line
if echo:
restore.stdin.write(
line.replace(
b'</row>',
(added_ds_num*b'<v>NaN</v>')+b'</row>'
)
)
if ds_definitions and b'</ds>' in line:
echo = True
dump.stdout.close()
restore.stdin.close()
try:
dump.wait(1)
except subprocess.TimeoutExpired:
dump.kill()
try:
restore.wait(2)
except subprocess.TimeoutExpired:
dump.kill()
raise RuntimeError("rrdtool restore process killed")
os.rename(self.filename + ".new", self.filename)
self._cached_info = None
def create(self, ds_list, rra_list, **kwargs):
"""
Create a new RRD file with the specified list of RRAs and DSs.
Any kwargs are passed as --key=value to rrdtool create.
"""
self._exec_rrdtool(
"create",
*map(str, rra_list + ds_list),
**kwargs
)
self._cached_info = None
def update(self, V):
"""
Update the RRD with new values V.
V can be either list or dict:
* If it is a dict, its keys must be DS names in the RRD and it is
ensured that the correct DS are updated with the correct values, by
passing a "template" to rrdtool update (see man rrdupdate).
* If it is a list, no template is generated and the order of the
values in V must be the same as that of the DS in the RRD.
"""
try:
args = ['N:' + ':'.join(map(str, V.values()))]
kwargs = {'template': ':'.join(V.keys())}
except AttributeError:
args = ['N:' + ':'.join(map(str, V))]
kwargs = {}
self._exec_rrdtool("update", *args, **kwargs)
self._cached_info = None
def info(self):
"""
Return a dictionary with information about the RRD.
See `man rrdinfo` for more details.
"""
if self._cached_info:
return self._cached_info
env = os.environ.copy()
env["LC_ALL"] = "C"
proc = subprocess.Popen(
["rrdtool", "info", self.filename],
stdout=subprocess.PIPE,
env=env
)
out, err = proc.communicate()
out = out.decode()
info = {}
for line in out.splitlines():
base = info
for match in self._info_regex.finditer(line):
section, key, name, value = match.group("section", "key", "name", "value")
if section and key:
try:
key = int(key)
except ValueError:
pass
if section not in base:
base[section] = {}
if key not in base[section]:
base[section][key] = {}
base = base[section][key]
if name and value:
try:
base[name] = int(value)
except ValueError:
try:
base[name] = float(value)
except:
base[name] = value
dss = {}
for name, ds in info['ds'].items():
ds_obj = DS(name, ds['type'], ds['minimal_heartbeat'], ds['min'], ds['max'])
ds_obj.index = ds['index']
ds_obj.last_ds = ds['last_ds']
ds_obj.value = ds['value']
ds_obj.unknown_sec = ds['unknown_sec']
dss[name] = ds_obj
info['ds'] = dss
rras = []
for rra in info['rra'].values():
rras.append(RRA(rra['cf'], rra['xff'], rra['pdp_per_row'], rra['rows']))
info['rra'] = rras
self._cached_info = info
return info
class DS:
"""
DS stands for Data Source and represents one line of data points in a Round
Robin Database (RRD).
"""
name = None
type = None
args = []
index = -1
last_ds = 'U'
value = 0
unknown_sec = 0
def __init__(self, name, dst, *args):
self.name = name
self.type = dst
self.args = args
def __str__(self):
return "DS:%s:%s:%s" % (
self.name,
self.type,
":".join(map(str, self._nan_to_U_args()))
)
def __repr__(self):
return "%s(%r, %r, %s)" % (
self.__class__.__name__,
self.name,
self.type,
", ".join(map(repr, self.args))
)
def __eq__(self, other):
return all(starmap(eq, zip(self._compare_keys(), other._compare_keys())))
def __hash__(self):
return reduce(xor, map(hash, self._compare_keys()))
def _nan_to_U_args(self):
return tuple(
'U' if type(arg) is float and math.isnan(arg)
else arg
for arg in self.args
)
def _compare_keys(self):
return (self.name, self.type, self._nan_to_U_args())
class RRA:
def __init__(self, cf, *args):
self.cf = cf
self.args = args
def __str__(self):
return "RRA:%s:%s" % (self.cf, ":".join(map(str, self.args)))
def __repr__(self):
return "%s(%r, %s)" % (
self.__class__.__name__,
self.cf,
", ".join(map(repr, self.args))
)

107
rrd.py
View File

@ -2,6 +2,8 @@
import subprocess import subprocess
import time import time
import os import os
from GlobalRRD import GlobalRRD
from NodeRRD import NodeRRD
class rrd: class rrd:
def __init__( self def __init__( self
@ -11,7 +13,7 @@ class rrd:
, displayTimeNode = "1d" , displayTimeNode = "1d"
): ):
self.dbPath = databaseDirectory self.dbPath = databaseDirectory
self.globalDbFile = databaseDirectory + "/nodes.rrd" self.globalDb = GlobalRRD(self.dbPath)
self.imagePath = imagePath self.imagePath = imagePath
self.displayTimeGlobal = displayTimeGlobal self.displayTimeGlobal = displayTimeGlobal
self.displayTimeNode = displayTimeNode self.displayTimeNode = displayTimeNode
@ -24,90 +26,6 @@ class rrd:
except: except:
os.mkdir(self.imagePath) os.mkdir(self.imagePath)
def checkAndCreateIfNeededGlobalDatabase(self):
""" Creates the global database file iff it did not exist.
"""
if not os.path.exists(self.globalDbFile):
# Create Database with rrdtool
args = ["rrdtool",'create', self.globalDbFile
,'--start', str(round(self.currentTimeInt - 60))
,'--step' , '60'
# Number of nodes available
,'DS:nodes:GAUGE:120:0:U'
# Number of client available
,'DS:clients:GAUGE:120:0:U'
,'RRA:AVERAGE:0.5:1:120'
,'RRA:AVERAGE:0.5:60:744'
,'RRA:AVERAGE:0.5:1440:1780'
]
subprocess.call(args)
def updateGlobalDatabase(self,nodeCount,clientCount):
""" Adds a new (#Nodes,#Clients) entry to the global database.
"""
# Update Global RRDatabase
args = ["rrdtool",'updatev', self.globalDbFile
# #Nodes #Clients
, self.currentTime + ":"+str(nodeCount)+":"+str(clientCount)
]
subprocess.check_output(args)
def createGlobalGraph(self):
nodeGraph = self.imagePath + "/" + "globalGraph.png"
args = ["rrdtool", 'graph', nodeGraph, '-s', '-' + self.displayTimeGlobal, '-w', '800', '-h' '400'
,'DEF:nodes=' + self.globalDbFile + ':nodes:AVERAGE', 'LINE1:nodes#F00:nodes\\l'
,'DEF:clients=' + self.globalDbFile + ':clients:AVERAGE','LINE2:clients#00F:clients'
]
subprocess.check_output(args)
def nodeMACToRRDFile(self,nodeMAC):
return self.dbPath + "/" + str(nodeMAC).replace(":","") + ".rrd"
def nodeMACToPNGFile(self,nodeMAC):
return self.imagePath + "/" + str(nodeMAC).replace(":","") + ".png"
def checkAndCreateIfNeededNodeDatabase(self,nodePrimaryMAC):
# TODO check for bad nodeNames
nodeFile = self.nodeMACToRRDFile(nodePrimaryMAC);
if not os.path.exists(nodeFile):
# TODO Skalen anpassen
args = ["rrdtool",'create',nodeFile
,'--start',str(round(self.currentTimeInt - 60))
,'--step' , '60'
,'DS:upstate:GAUGE:120:0:1'
,'DS:clients:GAUGE:120:0:200'
,'RRA:AVERAGE:0.5:1:120'
,'RRA:AVERAGE:0.5:5:1440'
,'RRA:AVERAGE:0.5:60:720'
,'RRA:AVERAGE:0.5:720:730'
]
subprocess.check_output(args)
# Call only if node is up
def updateNodeDatabase(self,nodePrimaryMAC,clientCount):
nodeFile = self.nodeMACToRRDFile(nodePrimaryMAC)
# Update Global RRDatabase
args = ["rrdtool",'updatev', nodeFile
# #Upstate #Clients
, self.currentTime + ":"+str(1)+":"+str(clientCount)
]
subprocess.check_output(args)
def createNodeGraph(self,nodePrimaryMAC):
nodeGraph = self.nodeMACToPNGFile(nodePrimaryMAC)
nodeFile = self.nodeMACToRRDFile(nodePrimaryMAC)
args = ['rrdtool','graph', nodeGraph, '-s', '-' + self.displayTimeNode , '-w', '800', '-h', '400', '-l', '0', '-y', '1:1',
'DEF:clients=' + nodeFile + ':clients:AVERAGE',
'VDEF:maxc=clients,MAXIMUM',
'CDEF:c=0,clients,ADDNAN',
'CDEF:d=clients,UN,maxc,UN,1,maxc,IF,*',
'AREA:c#0F0:up\\l',
'AREA:d#F00:down\\l',
'LINE1:c#00F:clients connected\\l',
]
subprocess.check_output(args)
def update_database(self,db): def update_database(self,db):
nodes = {} nodes = {}
clientCount = 0 clientCount = 0
@ -128,17 +46,19 @@ class rrd:
elif target in nodes and not source in nodes: elif target in nodes and not source in nodes:
nodes[target].clients += 1 nodes[target].clients += 1
self.checkAndCreateIfNeededGlobalDatabase() self.globalDb.update(len(nodes), clientCount)
self.updateGlobalDatabase(len(nodes),clientCount) for node in nodes.values():
for mac in nodes: rrd = NodeRRD(
self.checkAndCreateIfNeededNodeDatabase(mac) os.path.join(self.dbPath, str(node.id).replace(':', '') + '.rrd'),
self.updateNodeDatabase(mac,nodes[mac].clients) node
)
rrd.update()
def update_images(self): def update_images(self):
""" Creates a image for every rrd file in the database directory. """ Creates an image for every rrd file in the database directory.
""" """
self.createGlobalGraph() self.globalDb.graph(os.path.join(self.imagePath, "globalGraph.png"), self.displayTimeGlobal)
nodeDbFiles = os.listdir(self.dbPath) nodeDbFiles = os.listdir(self.dbPath)
@ -148,4 +68,5 @@ class rrd:
nodeName = os.path.basename(fileName).split('.') nodeName = os.path.basename(fileName).split('.')
if nodeName[1] == 'rrd' and not nodeName[0] == "nodes": if nodeName[1] == 'rrd' and not nodeName[0] == "nodes":
self.createNodeGraph(nodeName[0]) rrd = NodeRRD(os.path.join(self.dbPath, fileName))
rrd.graph(self.imagePath, self.displayTimeNode)