Merge branch 'pyr/riemann'
authorFlorian Forster <octo@collectd.org>
Mon, 21 Jan 2013 10:10:55 +0000 (11:10 +0100)
committerFlorian Forster <octo@collectd.org>
Mon, 21 Jan 2013 10:10:58 +0000 (11:10 +0100)
Resolves Github issue #221.

22 files changed:
README
configure.in
contrib/collection3/etc/collection.conf
src/Makefile.am
src/aggregation.c
src/apcups.c
src/collectd-tg.pod [new file with mode: 0644]
src/collectd.conf.in
src/collectd.conf.pod
src/configfile.c
src/dbi.c
src/ethstat.c
src/libcollectdclient/network_buffer.c
src/network.c
src/nfs.c
src/plugin.c
src/plugin.h
src/postgresql.c
src/redis.c
src/types.db
src/varnish.c
src/zfs_arc.c

diff --git a/README b/README
index 67cac04..45dcadb 100644 (file)
--- a/README
+++ b/README
@@ -222,6 +222,9 @@ Features
       write your own plugins in Perl and return arbitrary values using this
       API. See collectd-perl(5).
 
+    - pf
+      Query statistics from BSD's packet filter "pf".
+
     - pinba
       Receive and dispatch timing values from Pinba, a profiling extension for
       PHP.
@@ -458,6 +461,10 @@ Features
 
   * Miscellaneous plugins:
 
+    - aggregation
+      Selects multiple value lists based on patterns or regular expressions
+      and creates new aggregated values lists from those.
+
     - threshold
       Checks values against configured thresholds and creates notifications if
       values are out of bounds. See collectd-threshold(5) for details.
index 5598a0b..ea9fc6c 100644 (file)
@@ -270,7 +270,19 @@ if test "x$ac_system" = "xDarwin"
 then
        AC_CHECK_HEADERS(mach/mach_init.h mach/host_priv.h mach/mach_error.h mach/mach_host.h mach/mach_port.h mach/mach_types.h mach/message.h mach/processor_set.h mach/processor.h mach/processor_info.h mach/task.h mach/thread_act.h mach/vm_region.h mach/vm_map.h mach/vm_prot.h mach/vm_statistics.h mach/kern_return.h)
        AC_CHECK_HEADERS(CoreFoundation/CoreFoundation.h IOKit/IOKitLib.h IOKit/IOTypes.h IOKit/ps/IOPSKeys.h IOKit/IOBSD.h IOKit/storage/IOBlockStorageDriver.h)
+       # For the battery plugin
+       AC_CHECK_HEADERS(IOKit/ps/IOPowerSources.h, [], [],
+[
+#if HAVE_IOKIT_IOKITLIB_H
+#  include <IOKit/IOKitLib.h>
+#endif
+#if HAVE_IOKIT_IOTYPES_H
+#  include <IOKit/IOTypes.h>
+#endif
+])
+
 fi
