Merge branch 'collectd-4.5' into collectd-4.6
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 11 Apr 2009 07:34:32 +0000 (09:34 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 11 Apr 2009 07:34:32 +0000 (09:34 +0200)
configure.in
contrib/collectd-network.py [new file with mode: 0644]
src/collectd-unixsock.pod
src/collectd.h
src/exec.c
src/network.c
src/plugin.c

index 3514572..2344cdb 100644 (file)
@@ -1802,8 +1802,9 @@ if test "x$with_libperl" = "xyes" \
 then
   SAVE_CFLAGS=$CFLAGS
   SAVE_LDFLAGS=$LDFLAGS
-  PERL_CFLAGS=`$perl_interpreter -MExtUtils::Embed -e ccopts`
-  PERL_LDFLAGS=`$perl_interpreter -MExtUtils::Embed -e ldopts`
+dnl ARCHFLAGS="" -> disable multi -arch on OSX (see Config_heavy.pl:fetch_string)
+  PERL_CFLAGS=`ARCHFLAGS="" $perl_interpreter -MExtUtils::Embed -e ccopts`
+  PERL_LDFLAGS=`ARCHFLAGS="" $perl_interpreter -MExtUtils::Embed -e ldopts`
   CFLAGS="$CFLAGS $PERL_CFLAGS"
   LDFLAGS="$LDFLAGS $PERL_LDFLAGS"
 
diff --git a/contrib/collectd-network.py b/contrib/collectd-network.py
new file mode 100644 (file)
index 0000000..445b183
--- /dev/null
@@ -0,0 +1,318 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim: fileencoding=utf-8
+#
+# Copyright © 2009 Adrian Perez <aperez@igalia.com>
+#
+# Distributed under terms of the GPLv2 license.
+
+"""
+Collectd network protocol implementation.
+"""
+
+import socket
+import struct
+
+try:
+    from cStringIO import StringIO
+except ImportError:
+    from StringIO import StringIO
+
+from datetime import datetime
+from copy import deepcopy
+
+
+DEFAULT_PORT = 25826
+"""Default port"""
+
+DEFAULT_IPv4_GROUP = "239.192.74.66"
+"""Default IPv4 multicast group"""
+
+DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
+"""Default IPv6 multicast group"""
+
+
+
+# Message kinds
+TYPE_HOST            = 0x0000
+TYPE_TIME            = 0x0001
+TYPE_PLUGIN          = 0x0002
+TYPE_PLUGIN_INSTANCE = 0x0003
+TYPE_TYPE            = 0x0004
+TYPE_TYPE_INSTANCE   = 0x0005
+TYPE_VALUES          = 0x0006
+TYPE_INTERVAL        = 0x0007
+
+# For notifications
+TYPE_MESSAGE         = 0x0100
+TYPE_SEVERITY        = 0x0101
+
+# DS kinds
+DS_TYPE_COUNTER      = 0
+DS_TYPE_GAUGE        = 1
+
+
+header = struct.Struct("!2H")
+number = struct.Struct("!Q")
+short  = struct.Struct("!H")
+double = struct.Struct("<d")
+
+
+def decode_network_values(ptype, plen, buf):
+    """Decodes a list of DS values in collectd network format
+    """
+    nvalues = short.unpack_from(buf, header.size)[0]
+    off = header.size + short.size + nvalues
+    valskip = double.size
+
+    # Check whether our expected packet size is the reported one
+    assert ((valskip + 1) * nvalues + short.size + header.size) == plen
+    assert double.size == number.size
+
+    result = []
+    for dstype in map(ord, buf[header.size+short.size:off]):
+        if dstype == DS_TYPE_COUNTER:
+            result.append((dstype, number.unpack_from(buf, off)[0]))
+            off += valskip
+        elif dstype == DS_TYPE_GAUGE:
+            result.append((dstype, double.unpack_from(buf, off)[0]))
+            off += valskip
+        else:
+            raise ValueError("DS type %i unsupported" % dstype)
+
+    return result
+
+
+def decode_network_number(ptype, plen, buf):
+    """Decodes a number (64-bit unsigned) in collectd network format.
+    """
+    return number.unpack_from(buf, header.size)[0]
+
+
+def decode_network_string(msgtype, plen, buf):
+    """Decodes a floating point number (64-bit) in collectd network format.
+    """
+    return buf[header.size:plen-1]
+
+
+# Mapping of message types to decoding functions.
+_decoders = {
+    TYPE_VALUES         : decode_network_values,
+    TYPE_TIME           : decode_network_number,
+    TYPE_INTERVAL       : decode_network_number,
+    TYPE_HOST           : decode_network_string,
+    TYPE_PLUGIN         : decode_network_string,
+    TYPE_PLUGIN_INSTANCE: decode_network_string,
+    TYPE_TYPE           : decode_network_string,
+    TYPE_TYPE_INSTANCE  : decode_network_string,
+    TYPE_MESSAGE        : decode_network_string,
+    TYPE_SEVERITY       : decode_network_number,
+}
+
+
+def decode_network_packet(buf):
+    """Decodes a network packet in collectd format.
+    """
+    off = 0
+    blen = len(buf)
+    while off < blen:
+        ptype, plen = header.unpack_from(buf, off)
+
+        if plen > blen - off:
+            raise ValueError("Packet longer than amount of data in buffer")
+
+        if ptype not in _decoders:
+            raise ValueError("Message type %i not recognized" % ptype)
+
+        yield ptype, _decoders[ptype](ptype, plen, buf[off:])
+        off += plen
+
+
+
+
+
+class Data(object):
+    time = 0
+    host = None
+    plugin = None
+    plugininstance = None
+    type = None
+    typeinstance = None
+
+    def __init__(self, **kw):
+        [setattr(self, k, v) for k, v in kw.iteritems()]
+
+    @property
+    def datetime(self):
+        return datetime.fromtimestamp(self.time)
+
+    @property
+    def source(self):
+        buf = StringIO()
+        if self.host:
+            buf.write(self.host)
+        if self.plugin:
+            buf.write("/")
+            buf.write(self.plugin)
+        if self.plugininstance:
+            buf.write("/")
+            buf.write(self.plugininstance)
+        if self.type:
+            buf.write("/")
+            buf.write(self.type)
+        if self.typeinstance:
+            buf.write("/")
+            buf.write(self.typeinstance)
+        return buf.getvalue()
+
+    def __str__(self):
+        return "[%i] %s" % (self.time, self.source)
+
+
+
+class Notification(Data):
+    FAILURE  = 1
+    WARNING  = 2
+    OKAY     = 4
+
+    SEVERITY = {
+        FAILURE: "FAILURE",
+        WARNING: "WARNING",
+        OKAY   : "OKAY",
+    }
+
+    __severity = 0
+    message  = ""
+
+    def __set_severity(self, value):
+        if value in (self.FAILURE, self.WARNING, self.OKAY):
+            self.__severity = value
+
+    severity = property(lambda self: self.__severity, __set_severity)
+
+    @property
+    def severitystring(self):
+        return self.SEVERITY.get(self.severity, "UNKNOWN")
+
+    def __str__(self):
+        return "%s [%s] %s" % (
+                super(Notification, self).__str__(),
+                self.severitystring,
+                self.message)
+
+
+
+class Values(Data, list):
+    def __str__(self):
+        return "%s %s" % (Data.__str__(self), list.__str__(self))
+
+
+
+def interpret_opcodes(iterable):
+    vl = Values()
+    nt = Notification()
+
+    for kind, data in iterable:
+        if kind == TYPE_TIME:
+            vl.time = nt.time = data
+        elif kind == TYPE_INTERVAL:
+            vl.interval = data
+        elif kind == TYPE_HOST:
+            vl.host = nt.host = data
+        elif kind == TYPE_PLUGIN:
+            vl.plugin = nt.plugin = data
+        elif kind == TYPE_PLUGIN_INSTANCE:
+            vl.plugininstance = nt.plugininstance = data
+        elif kind == TYPE_TYPE:
+            vl.type = nt.type = data
+        elif kind == TYPE_TYPE_INSTANCE:
+            vl.typeinstance = nt.typeinstance = data
+        elif kind == TYPE_SEVERITY:
+            nt.severity = data
+        elif kind == TYPE_MESSAGE:
+            nt.message = data
+            yield deepcopy(nt)
+        elif kind == TYPE_VALUES:
+            vl[:] = data
+            yield deepcopy(vl)
+
+
+
+class Reader(object):
+    """Network reader for collectd data.
+
+    Listens on the network in a given address, which can be a multicast
+    group address, and handles reading data when it arrives.
+    """
+    addr = None
+    host = None
+    port = DEFAULT_PORT
+
+    BUFFER_SIZE = 1024
+
+
+    def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
+        if host is None:
+            multicast = True
+            host = DEFAULT_IPv4_GROUP
+
+        self.host, self.port = host, port
+        self.ipv6 = ":" in self.host
+
+        family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
+                None if multicast else self.host, self.port,
+                socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
+                socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
+
+        self._sock = socket.socket(family, socktype, proto)
+        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self._sock.bind(sockaddr)
+
+        if multicast:
+            if hasattr(socket, "SO_REUSEPORT"):
+                self._sock.setsockopt(
+                        socket.SOL_SOCKET,
+                        socket.SO_REUSEPORT, 1)
+
+            val = None
+            if family == socket.AF_INET:
+                assert "." in self.host
+                val = struct.pack("4sl",
+                        socket.inet_aton(self.host), socket.INADDR_ANY)
+            elif family == socket.AF_INET6:
+                raise NotImplementedError("IPv6 support not ready yet")
+            else:
+                raise ValueError("Unsupported network address family")
+
+            self._sock.setsockopt(
+                    socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
+                    socket.IP_ADD_MEMBERSHIP, val)
+            self._sock.setsockopt(
+                    socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
+                    socket.IP_MULTICAST_LOOP, 0)
+
+
+    def receive(self):
+        """Receives a single raw collect network packet.
+        """
+        return self._sock.recv(self.BUFFER_SIZE)
+
+
+    def decode(self, buf=None):
+        """Decodes a given buffer or the next received packet.
+        """
+        if buf is None:
+            buf = self.receive()
+        return decode_network_packet(buf)
+
+
+    def interpret(self, iterable=None):
+        """Interprets a sequence
+        """
+        if iterable is None:
+            iterable = self.decode()
+        if isinstance(iterable, basestring):
+            iterable = self.decode(iterable)
+        return interpret_opcodes(iterable)
+
+
index ac4a1b1..83802a1 100644 (file)
@@ -131,10 +131,10 @@ Example:
 Submits a notification to the daemon which will then dispatch it to all plugins
 which have registered for receiving notifications. 
 
-The B<PUTNOTIF> if followed by a list of options which further describe the
-notification. The B<message> option is special in that it will consume the rest
-of the line as its value. The B<message>, B<severity>, and B<time> options are
-mandatory.
+The B<PUTNOTIF> command is followed by a list of options which further describe
+the notification. The B<message> option is special in that it will consume the
+rest of the line as its value. The B<message>, B<severity>, and B<time> options
+are mandatory.
 
 Valid options are:
 
@@ -195,7 +195,7 @@ callback will be flushed.
 
 If the B<identifier> option is given only the specified values will be flushed.
 This is meant to be used by graphing or displaying frontends which want to have
-the lastest values for a specific graph. Again, you can specify the
+the latest values for a specific graph. Again, you can specify the
 B<identifier> option multiple times to flush several values. If this option is
 not specified at all, all values will be flushed.
 
index 277a610..18052ba 100644 (file)
 #  define BYTE_ORDER _BYTE_ORDER
 # elif defined(__BYTE_ORDER)
 #  define BYTE_ORDER __BYTE_ORDER
+# elif defined(__DARWIN_BYTE_ORDER)
+#  define BYTE_ORDER __DARWIN_BYTE_ORDER
 # endif
 #endif
 #ifndef BIG_ENDIAN
 #  define BIG_ENDIAN _BIG_ENDIAN
 # elif defined(__BIG_ENDIAN)
 #  define BIG_ENDIAN __BIG_ENDIAN
+# elif defined(__DARWIN_BIG_ENDIAN)
+#  define BIG_ENDIAN __DARWIN_BIG_ENDIAN
 # endif
 #endif
 #ifndef LITTLE_ENDIAN
 #  define LITTLE_ENDIAN _LITTLE_ENDIAN
 # elif defined(__LITTLE_ENDIAN)
 #  define LITTLE_ENDIAN __LITTLE_ENDIAN
+# elif defined(__DARWIN_LITTLE_ENDIAN)
+#  define LITTLE_ENDIAN __DARWIN_LITTLE_ENDIAN
 # endif
 #endif
 #ifndef BYTE_ORDER
index 973cd52..c2d42ee 100644 (file)
@@ -585,7 +585,17 @@ static void *exec_read_one (void *arg) /* {{{ */
         if (errno == EAGAIN || errno == EINTR)  continue;
         break;
       }
