From: Adrian Perez Date: Fri, 27 Mar 2009 21:23:51 +0000 (+0100) Subject: contrib/collectd-network.py: Add pure-Python implementation of collectd's network... X-Git-Tag: collectd-4.5.4~26 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=c25d0c6c95742307ab487b12e1f9afd8e6de901e;hp=a34e8aa2adda392a8ee11d974fc72592be308607;p=collectd.git contrib/collectd-network.py: Add pure-Python implementation of collectd's network protocol. Announcement can be found here: --- diff --git a/contrib/collectd-network.py b/contrib/collectd-network.py new file mode 100644 index 00000000..445b1838 --- /dev/null +++ b/contrib/collectd-network.py @@ -0,0 +1,318 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# vim: fileencoding=utf-8 +# +# Copyright © 2009 Adrian Perez +# +# Distributed under terms of the GPLv2 license. + +""" +Collectd network protocol implementation. +""" + +import socket +import struct + +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + +from datetime import datetime +from copy import deepcopy + + +DEFAULT_PORT = 25826 +"""Default port""" + +DEFAULT_IPv4_GROUP = "239.192.74.66" +"""Default IPv4 multicast group""" + +DEFAULT_IPv6_GROUP = "ff18::efc0:4a42" +"""Default IPv6 multicast group""" + + + +# Message kinds +TYPE_HOST = 0x0000 +TYPE_TIME = 0x0001 +TYPE_PLUGIN = 0x0002 +TYPE_PLUGIN_INSTANCE = 0x0003 +TYPE_TYPE = 0x0004 +TYPE_TYPE_INSTANCE = 0x0005 +TYPE_VALUES = 0x0006 +TYPE_INTERVAL = 0x0007 + +# For notifications +TYPE_MESSAGE = 0x0100 +TYPE_SEVERITY = 0x0101 + +# DS kinds +DS_TYPE_COUNTER = 0 +DS_TYPE_GAUGE = 1 + + +header = struct.Struct("!2H") +number = struct.Struct("!Q") +short = struct.Struct("!H") +double = struct.Struct(" blen - off: + raise ValueError("Packet longer than amount of data in buffer") + + if ptype not in _decoders: + raise ValueError("Message type %i not recognized" % ptype) + + yield ptype, _decoders[ptype](ptype, plen, buf[off:]) + off += plen + + + + + +class Data(object): + time = 0 + host = None + plugin = None + plugininstance = None + type = None + typeinstance = None + + def __init__(self, **kw): + [setattr(self, k, v) for k, v in kw.iteritems()] + + @property + def datetime(self): + return datetime.fromtimestamp(self.time) + + @property + def source(self): + buf = StringIO() + if self.host: + buf.write(self.host) + if self.plugin: + buf.write("/") + buf.write(self.plugin) + if self.plugininstance: + buf.write("/") + buf.write(self.plugininstance) + if self.type: + buf.write("/") + buf.write(self.type) + if self.typeinstance: + buf.write("/") + buf.write(self.typeinstance) + return buf.getvalue() + + def __str__(self): + return "[%i] %s" % (self.time, self.source) + + + +class Notification(Data): + FAILURE = 1 + WARNING = 2 + OKAY = 4 + + SEVERITY = { + FAILURE: "FAILURE", + WARNING: "WARNING", + OKAY : "OKAY", + } + + __severity = 0 + message = "" + + def __set_severity(self, value): + if value in (self.FAILURE, self.WARNING, self.OKAY): + self.__severity = value + + severity = property(lambda self: self.__severity, __set_severity) + + @property + def severitystring(self): + return self.SEVERITY.get(self.severity, "UNKNOWN") + + def __str__(self): + return "%s [%s] %s" % ( + super(Notification, self).__str__(), + self.severitystring, + self.message) + + + +class Values(Data, list): + def __str__(self): + return "%s %s" % (Data.__str__(self), list.__str__(self)) + + + +def interpret_opcodes(iterable): + vl = Values() + nt = Notification() + + for kind, data in iterable: + if kind == TYPE_TIME: + vl.time = nt.time = data + elif kind == TYPE_INTERVAL: + vl.interval = data + elif kind == TYPE_HOST: + vl.host = nt.host = data + elif kind == TYPE_PLUGIN: + vl.plugin = nt.plugin = data + elif kind == TYPE_PLUGIN_INSTANCE: + vl.plugininstance = nt.plugininstance = data + elif kind == TYPE_TYPE: + vl.type = nt.type = data + elif kind == TYPE_TYPE_INSTANCE: + vl.typeinstance = nt.typeinstance = data + elif kind == TYPE_SEVERITY: + nt.severity = data + elif kind == TYPE_MESSAGE: + nt.message = data + yield deepcopy(nt) + elif kind == TYPE_VALUES: + vl[:] = data + yield deepcopy(vl) + + + +class Reader(object): + """Network reader for collectd data. + + Listens on the network in a given address, which can be a multicast + group address, and handles reading data when it arrives. + """ + addr = None + host = None + port = DEFAULT_PORT + + BUFFER_SIZE = 1024 + + + def __init__(self, host=None, port=DEFAULT_PORT, multicast=False): + if host is None: + multicast = True + host = DEFAULT_IPv4_GROUP + + self.host, self.port = host, port + self.ipv6 = ":" in self.host + + family, socktype, proto, canonname, sockaddr = socket.getaddrinfo( + None if multicast else self.host, self.port, + socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC, + socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0] + + self._sock = socket.socket(family, socktype, proto) + self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._sock.bind(sockaddr) + + if multicast: + if hasattr(socket, "SO_REUSEPORT"): + self._sock.setsockopt( + socket.SOL_SOCKET, + socket.SO_REUSEPORT, 1) + + val = None + if family == socket.AF_INET: + assert "." in self.host + val = struct.pack("4sl", + socket.inet_aton(self.host), socket.INADDR_ANY) + elif family == socket.AF_INET6: + raise NotImplementedError("IPv6 support not ready yet") + else: + raise ValueError("Unsupported network address family") + + self._sock.setsockopt( + socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP, + socket.IP_ADD_MEMBERSHIP, val) + self._sock.setsockopt( + socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP, + socket.IP_MULTICAST_LOOP, 0) + + + def receive(self): + """Receives a single raw collect network packet. + """ + return self._sock.recv(self.BUFFER_SIZE) + + + def decode(self, buf=None): + """Decodes a given buffer or the next received packet. + """ + if buf is None: + buf = self.receive() + return decode_network_packet(buf) + + + def interpret(self, iterable=None): + """Interprets a sequence + """ + if iterable is None: + iterable = self.decode() + if isinstance(iterable, basestring): + iterable = self.decode(iterable) + return interpret_opcodes(iterable) + +