+
 AC_CHECK_HEADERS(sys/sysctl.h, [], [],
 [
 #if HAVE_SYS_TYPES_H
@@ -315,17 +327,6 @@ else
        have_linux_raid_md_u_h="no"
 fi
 
-# For the battery plugin
-AC_CHECK_HEADERS(IOKit/ps/IOPowerSources.h, [], [],
-[
-#if HAVE_IOKIT_IOKITLIB_H
-#  include <IOKit/IOKitLib.h>
-#endif
-#if HAVE_IOKIT_IOTYPES_H
-#  include <IOKit/IOTypes.h>
-#endif
-])
-
 # For the swap module
 have_linux_wireless_h="no"
 if test "x$ac_system" = "xLinux"
index 3bb3d8b..5d1024f 100644 (file)
@@ -2,7 +2,7 @@
 GraphWidth 400
 #UnixSockAddr "/var/run/collectd-unixsock"
 <Type apache_bytes>
-  DataSources count
+  DataSources value
   DSName "count Bytes/s"
   RRDTitle "Apache Traffic"
   RRDVerticalLabel "Bytes/s"
@@ -10,7 +10,7 @@ GraphWidth 400
   Color count 0000ff
 </Type>
 <Type apache_requests>
-  DataSources count
+  DataSources value
   DSName "count Requests/s"
   RRDTitle "Apache Traffic"
   RRDVerticalLabel "Requests/s"
@@ -19,7 +19,7 @@ GraphWidth 400
 </Type>
 <Type apache_scoreboard>
   Module GenericStacked
-  DataSources count
+  DataSources value
   RRDTitle "Apache scoreboard on {hostname}"
   RRDVerticalLabel "Slots"
   RRDFormat "%6.2lf"
@@ -245,14 +245,14 @@ GraphWidth 400
   RRDFormat "%6.1lf"
 </Type>
 <Type conntrack>
-  DataSources conntrack
+  DataSources value
   DSName conntrack Conntrack count
   RRDTitle "nf_conntrack connections on {hostname}"
   RRDVerticalLabel "Count"
   RRDFormat "%4.0lf"
 </Type>
 <Type entropy>
-  DataSources entropy
+  DataSources value
   DSName entropy Entropy bits
   RRDTitle "Available entropy on {hostname}"
   RRDVerticalLabel "Bits"
@@ -267,7 +267,7 @@ GraphWidth 400
   Color value 00b000
 </Type>
 <Type frequency>
-  DataSources frequency
+  DataSources value
   DSName frequency Frequency
   RRDTitle "Frequency ({type_instance})"
   RRDVerticalLabel "Hertz"
@@ -542,7 +542,7 @@ GraphWidth 400
   Scale 8
 </Type>
 <Type percent>
-  DataSources percent
+  DataSources value
   DSName percent Percent
   RRDTitle "Percent ({type_instance})"
   RRDVerticalLabel "Percent"
@@ -550,7 +550,7 @@ GraphWidth 400
   Color percent 0000ff
 </Type>
 <Type ping>
-  DataSources ping
+  DataSources value
   DSName "ping Latency"
   RRDTitle "Network latency ({type_instance})"
   RRDVerticalLabel "Milliseconds"
@@ -700,7 +700,7 @@ GraphWidth 400
   Scale 0.001
 </Type>
 <Type users>
-  DataSources users
+  DataSources value
   DSName users Users
   RRDTitle "Users ({type_instance}) on {hostname}"
   RRDVerticalLabel "Users"
index 8bdf9f8..f31c176 100644 (file)
@@ -119,6 +119,9 @@ collectd_tg_LDADD =
 if BUILD_WITH_LIBSOCKET
 collectd_tg_LDADD += -lsocket
 endif
+if BUILD_WITH_LIBRT
+collectd_tg_LDADD += -lrt
+endif
 if BUILD_AIX
 collectd_tg_LDADD += -lm
 endif
@@ -1380,6 +1383,7 @@ dist_man_MANS = collectd.1 \
                collectd-perl.5 \
                collectd-python.5 \
                collectd-snmp.5 \
+               collectd-tg.1 \
                collectd-threshold.5 \
                collectd-unixsock.5 \
                types.db.5
@@ -1399,6 +1403,7 @@ EXTRA_DIST +=   collectd.conf.pod \
                collectd-python.pod \
                collectd.pod \
                collectd-snmp.pod \
+               collectd-tg.pod \
                collectd-threshold.pod \
                collectd-unixsock.pod \
                postgresql_default.conf \
index 96dc337..0c0f19d 100644 (file)
@@ -367,7 +367,7 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
   vl->values = &v;
   vl->values_len = 1;
 
-  plugin_dispatch_values_secure (vl);
+  plugin_dispatch_values (vl);
 
   vl->values = NULL;
   vl->values_len = 0;
index dd535b1..ee15b14 100644 (file)
@@ -465,7 +465,7 @@ static int apcups_read (void)
        apcups_detail.battv    =   -1.0;
        apcups_detail.loadpct  =   -1.0;
        apcups_detail.bcharge  =   -1.0;
-       apcups_detail.timeleft =   -1.0;
+       apcups_detail.timeleft =    NAN;
        apcups_detail.itemp    = -300.0;
        apcups_detail.linefreq =   -1.0;
 
diff --git a/src/collectd-tg.pod b/src/collectd-tg.pod
new file mode 100644 (file)
index 0000000..5f1b630
--- /dev/null
@@ -0,0 +1,65 @@
+=head1 NAME
+
+collectd-tg - Traffic generator for collectd.
+
+=head1 SYNOPSIS
+
+collectd-tg B<-n> I<num_vl> B<-H> I<num_hosts> B<-p> I<num_plugins> B<-i> I<interval> B<-d> I<dest> B<-D> I<dport>
+
+=head1 DESCRIPTION
+
+B<collectd-tg> generates bogus I<collectd> network traffic. While host, plugin
+and values are generated randomly, the generated traffic tries to mimic "real"
+traffic as closely as possible.
+
+=head1 ARGUMENTS AND OPTIONS
+
+The following options are understood by I<collectd-tg>. The order of the
+arguments generally doesn't matter, as long as no argument is passed more than
+once.
+
+=over 4
+
+=item B<-n> I<num_vl>
+
+Sets the number of unique I<value lists> (VL) to generate. Defaults to 10000.
+
+=item B<-H> I<num_hosts>
+
+Sets the number of unique hosts to simulate. Defaults to 1000.
+
+=item B<-p> I<num_plugins>
+
+Sets the number of unique plugins to simulate. Defaults to 20.
+
+=item B<-i> I<interval>
+
+Sets the interval in which each I<value list> is dispatched. Defaults to 10.0
+seconds.
+
+=item B<-d> I<dest>
+
+Sets the destination to which to send the generated network traffic. Defaults
+to the IPv6 multicast address, C<ff18::efc0:4a42>.
+
+=item B<-D> I<dport>
+
+Sets the destination port or service to which to send the generated network
+traffic. Defaults to I<collectd's> default port, C<25826>.
+
+=item B<-h>
+
+Print usage summary.
+
+=back
+
+=head1 SEE ALSO
+
+L<collectd(1)>,
+L<collectd.conf(5)>
+
+=head1 AUTHOR
+
+Florian Forster E<lt>octoE<nbsp>atE<nbsp>collectd.orgE<gt>
+
+=cut
index 31dce55..9c02ce6 100644 (file)
 #PIDFile     "@localstatedir@/run/@PACKAGE_NAME@.pid"
 #PluginDir   "@libdir@/@PACKAGE_NAME@"
 #TypesDB     "@prefix@/share/@PACKAGE_NAME@/types.db"
+
+#----------------------------------------------------------------------------#
+# Interval at which to query values. This may be overwritten on a per-plugin #
+# base by using the 'Interval' option of the LoadPlugin block:               #
+#   <LoadPlugin foo>                                                         #
+#       Interval 60                                                          #
+#   </LoadPlugin>                                                            #
+#----------------------------------------------------------------------------#
 #Interval     10
+
 #Timeout      2
 #ReadThreads  5
+#WriteThreads 5
 
 ##############################################################################
 # Logging                                                                    #
 #              #SelectDB "custdb0"
 #              Query "num_of_customers"
 #              #Query "..."
+#              #Host "..."
 #      </Database>
 #</Plugin>
 
 #                      ValuesFrom "count"
 #              </Result>
 #      </Query>
+#      <Writer sqlstore>
+#              # See contrib/postgresql/collectd_insert.sql for details
+#              Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8, $9);"
+#              StoreRates true
+#      </Writer>
 #      <Database foo>
 #              Host "hostname"
 #              Port "5432"
 #              Query backend # predefined
 #              Query rt36_tickets
 #      </Database>
+#      <Database qux>
+#              Service "collectd_store"
+#              Writer sqlstore
+#              # see collectd.conf(5) for details
+#              CommitInterval 30
+#      </Database>
 #</Plugin>
 
 #<Plugin powerdns>
index 6afd0b0..b7bebf6 100644 (file)
@@ -181,8 +181,14 @@ see L<collectd-threshold(5)> for details.
 
 Number of threads to start for reading plugins. The default value is B<5>, but
 you may want to increase this if you have more than five plugins that take a
-long time to read. Mostly those are plugin that do network-IO. Setting this to
-a value higher than the number of plugins you've loaded is totally useless.
+long time to read. Mostly those are plugins that do network-IO. Setting this to
+a value higher than the number of registered read callbacks is not recommended.
+
+=item B<WriteThreads> I<Num>
+
+Number of threads to start for dispatching value lists to write plugins. The
+default value is B<5>, but you may want to increase this if you have more than
+five plugins that may take relatively long to write to.
 
 =item B<Hostname> I<Name>
 
@@ -502,7 +508,7 @@ mode will be used, i.e. delivery is guaranteed. If set to B<false> (the
 default), the I<transient> delivery mode will be used, i.e. messages may be
 lost due to high load, overflowing queues or similar issues.
 
-=item B<Format> B<Command>|B<JSON> (Publish only)
+=item B<Format> B<Command>|B<JSON>|B<Graphite> (Publish only)
 
 Selects the format in which messages are sent to the broker. If set to
 B<Command> (the default), values are sent as C<PUTVAL> commands which are
@@ -1360,11 +1366,23 @@ documentation for each driver, somewhere at
 L<http://libdbi-drivers.sourceforge.net/>. However, the options "host",
 "username", "password", and "dbname" seem to be deE<nbsp>facto standards.
 
+DBDs can register two types of options: String options and numeric options. The
+plugin will use the C<dbi_conn_set_option> function when the configuration
+provides a string and the C<dbi_conn_require_option_numeric> function when the
+configuration provides a number. So these two lines will actually result in
+different calls being used:
+
+  DriverOption "Port" 1234      # numeric
+  DriverOption "Port" "1234"    # string
+
 Unfortunately, drivers are not too keen to report errors when an unknown option
 is passed to them, so invalid settings here may go unnoticed. This is not the
 plugin's fault, it will report errors if it gets them from the libraryE<nbsp>/
 the driver. If a driver complains about an option, the plugin will dump a
-complete list of all options understood by that driver to the log.
+complete list of all options understood by that driver to the log. There is no
+way to programatically find out if an option expects a string or a numeric
+argument, so you will have to refer to the appropriate DBD's documentation to
+find this out. Sorry.
 
 =item B<SelectDB> I<Database>
 
@@ -1379,6 +1397,11 @@ query needs to be defined I<before> this statement, i.E<nbsp>e. all query
 blocks you want to refer to must be placed above the database block you want to
 refer to them from.
 
+=item B<Host> I<Hostname>
+
+Sets the B<host> field of I<value lists> to I<Hostname> when dispatching
+values. Defaults to the global hostname setting.
+
 =back
 
 =head2 Plugin C<df>
index b16ae47..ac5e8ed 100644 (file)
@@ -108,6 +108,7 @@ static cf_global_option_t cf_global_options[] =
        {"FQDNLookup",  NULL, "true"},
        {"Interval",    NULL, NULL},
        {"ReadThreads", NULL, "5"},
+       {"WriteThreads", NULL, "5"},
        {"Timeout",     NULL, "2"},
        {"PreCacheChain",  NULL, "PreCache"},
        {"PostCacheChain", NULL, "PostCache"}
index caa41ef..80488d8 100644 (file)
--- a/src/dbi.c
+++ b/src/dbi.c
@@ -1,6 +1,6 @@
 /**
  * collectd - src/dbi.c
- * Copyright (C) 2008,2009  Florian octo Forster
+ * Copyright (C) 2008-2013  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -16,7 +16,7 @@
  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
  *
  * Authors:
- *   Florian octo Forster <octo at verplant.org>
+ *   Florian octo Forster <octo at collectd.org>
  **/
 
 #include "collectd.h"
 struct cdbi_driver_option_s /* {{{ */
 {
   char *key;
-  char *value;
+  union
+  {
+    char *string;
+    int numeric;
+  } value;
+  _Bool is_numeric;
 };
 typedef struct cdbi_driver_option_s cdbi_driver_option_t; /* }}} */
 
@@ -43,6 +48,7 @@ struct cdbi_database_s /* {{{ */
   char *select_db;
 
   char *driver;
+  char *host;
   cdbi_driver_option_t *driver_options;
   size_t driver_options_num;
 
@@ -159,7 +165,8 @@ static void cdbi_database_free (cdbi_database_t *db) /* {{{ */
   for (i = 0; i < db->driver_options_num; i++)
   {
     sfree (db->driver_options[i].key);
-    sfree (db->driver_options[i].value);
+    if (!db->driver_options[i].is_numeric)
+      sfree (db->driver_options[i].value.string);
   }
   sfree (db->driver_options);
 
@@ -193,33 +200,6 @@ static void cdbi_database_free (cdbi_database_t *db) /* {{{ */
  * </Plugin>
  */
 
-static int cdbi_config_set_string (char **ret_string, /* {{{ */
-    oconfig_item_t *ci)
-{
-  char *string;
-
-  if ((ci->values_num != 1)
-      || (ci->values[0].type != OCONFIG_TYPE_STRING))
-  {
-    WARNING ("dbi plugin: The `%s' config option "
-        "needs exactly one string argument.", ci->key);
-    return (-1);
-  }
-
-  string = strdup (ci->values[0].value.string);
-  if (string == NULL)
-  {
-    ERROR ("dbi plugin: strdup failed.");
-    return (-1);
-  }
-
-  if (*ret_string != NULL)
-    free (*ret_string);
-  *ret_string = string;
-
-  return (0);
-} /* }}} int cdbi_config_set_string */
-
 static int cdbi_config_add_database_driver_option (cdbi_database_t *db, /* {{{ */
     oconfig_item_t *ci)
 {
@@ -227,10 +207,11 @@ static int cdbi_config_add_database_driver_option (cdbi_database_t *db, /* {{{ *
 
   if ((ci->values_num != 2)
       || (ci->values[0].type != OCONFIG_TYPE_STRING)
-      || (ci->values[1].type != OCONFIG_TYPE_STRING))
+      || ((ci->values[1].type != OCONFIG_TYPE_STRING)
+        && (ci->values[1].type != OCONFIG_TYPE_NUMBER)))
   {
     WARNING ("dbi plugin: The `DriverOption' config option "
-        "needs exactly two string arguments.");
+        "needs exactly two arguments.");
     return (-1);
   }
 
@@ -244,6 +225,7 @@ static int cdbi_config_add_database_driver_option (cdbi_database_t *db, /* {{{ *
 
   db->driver_options = option;
   option = db->driver_options + db->driver_options_num;
+  memset (option, 0, sizeof (*option));
 
   option->key = strdup (ci->values[0].value.string);
   if (option->key == NULL)
@@ -252,12 +234,21 @@ static int cdbi_config_add_database_driver_option (cdbi_database_t *db, /* {{{ *
     return (-1);
   }
 
-  option->value = strdup (ci->values[1].value.string);
-  if (option->value == NULL)
+  if (ci->values[1].type == OCONFIG_TYPE_STRING)
   {
-    ERROR ("dbi plugin: strdup failed.");
-    sfree (option->key);
-    return (-1);
+    option->value.string = strdup (ci->values[1].value.string);
+    if (option->value.string == NULL)
+    {
+      ERROR ("dbi plugin: strdup failed.");
+      sfree (option->key);
+      return (-1);
+    }
+  }
+  else
+  {
+    assert (ci->values[1].type == OCONFIG_TYPE_NUMBER);
+    option->value.numeric = (int) (ci->values[1].value.number + .5);
+    option->is_numeric = 1;
   }
 
   db->driver_options_num++;
@@ -286,7 +277,7 @@ static int cdbi_config_add_database (oconfig_item_t *ci) /* {{{ */
   }
   memset (db, 0, sizeof (*db));
 
-  status = cdbi_config_set_string (&db->name, ci);
+  status = cf_util_get_string (ci, &db->name);
   if (status != 0)
   {
     sfree (db);
@@ -299,14 +290,16 @@ static int cdbi_config_add_database (oconfig_item_t *ci) /* {{{ */
     oconfig_item_t *child = ci->children + i;
 
     if (strcasecmp ("Driver", child->key) == 0)
-      status = cdbi_config_set_string (&db->driver, child);
+      status = cf_util_get_string (child, &db->driver);
     else if (strcasecmp ("DriverOption", child->key) == 0)
       status = cdbi_config_add_database_driver_option (db, child);
     else if (strcasecmp ("SelectDB", child->key) == 0)
-      status = cdbi_config_set_string (&db->select_db, child);
+      status = cf_util_get_string (child, &db->select_db);
     else if (strcasecmp ("Query", child->key) == 0)
       status = udb_query_pick_from_list (child, queries, queries_num,
           &db->queries, &db->queries_num);
+    else if (strcasecmp ("Host", child->key) == 0)
+      status = cf_util_get_string (child, &db->host);
     else
     {
       WARNING ("dbi plugin: Option `%s' not allowed here.", child->key);
@@ -564,7 +557,7 @@ static int cdbi_read_database_query (cdbi_database_t *db, /* {{{ */
     sstrncpy (column_names[i], column_name, DATA_MAX_NAME_LEN);
   } /* }}} for (i = 0; i < column_num; i++) */
 
-  udb_query_prepare_result (q, prep_area, hostname_g,
+  udb_query_prepare_result (q, prep_area, (db->host ? db->host : hostname_g),
       /* plugin = */ "dbi", db->name,
       column_names, column_num, /* interval = */ 0);
 
@@ -688,24 +681,38 @@ static int cdbi_connect_database (cdbi_database_t *db) /* {{{ */
    * trouble finding out how to configure the plugin correctly.. */
   for (i = 0; i < db->driver_options_num; i++)
   {
-    DEBUG ("dbi plugin: cdbi_connect_database (%s): "
-        "key = %s; value = %s;",
-        db->name,
-        db->driver_options[i].key,
-        db->driver_options[i].value);
+    if (db->driver_options[i].is_numeric)
+    {
+      status = dbi_conn_set_option_numeric (connection,
+          db->driver_options[i].key, db->driver_options[i].value.numeric);
+      if (status != 0)
+      {
+        char errbuf[1024];
+        ERROR ("dbi plugin: cdbi_connect_database (%s): "
+            "dbi_conn_set_option_numeric (\"%s\", %i) failed: %s.",
+            db->name,
+            db->driver_options[i].key, db->driver_options[i].value.numeric,
+            cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+      }
+    }
+    else
+    {
+      status = dbi_conn_set_option (connection,
+          db->driver_options[i].key, db->driver_options[i].value.string);
+      if (status != 0)
+      {
+        char errbuf[1024];
+        ERROR ("dbi plugin: cdbi_connect_database (%s): "
+            "dbi_conn_set_option (\"%s\", \"%s\") failed: %s.",
+            db->name,
+            db->driver_options[i].key, db->driver_options[i].value.string,
+            cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+      }
+    }
 
-    status = dbi_conn_set_option (connection,
-        db->driver_options[i].key, db->driver_options[i].value);
     if (status != 0)
     {
-      char errbuf[1024];
-      const char *opt;
-
-      ERROR ("dbi plugin: cdbi_connect_database (%s): "
-          "dbi_conn_set_option (%s, %s) failed: %s.",
-          db->name,
-          db->driver_options[i].key, db->driver_options[i].value,
-          cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+      char const *opt;
 
       INFO ("dbi plugin: This is a list of all options understood "
           "by the `%s' driver:", db->driver);
index 08381a8..dec14f3 100644 (file)
@@ -73,7 +73,7 @@ static int ethstat_add_interface (const oconfig_item_t *ci) /* {{{ */
     return (status);
 
   interfaces_num++;
-  INFO("ethstat plugin: Registred interface %s",
+  INFO("ethstat plugin: Registered interface %s",
       interfaces[interfaces_num - 1]);
 
   return (0);
@@ -83,6 +83,7 @@ static int ethstat_add_map (const oconfig_item_t *ci) /* {{{ */
 {
   value_map_t *map;
   int status;
+  char *key;
 
   if ((ci->values_num < 2)
       || (ci->values_num > 3)
@@ -96,9 +97,17 @@ static int ethstat_add_map (const oconfig_item_t *ci) /* {{{ */
     return (-1);
   }
 
+  key = strdup (ci->values[0].value.string);
+  if (key == NULL)
+  {
+    ERROR ("ethstat plugin: strdup(3) failed.");
+    return (ENOMEM);
+  }
+
   map = malloc (sizeof (*map));
   if (map == NULL)
   {
+    sfree (key);
     ERROR ("ethstat plugin: malloc(3) failed.");
     return (ENOMEM);
   }
@@ -115,23 +124,24 @@ static int ethstat_add_map (const oconfig_item_t *ci) /* {{{ */
     if (value_map == NULL)
     {
       sfree (map);
+      sfree (key);
       ERROR ("ethstat plugin: c_avl_create() failed.");
       return (-1);
     }
   }
 
   status = c_avl_insert (value_map,
-      /* key = */ ci->values[0].value.string,
+      /* key = */ key,
       /* value = */ map);
   if (status != 0)
   {
-    sfree (map);
     if (status > 0)
-      ERROR ("ethstat plugin: Multiple mappings for \"%s\".",
-          ci->values[0].value.string);
+      ERROR ("ethstat plugin: Multiple mappings for \"%s\".", key);
     else
-      ERROR ("ethstat plugin: c_avl_insert(\"%s\") failed.",
-          ci->values[0].value.string);
+      ERROR ("ethstat plugin: c_avl_insert(\"%s\") failed.", key);
+
+    sfree (map);
+    sfree (key);
     return (-1);
   }
 
@@ -328,10 +338,31 @@ static int ethstat_read(void)
   return 0;
 }
 
+static int ethstat_shutdown (void)
+{
+  void *key = NULL;
+  void *value = NULL;
+
+  if (value_map == NULL)
+    return (0);
+
+  while (c_avl_pick (value_map, &key, &value) == 0)
+  {
+    sfree (key);
+    sfree (value);
+  }
+
+  c_avl_destroy (value_map);
+  value_map = NULL;
+
+  return (0);
+}
+
 void module_register (void)
 {
   plugin_register_complex_config ("ethstat", ethstat_config);
   plugin_register_read ("ethstat", ethstat_read);
+  plugin_register_shutdown ("ethstat", ethstat_shutdown);
 }
 
 /* vim: set sw=2 sts=2 et fdm=marker : */
index 0b34e1f..acbe93f 100644 (file)
@@ -106,9 +106,11 @@ struct lcc_network_buffer_s
   char *username;
   char *password;
 
+#if HAVE_LIBGCRYPT
   gcry_cipher_hd_t encr_cypher;
   size_t encr_header_len;
   char encr_iv[16];
+#endif
 };
 
 #define SSTRNCPY(dst,src,sz) do { \
@@ -128,6 +130,7 @@ static _Bool have_gcrypt (void) /* {{{ */
     return (result);
   need_init = 0;
 
+#if HAVE_LIBGCRYPT
   gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
 
   if (!gcry_check_version (GCRYPT_VERSION))
@@ -138,6 +141,9 @@ static _Bool have_gcrypt (void) /* {{{ */
 
   result = 1;
   return (1);
+#else
+  return(0);
+#endif
 } /* }}} _Bool have_gcrypt */
 
 static uint64_t htonll (uint64_t val) /* {{{ */
@@ -494,6 +500,7 @@ static int nb_add_value_list (lcc_network_buffer_t *nb, /* {{{ */
   return (0);
 } /* }}} int nb_add_value_list */
 
+#if HAVE_LIBGCRYPT
 static int nb_add_signature (lcc_network_buffer_t *nb) /* {{{ */
 {
   char *buffer;
@@ -620,6 +627,7 @@ static int nb_add_encryption (lcc_network_buffer_t *nb) /* {{{ */
 
   return (0);
 } /* }}} int nb_add_encryption */
+#endif
 
 /*
  * Public functions
@@ -720,6 +728,7 @@ int lcc_network_buffer_initialize (lcc_network_buffer_t *nb) /* {{{ */
   nb->ptr = nb->buffer;
   nb->free = nb->size;
 
+#if HAVE_LIBGCRYPT
   if (nb->seclevel == SIGN)
   {
     size_t username_len;
@@ -765,6 +774,7 @@ int lcc_network_buffer_initialize (lcc_network_buffer_t *nb) /* {{{ */
     ADD_GENERIC (nb, hash, sizeof (hash));
     assert ((nb->encr_header_len + nb->free) == nb->size);
   }
+#endif
 
   return (0);
 } /* }}} int lcc_network_buffer_initialize */
@@ -774,10 +784,12 @@ int lcc_network_buffer_finalize (lcc_network_buffer_t *nb) /* {{{ */
   if (nb == NULL)
     return (EINVAL);
 
+#if HAVE_LIBGCRYPT
   if (nb->seclevel == SIGN)
     nb_add_signature (nb);
   else if (nb->seclevel == ENCRYPT)
     nb_add_encryption (nb);
+#endif
 
   return (0);
 } /* }}} int lcc_network_buffer_finalize */
index 72e3484..8a5bc6a 100644 (file)
@@ -460,7 +460,7 @@ static int network_dispatch_values (value_list_t *vl, /* {{{ */
     }
   }
 
-  plugin_dispatch_values_secure (vl);
+  plugin_dispatch_values (vl);
   stats_values_dispatched++;
 
   meta_data_destroy (vl->meta);
@@ -3318,13 +3318,13 @@ static int network_stats_read (void) /* {{{ */
        vl.values[0].derive = (derive_t) copy_octets_rx;
        vl.values[1].derive = (derive_t) copy_octets_tx;
        sstrncpy (vl.type, "if_octets", sizeof (vl.type));
-       plugin_dispatch_values_secure (&vl);
+       plugin_dispatch_values (&vl);
 
        /* Packets received / send */
        vl.values[0].derive = (derive_t) copy_packets_rx;
        vl.values[1].derive = (derive_t) copy_packets_tx;
        sstrncpy (vl.type, "if_packets", sizeof (vl.type));
-       plugin_dispatch_values_secure (&vl);
+       plugin_dispatch_values (&vl);
 
        /* Values (not) dispatched and (not) send */
        sstrncpy (vl.type, "total_values", sizeof (vl.type));
@@ -3333,28 +3333,28 @@ static int network_stats_read (void) /* {{{ */
        vl.values[0].derive = (derive_t) copy_values_dispatched;
        sstrncpy (vl.type_instance, "dispatch-accepted",
                        sizeof (vl.type_instance));
-       plugin_dispatch_values_secure (&vl);
+       plugin_dispatch_values (&vl);
 
        vl.values[0].derive = (derive_t) copy_values_not_dispatched;
        sstrncpy (vl.type_instance, "dispatch-rejected",
                        sizeof (vl.type_instance));
-       plugin_dispatch_values_secure (&vl);
+       plugin_dispatch_values (&vl);
 
        vl.values[0].derive = (derive_t) copy_values_sent;
        sstrncpy (vl.type_instance, "send-accepted",
                        sizeof (vl.type_instance));
-       plugin_dispatch_values_secure (&vl);
+       plugin_dispatch_values (&vl);
 
        vl.values[0].derive = (derive_t) copy_values_not_sent;
        sstrncpy (vl.type_instance, "send-rejected",
                        sizeof (vl.type_instance));
-       plugin_dispatch_values_secure (&vl);
+       plugin_dispatch_values (&vl);
 
        /* Receive queue length */
        vl.values[0].gauge = (gauge_t) copy_receive_list_length;
        sstrncpy (vl.type, "queue_length", sizeof (vl.type));
        vl.type_instance[0] = 0;
-       plugin_dispatch_values_secure (&vl);
+       plugin_dispatch_values (&vl);
 
        return (0);
 } /* }}} int network_stats_read */
index 461e806..5c6f620 100644 (file)
--- a/src/nfs.c
+++ b/src/nfs.c
@@ -246,7 +246,7 @@ static void nfs_procedures_submit (const char *plugin_instance,
                vl.values = values + i;
                sstrncpy (vl.type_instance, type_instances[i],
                                sizeof (vl.type_instance));
-               plugin_dispatch_values_secure (&vl);
+               plugin_dispatch_values (&vl);
        }
 } /* void nfs_procedures_submit */
 
index bbede05..547b5eb 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/plugin.c
- * Copyright (C) 2005-2011  Florian octo Forster
+ * Copyright (C) 2005-2013  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -69,6 +69,14 @@ struct read_func_s
 };
 typedef struct read_func_s read_func_t;
 
+struct write_queue_s;
+typedef struct write_queue_s write_queue_t;
+struct write_queue_s
+{
+       value_list_t *vl;
+       write_queue_t *next;
+};
+
 /*
  * Private variables
  */
@@ -95,12 +103,22 @@ static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
 static int             read_threads_num = 0;
 
+static write_queue_t  *write_queue_head;
+static write_queue_t  *write_queue_tail;
+static _Bool           write_loop = 1;
+static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t      *write_threads = NULL;
+static size_t          write_threads_num = 0;
+
 static pthread_key_t   plugin_ctx_key;
 static _Bool           plugin_ctx_key_initialized = 0;
 
 /*
  * Static functions
  */
+static int plugin_dispatch_values_internal (value_list_t *vl);
+
 static const char *plugin_get_dir (void)
 {
        if (plugindir == NULL)
@@ -573,6 +591,210 @@ static void stop_read_threads (void)
        read_threads_num = 0;
 } /* void stop_read_threads */
 
+static void plugin_value_list_free (value_list_t *vl) /* {{{ */
+{
+       if (vl == NULL)
+               return;
+
+       meta_data_destroy (vl->meta);
+       sfree (vl->values);
+       sfree (vl);
+} /* }}} void plugin_value_list_free */
+
+static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
+{
+       value_list_t *vl;
+
+       if (vl_orig == NULL)
+               return (NULL);
+
+       vl = malloc (sizeof (*vl));
+       if (vl == NULL)
+               return (NULL);
+       memcpy (vl, vl_orig, sizeof (*vl));
+
+       vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
+       if (vl->values == NULL)
+       {
+               plugin_value_list_free (vl);
+               return (NULL);
+       }
+       memcpy (vl->values, vl_orig->values,
+                       vl_orig->values_len * sizeof (*vl->values));
+
+       vl->meta = meta_data_clone (vl->meta);
+       if ((vl_orig->meta != NULL) && (vl->meta == NULL))
+       {
+               plugin_value_list_free (vl);
+               return (NULL);
+       }
+
+       return (vl);
+} /* }}} value_list_t *plugin_value_list_clone */
+
+static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
+{
+       write_queue_t *q;
+
+       q = malloc (sizeof (*q));
+       if (q == NULL)
+               return (ENOMEM);
+       q->next = NULL;
+
+       q->vl = plugin_value_list_clone (vl);
+       if (q->vl == NULL)
+       {
+               sfree (q);
+               return (ENOMEM);
+       }
+
+       pthread_mutex_lock (&write_lock);
+
+       if (write_queue_tail == NULL)
+       {
+               write_queue_head = q;
+               write_queue_tail = q;
+       }
+       else
+       {
+               write_queue_tail->next = q;
+               write_queue_tail = q;
+       }
+
+       pthread_cond_signal (&write_cond);
+       pthread_mutex_unlock (&write_lock);
+
+       return (0);
+} /* }}} int plugin_write_enqueue */
+
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
+{
+       write_queue_t *q;
+       value_list_t *vl;
+
+       pthread_mutex_lock (&write_lock);
+
+       while (write_loop && (write_queue_head == NULL))
+               pthread_cond_wait (&write_cond, &write_lock);
+
+       if (write_queue_head == NULL)
+       {
+               pthread_mutex_unlock (&write_lock);
+               return (NULL);
+       }
+
+       q = write_queue_head;
+       write_queue_head = q->next;
+       if (write_queue_head == NULL)
+               write_queue_tail = NULL;
+
+       pthread_mutex_unlock (&write_lock);
+
+       vl = q->vl;
+       sfree (q);
+       return (vl);
+} /* }}} value_list_t *plugin_write_dequeue */
+
+static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
+{
+       while (write_loop)
+       {
+               value_list_t *vl = plugin_write_dequeue ();
+               if (vl == NULL)
+                       continue;
+
+               plugin_dispatch_values_internal (vl);
+
+               plugin_value_list_free (vl);
+       }
+
+       pthread_exit (NULL);
+       return ((void *) 0);
+} /* }}} void *plugin_write_thread */
+
+static void start_write_threads (size_t num) /* {{{ */
+{
+       size_t i;
+
+       if (write_threads != NULL)
+               return;
+
+       write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+       if (write_threads == NULL)
+       {
+               ERROR ("plugin: start_write_threads: calloc failed.");
+               return;
+       }
+
+       write_threads_num = 0;
+       for (i = 0; i < num; i++)
+       {
+               int status;
+
+               status = pthread_create (write_threads + write_threads_num,
+                               /* attr = */ NULL,
+                               plugin_write_thread,
+                               /* arg = */ NULL);
+               if (status != 0)
+               {
+                       char errbuf[1024];
+                       ERROR ("plugin: start_write_threads: pthread_create failed "
+                                       "with status %i (%s).", status,
+                                       sstrerror (status, errbuf, sizeof (errbuf)));
+                       return;
+               }
+
+               write_threads_num++;
+       } /* for (i) */
+} /* }}} void start_write_threads */
+
+static void stop_write_threads (void) /* {{{ */
+{
+       write_queue_t *q;
+       int i;
+
+       if (write_threads == NULL)
+               return;
+
+       INFO ("collectd: Stopping %zu write threads.", write_threads_num);
+
+       pthread_mutex_lock (&write_lock);
+       write_loop = 0;
+       DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
+       pthread_cond_broadcast (&write_cond);
+       pthread_mutex_unlock (&write_lock);
+
+       for (i = 0; i < write_threads_num; i++)
+       {
+               if (pthread_join (write_threads[i], NULL) != 0)
+               {
+                       ERROR ("plugin: stop_write_threads: pthread_join failed.");
+               }
+               write_threads[i] = (pthread_t) 0;
+       }
+       sfree (write_threads);
+       write_threads_num = 0;
+
+       pthread_mutex_lock (&write_lock);
+       i = 0;
+       for (q = write_queue_head; q != NULL; q = q->next)
+       {
+               plugin_value_list_free (q->vl);
+               sfree (q);
+               i++;
+       }
+       write_queue_head = NULL;
+       write_queue_tail = NULL;
+       pthread_mutex_unlock (&write_lock);
+
+       if (i > 0)
+       {
+               WARNING ("plugin: %i value list%s left after shutting down "
+                               "the write threads.",
+                               i, (i == 1) ? " was" : "s were");
+       }
+} /* }}} void stop_write_threads */
+
 /*
  * Public functions
  */
@@ -1166,6 +1388,15 @@ void plugin_init_all (void)
        chain_name = global_option_get ("PostCacheChain");
        post_cache_chain = fc_chain_get_by_name (chain_name);
 
+       {
+               char const *tmp = global_option_get ("WriteThreads");
+               int num = atoi (tmp);
+
+               if (num < 1)
+                       num = 5;
+
+               start_write_threads ((size_t) num);
+       }
 
        if ((list_init == NULL) && (read_heap == NULL))
                return;
@@ -1435,6 +1666,8 @@ void plugin_shutdown_all (void)
                plugin_set_ctx (old_ctx);
        }
 
+       stop_write_threads ();
+
        /* Write plugins which use the `user_data' pointer usually need the
         * same data available to the flush callback. If this is the case, set
         * the free_function to NULL when registering the flush callback and to
@@ -1490,7 +1723,7 @@ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
   return (0);
 } /* int }}} plugin_dispatch_missing */
 
-int plugin_dispatch_values (value_list_t *vl)
+static int plugin_dispatch_values_internal (value_list_t *vl)
 {
        int status;
        static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
@@ -1684,53 +1917,24 @@ int plugin_dispatch_values (value_list_t *vl)
        }
 
        return (0);
-} /* int plugin_dispatch_values */
+} /* int plugin_dispatch_values_internal */
 