-      else if (len == 0) break;  /* We've reached EOF */
+      else if (len == 0)
+      {
+       /* We've reached EOF */
+       NOTICE ("exec plugin: Program `%s' has closed STDERR.",
+           pl->exec);
+       close (fd_err);
+       FD_CLR (fd_err, &fdset);
+       highest_fd = fd;
+       fd_err = -1;
+       continue;
+      }
 
       pbuffer_err[len] = '\0';
 
@@ -615,6 +625,7 @@ static void *exec_read_one (void *arg) /* {{{ */
     copy = fdset;
   }
 
+  DEBUG ("exec plugin: exec_read_one: Waiting for `%s' to exit.", pl->exec);
   if (waitpid (pl->pid, &status, 0) > 0)
     pl->status = status;
 
@@ -628,7 +639,8 @@ static void *exec_read_one (void *arg) /* {{{ */
   pthread_mutex_unlock (&pl_lock);
 
   close (fd);
-  close (fd_err);
+  if (fd_err >= 0)
+    close (fd_err);
 
   pthread_exit ((void *) 0);
   return (NULL);
index 66f0438..902f270 100644 (file)
@@ -870,11 +870,12 @@ static void free_sockent (sockent_t *se)
  */
 static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai)
 {
+       DEBUG ("network plugin: network_set_ttl: network_config_ttl = %i;",
+                       network_config_ttl);
+
        if ((network_config_ttl < 1) || (network_config_ttl > 255))
                return (-1);
 
-       DEBUG ("ttl = %i", network_config_ttl);
-
        if (ai->ai_family == AF_INET)
        {
                struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr;
index 367c0d1..74565c3 100644 (file)
@@ -376,6 +376,7 @@ int plugin_load (const char *type)
                else if (!S_ISREG (statbuf.st_mode))
                {
                        /* don't follow symlinks */
+                       WARNING ("stat %s: not a regular file", filename);
                        continue;
                }