-int plugin_dispatch_values_secure (const value_list_t *vl)
+int plugin_dispatch_values (value_list_t const *vl)
 {
-  value_list_t vl_copy;
-  int status;
-
-  if (vl == NULL)
-    return EINVAL;
-
-  memcpy (&vl_copy, vl, sizeof (vl_copy));
-
-  /* Write callbacks must not change the values and meta pointers, so we can
-   * savely skip copying those and make this more efficient. */
-  if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
-    return (plugin_dispatch_values (&vl_copy));
-
-  /* Set pointers to NULL, just to be on the save side. */
-  vl_copy.values = NULL;
-  vl_copy.meta = NULL;
-
-  vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
-  if (vl_copy.values == NULL)
-  {
-    ERROR ("plugin_dispatch_values_secure: malloc failed.");
-    return (ENOMEM);
-  }
-  memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
-
-  if (vl->meta != NULL)
-  {
-    vl_copy.meta = meta_data_clone (vl->meta);
-    if (vl_copy.meta == NULL)
-    {
-      ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
-      free (vl_copy.values);
-      return (ENOMEM);
-    }
-  } /* if (vl->meta) */
-
-  status = plugin_dispatch_values (&vl_copy);
+       int status;
 
-  meta_data_destroy (vl_copy.meta);
-  free (vl_copy.values);
+       status = plugin_write_enqueue (vl);
+       if (status != 0)
+       {
+               char errbuf[1024];
+               ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
+                               "with status %i (%s).", status,
+                               sstrerror (status, errbuf, sizeof (errbuf)));
+               return (status);
+       }
 
-  return (status);
-} /* int plugin_dispatch_values_secure */
+       return (0);
+}
 
 int plugin_dispatch_notification (const notification_t *notif)
 {
index 0f35de5..635ff30 100644 (file)
@@ -326,8 +326,7 @@ int plugin_unregister_notification (const char *name);
  *  `vl'        Value list of the values that have been read by a `read'
  *              function.
  */
-int plugin_dispatch_values (value_list_t *vl);
-int plugin_dispatch_values_secure (const value_list_t *vl);
+int plugin_dispatch_values (value_list_t const *vl);
 int plugin_dispatch_missing (const value_list_t *vl);
 
 int plugin_dispatch_notification (const notification_t *notif);
index 15d4666..6c9ab44 100644 (file)
@@ -900,10 +900,10 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
 
                if ((PGRES_COMMAND_OK != PQresultStatus (res))
                                && (PGRES_TUPLES_OK != PQresultStatus (res))) {
+                       PQclear (res);
+
                        if ((CONNECTION_OK != PQstatus (db->conn))
                                        && (0 == c_psql_check_connection (db))) {
-                               PQclear (res);
-
                                /* try again */
                                res = PQexecParams (db->conn, writer->statement,
                                                STATIC_ARRAY_SIZE (params), NULL,
@@ -912,6 +912,7 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
 
                                if ((PGRES_COMMAND_OK == PQresultStatus (res))
                                                || (PGRES_TUPLES_OK == PQresultStatus (res))) {
+                                       PQclear (res);
                                        success = 1;
                                        continue;
                                }
@@ -932,6 +933,8 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
                        pthread_mutex_unlock (&db->db_lock);
                        return -1;
                }
+
+               PQclear (res);
                success = 1;
        }
 
index 439cf4b..85a8354 100644 (file)
@@ -33,6 +33,7 @@
 #endif
 
 #define REDIS_DEF_HOST   "localhost"
+#define REDIS_DEF_PASSWD ""
 #define REDIS_DEF_PORT    6379
 #define REDIS_DEF_TIMEOUT 2000
 #define MAX_REDIS_NODE_NAME 64
@@ -229,8 +230,8 @@ static void redis_submit_d (char *plugin_instance,
 
 static int redis_init (void) /* {{{ */
 {
-  redis_node_t rn = { "default", REDIS_DEF_HOST, REDIS_DEF_PORT,
-    REDIS_DEF_TIMEOUT, /* next = */ NULL };
+  redis_node_t rn = { "default", REDIS_DEF_HOST, REDIS_DEF_PASSWD,
+    REDIS_DEF_PORT, REDIS_DEF_TIMEOUT, /* next = */ NULL };
 
   if (nodes_head == NULL)
     redis_node_add (&rn);
index 25f4d7c..0abab4b 100644 (file)
@@ -158,7 +158,7 @@ tcp_connections             value:GAUGE:0:4294967295
 temperature            value:GAUGE:-273.15:U
 threads                        value:GAUGE:0:U
 time_dispersion                value:GAUGE:-1000000:1000000
-timeleft               value:GAUGE:0:3600
+timeleft               value:GAUGE:0:U
 time_offset            value:GAUGE:-1000000:1000000
 total_bytes            value:DERIVE:0:U
 total_connections      value:DERIVE:0:U
index de60e39..602f47a 100644 (file)
@@ -1,8 +1,8 @@
 /**
  * collectd - src/varnish.c
- * Copyright (C) 2010 Jérôme Renard
- * Copyright (C) 2010 Marc Fournier
- * Copyright (C) 2010 Florian Forster
+ * Copyright (C) 2010      Jérôme Renard
+ * Copyright (C) 2010      Marc Fournier
+ * Copyright (C) 2010-2012 Florian Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -20,7 +20,7 @@
  * Authors:
  *   Jérôme Renard <jerome.renard at gmail.com>
  *   Marc Fournier <marc.fournier at camptocamp.com>
- *   Florian octo Forster <octo at verplant.org>
+ *   Florian octo Forster <octo at collectd.org>
  **/
 
 /**
@@ -412,11 +412,21 @@ static int varnish_read (user_data_t *ud) /* {{{ */
 
        vd = VSM_New();
        VSC_Setup(vd);
-       if (VSM_n_Arg(vd, conf->instance) == -1)
+
+       if (conf->instance != NULL)
        {
-               ERROR ("Varnish plugin : unable to load statistics from instance");
-               return (-1);
+               int status;
+
+               status = VSM_n_Arg (vd, conf->instance);
+               if (status < 0)
+               {
+                       ERROR ("varnish plugin: VSM_n_Arg (\"%s\") failed "
+                                       "with status %i.",
+                                       conf->instance, status);
+                       return (-1);
+               }
        }
+
        if (VSC_Open (vd, /* diag = */ 1))
        {
                ERROR ("varnish plugin: Unable to load statistics.");
index 046b7b0..aa90019 100644 (file)
@@ -128,7 +128,7 @@ static int za_read (void)
        za_read_derive (ksp, "stolen",   "cache_operation", "stolen");
 
         /* Issue indicators */
-        za_read_derive (ksp, "mutex_miss", "mutex_operation", "miss");
+        za_read_derive (ksp, "mutex_miss", "mutex_operations", "miss");
        za_read_derive (ksp, "hash_collisions", "hash_collisions", "");
        
         /* Evictions */