Resolves Github issue #221.
write your own plugins in Perl and return arbitrary values using this
API. See collectd-perl(5).
+ - pf
+ Query statistics from BSD's packet filter "pf".
+
- pinba
Receive and dispatch timing values from Pinba, a profiling extension for
PHP.
* Miscellaneous plugins:
+ - aggregation
+ Selects multiple value lists based on patterns or regular expressions
+ and creates new aggregated values lists from those.
+
- threshold
Checks values against configured thresholds and creates notifications if
values are out of bounds. See collectd-threshold(5) for details.
then
AC_CHECK_HEADERS(mach/mach_init.h mach/host_priv.h mach/mach_error.h mach/mach_host.h mach/mach_port.h mach/mach_types.h mach/message.h mach/processor_set.h mach/processor.h mach/processor_info.h mach/task.h mach/thread_act.h mach/vm_region.h mach/vm_map.h mach/vm_prot.h mach/vm_statistics.h mach/kern_return.h)
AC_CHECK_HEADERS(CoreFoundation/CoreFoundation.h IOKit/IOKitLib.h IOKit/IOTypes.h IOKit/ps/IOPSKeys.h IOKit/IOBSD.h IOKit/storage/IOBlockStorageDriver.h)
+ # For the battery plugin
+ AC_CHECK_HEADERS(IOKit/ps/IOPowerSources.h, [], [],
+[
+#if HAVE_IOKIT_IOKITLIB_H
+# include <IOKit/IOKitLib.h>
+#endif
+#if HAVE_IOKIT_IOTYPES_H
+# include <IOKit/IOTypes.h>
+#endif
+])
+
fi
+
AC_CHECK_HEADERS(sys/sysctl.h, [], [],
[
#if HAVE_SYS_TYPES_H
have_linux_raid_md_u_h="no"
fi
-# For the battery plugin
-AC_CHECK_HEADERS(IOKit/ps/IOPowerSources.h, [], [],
-[
-#if HAVE_IOKIT_IOKITLIB_H
-# include <IOKit/IOKitLib.h>
-#endif
-#if HAVE_IOKIT_IOTYPES_H
-# include <IOKit/IOTypes.h>
-#endif
-])
-
# For the swap module
have_linux_wireless_h="no"
if test "x$ac_system" = "xLinux"
GraphWidth 400
#UnixSockAddr "/var/run/collectd-unixsock"
<Type apache_bytes>
- DataSources count
+ DataSources value
DSName "count Bytes/s"
RRDTitle "Apache Traffic"
RRDVerticalLabel "Bytes/s"
Color count 0000ff
</Type>
<Type apache_requests>
- DataSources count
+ DataSources value
DSName "count Requests/s"
RRDTitle "Apache Traffic"
RRDVerticalLabel "Requests/s"
</Type>
<Type apache_scoreboard>
Module GenericStacked
- DataSources count
+ DataSources value
RRDTitle "Apache scoreboard on {hostname}"
RRDVerticalLabel "Slots"
RRDFormat "%6.2lf"
RRDFormat "%6.1lf"
</Type>
<Type conntrack>
- DataSources conntrack
+ DataSources value
DSName conntrack Conntrack count
RRDTitle "nf_conntrack connections on {hostname}"
RRDVerticalLabel "Count"
RRDFormat "%4.0lf"
</Type>
<Type entropy>
- DataSources entropy
+ DataSources value
DSName entropy Entropy bits
RRDTitle "Available entropy on {hostname}"
RRDVerticalLabel "Bits"
Color value 00b000
</Type>
<Type frequency>
- DataSources frequency
+ DataSources value
DSName frequency Frequency
RRDTitle "Frequency ({type_instance})"
RRDVerticalLabel "Hertz"
Scale 8
</Type>
<Type percent>
- DataSources percent
+ DataSources value
DSName percent Percent
RRDTitle "Percent ({type_instance})"
RRDVerticalLabel "Percent"
Color percent 0000ff
</Type>
<Type ping>
- DataSources ping
+ DataSources value
DSName "ping Latency"
RRDTitle "Network latency ({type_instance})"
RRDVerticalLabel "Milliseconds"
Scale 0.001
</Type>
<Type users>
- DataSources users
+ DataSources value
DSName users Users
RRDTitle "Users ({type_instance}) on {hostname}"
RRDVerticalLabel "Users"
if BUILD_WITH_LIBSOCKET
collectd_tg_LDADD += -lsocket
endif
+if BUILD_WITH_LIBRT
+collectd_tg_LDADD += -lrt
+endif
if BUILD_AIX
collectd_tg_LDADD += -lm
endif
collectd-perl.5 \
collectd-python.5 \
collectd-snmp.5 \
+ collectd-tg.1 \
collectd-threshold.5 \
collectd-unixsock.5 \
types.db.5
collectd-python.pod \
collectd.pod \
collectd-snmp.pod \
+ collectd-tg.pod \
collectd-threshold.pod \
collectd-unixsock.pod \
postgresql_default.conf \
vl->values = &v;
vl->values_len = 1;
- plugin_dispatch_values_secure (vl);
+ plugin_dispatch_values (vl);
vl->values = NULL;
vl->values_len = 0;
apcups_detail.battv = -1.0;
apcups_detail.loadpct = -1.0;
apcups_detail.bcharge = -1.0;
- apcups_detail.timeleft = -1.0;
+ apcups_detail.timeleft = NAN;
apcups_detail.itemp = -300.0;
apcups_detail.linefreq = -1.0;
--- /dev/null
+=head1 NAME
+
+collectd-tg - Traffic generator for collectd.
+
+=head1 SYNOPSIS
+
+collectd-tg B<-n> I<num_vl> B<-H> I<num_hosts> B<-p> I<num_plugins> B<-i> I<interval> B<-d> I<dest> B<-D> I<dport>
+
+=head1 DESCRIPTION
+
+B<collectd-tg> generates bogus I<collectd> network traffic. While host, plugin
+and values are generated randomly, the generated traffic tries to mimic "real"
+traffic as closely as possible.
+
+=head1 ARGUMENTS AND OPTIONS
+
+The following options are understood by I<collectd-tg>. The order of the
+arguments generally doesn't matter, as long as no argument is passed more than
+once.
+
+=over 4
+
+=item B<-n> I<num_vl>
+
+Sets the number of unique I<value lists> (VL) to generate. Defaults to 10000.
+
+=item B<-H> I<num_hosts>
+
+Sets the number of unique hosts to simulate. Defaults to 1000.
+
+=item B<-p> I<num_plugins>
+
+Sets the number of unique plugins to simulate. Defaults to 20.
+
+=item B<-i> I<interval>
+
+Sets the interval in which each I<value list> is dispatched. Defaults to 10.0
+seconds.
+
+=item B<-d> I<dest>
+
+Sets the destination to which to send the generated network traffic. Defaults
+to the IPv6 multicast address, C<ff18::efc0:4a42>.
+
+=item B<-D> I<dport>
+
+Sets the destination port or service to which to send the generated network
+traffic. Defaults to I<collectd's> default port, C<25826>.
+
+=item B<-h>
+
+Print usage summary.
+
+=back
+
+=head1 SEE ALSO
+
+L<collectd(1)>,
+L<collectd.conf(5)>
+
+=head1 AUTHOR
+
+Florian Forster E<lt>octoE<nbsp>atE<nbsp>collectd.orgE<gt>
+
+=cut
#PIDFile "@localstatedir@/run/@PACKAGE_NAME@.pid"
#PluginDir "@libdir@/@PACKAGE_NAME@"
#TypesDB "@prefix@/share/@PACKAGE_NAME@/types.db"
+
+#----------------------------------------------------------------------------#
+# Interval at which to query values. This may be overwritten on a per-plugin #
+# base by using the 'Interval' option of the LoadPlugin block: #
+# <LoadPlugin foo> #
+# Interval 60 #
+# </LoadPlugin> #
+#----------------------------------------------------------------------------#
#Interval 10
+
#Timeout 2
#ReadThreads 5
+#WriteThreads 5
##############################################################################
# Logging #
# #SelectDB "custdb0"
# Query "num_of_customers"
# #Query "..."
+# #Host "..."
# </Database>
#</Plugin>
# ValuesFrom "count"
# </Result>
# </Query>
+# <Writer sqlstore>
+# # See contrib/postgresql/collectd_insert.sql for details
+# Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8, $9);"
+# StoreRates true
+# </Writer>
# <Database foo>
# Host "hostname"
# Port "5432"
# Query backend # predefined
# Query rt36_tickets
# </Database>
+# <Database qux>
+# Service "collectd_store"
+# Writer sqlstore
+# # see collectd.conf(5) for details
+# CommitInterval 30
+# </Database>
#</Plugin>
#<Plugin powerdns>
Number of threads to start for reading plugins. The default value is B<5>, but
you may want to increase this if you have more than five plugins that take a
-long time to read. Mostly those are plugin that do network-IO. Setting this to
-a value higher than the number of plugins you've loaded is totally useless.
+long time to read. Mostly those are plugins that do network-IO. Setting this to
+a value higher than the number of registered read callbacks is not recommended.
+
+=item B<WriteThreads> I<Num>
+
+Number of threads to start for dispatching value lists to write plugins. The
+default value is B<5>, but you may want to increase this if you have more than
+five plugins that may take relatively long to write to.
=item B<Hostname> I<Name>
default), the I<transient> delivery mode will be used, i.e. messages may be
lost due to high load, overflowing queues or similar issues.
-=item B<Format> B<Command>|B<JSON> (Publish only)
+=item B<Format> B<Command>|B<JSON>|B<Graphite> (Publish only)
Selects the format in which messages are sent to the broker. If set to
B<Command> (the default), values are sent as C<PUTVAL> commands which are
L<http://libdbi-drivers.sourceforge.net/>. However, the options "host",
"username", "password", and "dbname" seem to be deE<nbsp>facto standards.
+DBDs can register two types of options: String options and numeric options. The
+plugin will use the C<dbi_conn_set_option> function when the configuration
+provides a string and the C<dbi_conn_require_option_numeric> function when the
+configuration provides a number. So these two lines will actually result in
+different calls being used:
+
+ DriverOption "Port" 1234 # numeric
+ DriverOption "Port" "1234" # string
+
Unfortunately, drivers are not too keen to report errors when an unknown option
is passed to them, so invalid settings here may go unnoticed. This is not the
plugin's fault, it will report errors if it gets them from the libraryE<nbsp>/
the driver. If a driver complains about an option, the plugin will dump a
-complete list of all options understood by that driver to the log.
+complete list of all options understood by that driver to the log. There is no
+way to programatically find out if an option expects a string or a numeric
+argument, so you will have to refer to the appropriate DBD's documentation to
+find this out. Sorry.
=item B<SelectDB> I<Database>
blocks you want to refer to must be placed above the database block you want to
refer to them from.
+=item B<Host> I<Hostname>
+
+Sets the B<host> field of I<value lists> to I<Hostname> when dispatching
+values. Defaults to the global hostname setting.
+
=back
=head2 Plugin C<df>
{"FQDNLookup", NULL, "true"},
{"Interval", NULL, NULL},
{"ReadThreads", NULL, "5"},
+ {"WriteThreads", NULL, "5"},
{"Timeout", NULL, "2"},
{"PreCacheChain", NULL, "PreCache"},
{"PostCacheChain", NULL, "PostCache"}
/**
* collectd - src/dbi.c
- * Copyright (C) 2008,2009 Florian octo Forster
+ * Copyright (C) 2008-2013 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* Authors:
- * Florian octo Forster <octo at verplant.org>
+ * Florian octo Forster <octo at collectd.org>
**/
#include "collectd.h"
struct cdbi_driver_option_s /* {{{ */
{
char *key;
- char *value;
+ union
+ {
+ char *string;
+ int numeric;
+ } value;
+ _Bool is_numeric;
};
typedef struct cdbi_driver_option_s cdbi_driver_option_t; /* }}} */
char *select_db;
char *driver;
+ char *host;
cdbi_driver_option_t *driver_options;
size_t driver_options_num;
for (i = 0; i < db->driver_options_num; i++)
{
sfree (db->driver_options[i].key);
- sfree (db->driver_options[i].value);
+ if (!db->driver_options[i].is_numeric)
+ sfree (db->driver_options[i].value.string);
}
sfree (db->driver_options);
* </Plugin>
*/
-static int cdbi_config_set_string (char **ret_string, /* {{{ */
- oconfig_item_t *ci)
-{
- char *string;
-
- if ((ci->values_num != 1)
- || (ci->values[0].type != OCONFIG_TYPE_STRING))
- {
- WARNING ("dbi plugin: The `%s' config option "
- "needs exactly one string argument.", ci->key);
- return (-1);
- }
-
- string = strdup (ci->values[0].value.string);
- if (string == NULL)
- {
- ERROR ("dbi plugin: strdup failed.");
- return (-1);
- }
-
- if (*ret_string != NULL)
- free (*ret_string);
- *ret_string = string;
-
- return (0);
-} /* }}} int cdbi_config_set_string */
-
static int cdbi_config_add_database_driver_option (cdbi_database_t *db, /* {{{ */
oconfig_item_t *ci)
{
if ((ci->values_num != 2)
|| (ci->values[0].type != OCONFIG_TYPE_STRING)
- || (ci->values[1].type != OCONFIG_TYPE_STRING))
+ || ((ci->values[1].type != OCONFIG_TYPE_STRING)
+ && (ci->values[1].type != OCONFIG_TYPE_NUMBER)))
{
WARNING ("dbi plugin: The `DriverOption' config option "
- "needs exactly two string arguments.");
+ "needs exactly two arguments.");
return (-1);
}
db->driver_options = option;
option = db->driver_options + db->driver_options_num;
+ memset (option, 0, sizeof (*option));
option->key = strdup (ci->values[0].value.string);
if (option->key == NULL)
return (-1);
}
- option->value = strdup (ci->values[1].value.string);
- if (option->value == NULL)
+ if (ci->values[1].type == OCONFIG_TYPE_STRING)
{
- ERROR ("dbi plugin: strdup failed.");
- sfree (option->key);
- return (-1);
+ option->value.string = strdup (ci->values[1].value.string);
+ if (option->value.string == NULL)
+ {
+ ERROR ("dbi plugin: strdup failed.");
+ sfree (option->key);
+ return (-1);
+ }
+ }
+ else
+ {
+ assert (ci->values[1].type == OCONFIG_TYPE_NUMBER);
+ option->value.numeric = (int) (ci->values[1].value.number + .5);
+ option->is_numeric = 1;
}
db->driver_options_num++;
}
memset (db, 0, sizeof (*db));
- status = cdbi_config_set_string (&db->name, ci);
+ status = cf_util_get_string (ci, &db->name);
if (status != 0)
{
sfree (db);
oconfig_item_t *child = ci->children + i;
if (strcasecmp ("Driver", child->key) == 0)
- status = cdbi_config_set_string (&db->driver, child);
+ status = cf_util_get_string (child, &db->driver);
else if (strcasecmp ("DriverOption", child->key) == 0)
status = cdbi_config_add_database_driver_option (db, child);
else if (strcasecmp ("SelectDB", child->key) == 0)
- status = cdbi_config_set_string (&db->select_db, child);
+ status = cf_util_get_string (child, &db->select_db);
else if (strcasecmp ("Query", child->key) == 0)
status = udb_query_pick_from_list (child, queries, queries_num,
&db->queries, &db->queries_num);
+ else if (strcasecmp ("Host", child->key) == 0)
+ status = cf_util_get_string (child, &db->host);
else
{
WARNING ("dbi plugin: Option `%s' not allowed here.", child->key);
sstrncpy (column_names[i], column_name, DATA_MAX_NAME_LEN);
} /* }}} for (i = 0; i < column_num; i++) */
- udb_query_prepare_result (q, prep_area, hostname_g,
+ udb_query_prepare_result (q, prep_area, (db->host ? db->host : hostname_g),
/* plugin = */ "dbi", db->name,
column_names, column_num, /* interval = */ 0);
* trouble finding out how to configure the plugin correctly.. */
for (i = 0; i < db->driver_options_num; i++)
{
- DEBUG ("dbi plugin: cdbi_connect_database (%s): "
- "key = %s; value = %s;",
- db->name,
- db->driver_options[i].key,
- db->driver_options[i].value);
+ if (db->driver_options[i].is_numeric)
+ {
+ status = dbi_conn_set_option_numeric (connection,
+ db->driver_options[i].key, db->driver_options[i].value.numeric);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("dbi plugin: cdbi_connect_database (%s): "
+ "dbi_conn_set_option_numeric (\"%s\", %i) failed: %s.",
+ db->name,
+ db->driver_options[i].key, db->driver_options[i].value.numeric,
+ cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+ }
+ }
+ else
+ {
+ status = dbi_conn_set_option (connection,
+ db->driver_options[i].key, db->driver_options[i].value.string);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("dbi plugin: cdbi_connect_database (%s): "
+ "dbi_conn_set_option (\"%s\", \"%s\") failed: %s.",
+ db->name,
+ db->driver_options[i].key, db->driver_options[i].value.string,
+ cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+ }
+ }
- status = dbi_conn_set_option (connection,
- db->driver_options[i].key, db->driver_options[i].value);
if (status != 0)
{
- char errbuf[1024];
- const char *opt;
-
- ERROR ("dbi plugin: cdbi_connect_database (%s): "
- "dbi_conn_set_option (%s, %s) failed: %s.",
- db->name,
- db->driver_options[i].key, db->driver_options[i].value,
- cdbi_strerror (connection, errbuf, sizeof (errbuf)));
+ char const *opt;
INFO ("dbi plugin: This is a list of all options understood "
"by the `%s' driver:", db->driver);
return (status);
interfaces_num++;
- INFO("ethstat plugin: Registred interface %s",
+ INFO("ethstat plugin: Registered interface %s",
interfaces[interfaces_num - 1]);
return (0);
{
value_map_t *map;
int status;
+ char *key;
if ((ci->values_num < 2)
|| (ci->values_num > 3)
return (-1);
}
+ key = strdup (ci->values[0].value.string);
+ if (key == NULL)
+ {
+ ERROR ("ethstat plugin: strdup(3) failed.");
+ return (ENOMEM);
+ }
+
map = malloc (sizeof (*map));
if (map == NULL)
{
+ sfree (key);
ERROR ("ethstat plugin: malloc(3) failed.");
return (ENOMEM);
}
if (value_map == NULL)
{
sfree (map);
+ sfree (key);
ERROR ("ethstat plugin: c_avl_create() failed.");
return (-1);
}
}
status = c_avl_insert (value_map,
- /* key = */ ci->values[0].value.string,
+ /* key = */ key,
/* value = */ map);
if (status != 0)
{
- sfree (map);
if (status > 0)
- ERROR ("ethstat plugin: Multiple mappings for \"%s\".",
- ci->values[0].value.string);
+ ERROR ("ethstat plugin: Multiple mappings for \"%s\".", key);
else
- ERROR ("ethstat plugin: c_avl_insert(\"%s\") failed.",
- ci->values[0].value.string);
+ ERROR ("ethstat plugin: c_avl_insert(\"%s\") failed.", key);
+
+ sfree (map);
+ sfree (key);
return (-1);
}
return 0;
}
+static int ethstat_shutdown (void)
+{
+ void *key = NULL;
+ void *value = NULL;
+
+ if (value_map == NULL)
+ return (0);
+
+ while (c_avl_pick (value_map, &key, &value) == 0)
+ {
+ sfree (key);
+ sfree (value);
+ }
+
+ c_avl_destroy (value_map);
+ value_map = NULL;
+
+ return (0);
+}
+
void module_register (void)
{
plugin_register_complex_config ("ethstat", ethstat_config);
plugin_register_read ("ethstat", ethstat_read);
+ plugin_register_shutdown ("ethstat", ethstat_shutdown);
}
/* vim: set sw=2 sts=2 et fdm=marker : */
char *username;
char *password;
+#if HAVE_LIBGCRYPT
gcry_cipher_hd_t encr_cypher;
size_t encr_header_len;
char encr_iv[16];
+#endif
};
#define SSTRNCPY(dst,src,sz) do { \
return (result);
need_init = 0;
+#if HAVE_LIBGCRYPT
gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
if (!gcry_check_version (GCRYPT_VERSION))
result = 1;
return (1);
+#else
+ return(0);
+#endif
} /* }}} _Bool have_gcrypt */
static uint64_t htonll (uint64_t val) /* {{{ */
return (0);
} /* }}} int nb_add_value_list */
+#if HAVE_LIBGCRYPT
static int nb_add_signature (lcc_network_buffer_t *nb) /* {{{ */
{
char *buffer;
return (0);
} /* }}} int nb_add_encryption */
+#endif
/*
* Public functions
nb->ptr = nb->buffer;
nb->free = nb->size;
+#if HAVE_LIBGCRYPT
if (nb->seclevel == SIGN)
{
size_t username_len;
ADD_GENERIC (nb, hash, sizeof (hash));
assert ((nb->encr_header_len + nb->free) == nb->size);
}
+#endif
return (0);
} /* }}} int lcc_network_buffer_initialize */
if (nb == NULL)
return (EINVAL);
+#if HAVE_LIBGCRYPT
if (nb->seclevel == SIGN)
nb_add_signature (nb);
else if (nb->seclevel == ENCRYPT)
nb_add_encryption (nb);
+#endif
return (0);
} /* }}} int lcc_network_buffer_finalize */
}
}
- plugin_dispatch_values_secure (vl);
+ plugin_dispatch_values (vl);
stats_values_dispatched++;
meta_data_destroy (vl->meta);
vl.values[0].derive = (derive_t) copy_octets_rx;
vl.values[1].derive = (derive_t) copy_octets_tx;
sstrncpy (vl.type, "if_octets", sizeof (vl.type));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
/* Packets received / send */
vl.values[0].derive = (derive_t) copy_packets_rx;
vl.values[1].derive = (derive_t) copy_packets_tx;
sstrncpy (vl.type, "if_packets", sizeof (vl.type));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
/* Values (not) dispatched and (not) send */
sstrncpy (vl.type, "total_values", sizeof (vl.type));
vl.values[0].derive = (derive_t) copy_values_dispatched;
sstrncpy (vl.type_instance, "dispatch-accepted",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
vl.values[0].derive = (derive_t) copy_values_not_dispatched;
sstrncpy (vl.type_instance, "dispatch-rejected",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
vl.values[0].derive = (derive_t) copy_values_sent;
sstrncpy (vl.type_instance, "send-accepted",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
vl.values[0].derive = (derive_t) copy_values_not_sent;
sstrncpy (vl.type_instance, "send-rejected",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
/* Receive queue length */
vl.values[0].gauge = (gauge_t) copy_receive_list_length;
sstrncpy (vl.type, "queue_length", sizeof (vl.type));
vl.type_instance[0] = 0;
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
return (0);
} /* }}} int network_stats_read */
vl.values = values + i;
sstrncpy (vl.type_instance, type_instances[i],
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
}
} /* void nfs_procedures_submit */
/**
* collectd - src/plugin.c
- * Copyright (C) 2005-2011 Florian octo Forster
+ * Copyright (C) 2005-2013 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
};
typedef struct read_func_s read_func_t;
+struct write_queue_s;
+typedef struct write_queue_s write_queue_t;
+struct write_queue_s
+{
+ value_list_t *vl;
+ write_queue_t *next;
+};
+
/*
* Private variables
*/
static pthread_t *read_threads = NULL;
static int read_threads_num = 0;
+static write_queue_t *write_queue_head;
+static write_queue_t *write_queue_tail;
+static _Bool write_loop = 1;
+static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t *write_threads = NULL;
+static size_t write_threads_num = 0;
+
static pthread_key_t plugin_ctx_key;
static _Bool plugin_ctx_key_initialized = 0;
/*
* Static functions
*/
+static int plugin_dispatch_values_internal (value_list_t *vl);
+
static const char *plugin_get_dir (void)
{
if (plugindir == NULL)
read_threads_num = 0;
} /* void stop_read_threads */
+static void plugin_value_list_free (value_list_t *vl) /* {{{ */
+{
+ if (vl == NULL)
+ return;
+
+ meta_data_destroy (vl->meta);
+ sfree (vl->values);
+ sfree (vl);
+} /* }}} void plugin_value_list_free */
+
+static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
+{
+ value_list_t *vl;
+
+ if (vl_orig == NULL)
+ return (NULL);
+
+ vl = malloc (sizeof (*vl));
+ if (vl == NULL)
+ return (NULL);
+ memcpy (vl, vl_orig, sizeof (*vl));
+
+ vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
+ if (vl->values == NULL)
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+ memcpy (vl->values, vl_orig->values,
+ vl_orig->values_len * sizeof (*vl->values));
+
+ vl->meta = meta_data_clone (vl->meta);
+ if ((vl_orig->meta != NULL) && (vl->meta == NULL))
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+
+ return (vl);
+} /* }}} value_list_t *plugin_value_list_clone */
+
+static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
+{
+ write_queue_t *q;
+
+ q = malloc (sizeof (*q));
+ if (q == NULL)
+ return (ENOMEM);
+ q->next = NULL;
+
+ q->vl = plugin_value_list_clone (vl);
+ if (q->vl == NULL)
+ {
+ sfree (q);
+ return (ENOMEM);
+ }
+
+ pthread_mutex_lock (&write_lock);
+
+ if (write_queue_tail == NULL)
+ {
+ write_queue_head = q;
+ write_queue_tail = q;
+ }
+ else
+ {
+ write_queue_tail->next = q;
+ write_queue_tail = q;
+ }
+
+ pthread_cond_signal (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ return (0);
+} /* }}} int plugin_write_enqueue */
+
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
+{
+ write_queue_t *q;
+ value_list_t *vl;
+
+ pthread_mutex_lock (&write_lock);
+
+ while (write_loop && (write_queue_head == NULL))
+ pthread_cond_wait (&write_cond, &write_lock);
+
+ if (write_queue_head == NULL)
+ {
+ pthread_mutex_unlock (&write_lock);
+ return (NULL);
+ }
+
+ q = write_queue_head;
+ write_queue_head = q->next;
+ if (write_queue_head == NULL)
+ write_queue_tail = NULL;
+
+ pthread_mutex_unlock (&write_lock);
+
+ vl = q->vl;
+ sfree (q);
+ return (vl);
+} /* }}} value_list_t *plugin_write_dequeue */
+
+static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
+{
+ while (write_loop)
+ {
+ value_list_t *vl = plugin_write_dequeue ();
+ if (vl == NULL)
+ continue;
+
+ plugin_dispatch_values_internal (vl);
+
+ plugin_value_list_free (vl);
+ }
+
+ pthread_exit (NULL);
+ return ((void *) 0);
+} /* }}} void *plugin_write_thread */
+
+static void start_write_threads (size_t num) /* {{{ */
+{
+ size_t i;
+
+ if (write_threads != NULL)
+ return;
+
+ write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+ if (write_threads == NULL)
+ {
+ ERROR ("plugin: start_write_threads: calloc failed.");
+ return;
+ }
+
+ write_threads_num = 0;
+ for (i = 0; i < num; i++)
+ {
+ int status;
+
+ status = pthread_create (write_threads + write_threads_num,
+ /* attr = */ NULL,
+ plugin_write_thread,
+ /* arg = */ NULL);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("plugin: start_write_threads: pthread_create failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ return;
+ }
+
+ write_threads_num++;
+ } /* for (i) */
+} /* }}} void start_write_threads */
+
+static void stop_write_threads (void) /* {{{ */
+{
+ write_queue_t *q;
+ int i;
+
+ if (write_threads == NULL)
+ return;
+
+ INFO ("collectd: Stopping %zu write threads.", write_threads_num);
+
+ pthread_mutex_lock (&write_lock);
+ write_loop = 0;
+ DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
+ pthread_cond_broadcast (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ for (i = 0; i < write_threads_num; i++)
+ {
+ if (pthread_join (write_threads[i], NULL) != 0)
+ {
+ ERROR ("plugin: stop_write_threads: pthread_join failed.");
+ }
+ write_threads[i] = (pthread_t) 0;
+ }
+ sfree (write_threads);
+ write_threads_num = 0;
+
+ pthread_mutex_lock (&write_lock);
+ i = 0;
+ for (q = write_queue_head; q != NULL; q = q->next)
+ {
+ plugin_value_list_free (q->vl);
+ sfree (q);
+ i++;
+ }
+ write_queue_head = NULL;
+ write_queue_tail = NULL;
+ pthread_mutex_unlock (&write_lock);
+
+ if (i > 0)
+ {
+ WARNING ("plugin: %i value list%s left after shutting down "
+ "the write threads.",
+ i, (i == 1) ? " was" : "s were");
+ }
+} /* }}} void stop_write_threads */
+
/*
* Public functions
*/
chain_name = global_option_get ("PostCacheChain");
post_cache_chain = fc_chain_get_by_name (chain_name);
+ {
+ char const *tmp = global_option_get ("WriteThreads");
+ int num = atoi (tmp);
+
+ if (num < 1)
+ num = 5;
+
+ start_write_threads ((size_t) num);
+ }
if ((list_init == NULL) && (read_heap == NULL))
return;
plugin_set_ctx (old_ctx);
}
+ stop_write_threads ();
+
/* Write plugins which use the `user_data' pointer usually need the
* same data available to the flush callback. If this is the case, set
* the free_function to NULL when registering the flush callback and to
return (0);
} /* int }}} plugin_dispatch_missing */
-int plugin_dispatch_values (value_list_t *vl)
+static int plugin_dispatch_values_internal (value_list_t *vl)
{
int status;
static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
}
return (0);
-} /* int plugin_dispatch_values */
+} /* int plugin_dispatch_values_internal */
-int plugin_dispatch_values_secure (const value_list_t *vl)
+int plugin_dispatch_values (value_list_t const *vl)
{
- value_list_t vl_copy;
- int status;
-
- if (vl == NULL)
- return EINVAL;
-
- memcpy (&vl_copy, vl, sizeof (vl_copy));
-
- /* Write callbacks must not change the values and meta pointers, so we can
- * savely skip copying those and make this more efficient. */
- if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
- return (plugin_dispatch_values (&vl_copy));
-
- /* Set pointers to NULL, just to be on the save side. */
- vl_copy.values = NULL;
- vl_copy.meta = NULL;
-
- vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
- if (vl_copy.values == NULL)
- {
- ERROR ("plugin_dispatch_values_secure: malloc failed.");
- return (ENOMEM);
- }
- memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
-
- if (vl->meta != NULL)
- {
- vl_copy.meta = meta_data_clone (vl->meta);
- if (vl_copy.meta == NULL)
- {
- ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
- free (vl_copy.values);
- return (ENOMEM);
- }
- } /* if (vl->meta) */
-
- status = plugin_dispatch_values (&vl_copy);
+ int status;
- meta_data_destroy (vl_copy.meta);
- free (vl_copy.values);
+ status = plugin_write_enqueue (vl);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ return (status);
+ }
- return (status);
-} /* int plugin_dispatch_values_secure */
+ return (0);
+}
int plugin_dispatch_notification (const notification_t *notif)
{
* `vl' Value list of the values that have been read by a `read'
* function.
*/
-int plugin_dispatch_values (value_list_t *vl);
-int plugin_dispatch_values_secure (const value_list_t *vl);
+int plugin_dispatch_values (value_list_t const *vl);
int plugin_dispatch_missing (const value_list_t *vl);
int plugin_dispatch_notification (const notification_t *notif);
if ((PGRES_COMMAND_OK != PQresultStatus (res))
&& (PGRES_TUPLES_OK != PQresultStatus (res))) {
+ PQclear (res);
+
if ((CONNECTION_OK != PQstatus (db->conn))
&& (0 == c_psql_check_connection (db))) {
- PQclear (res);
-
/* try again */
res = PQexecParams (db->conn, writer->statement,
STATIC_ARRAY_SIZE (params), NULL,
if ((PGRES_COMMAND_OK == PQresultStatus (res))
|| (PGRES_TUPLES_OK == PQresultStatus (res))) {
+ PQclear (res);
success = 1;
continue;
}
pthread_mutex_unlock (&db->db_lock);
return -1;
}
+
+ PQclear (res);
success = 1;
}
#endif
#define REDIS_DEF_HOST "localhost"
+#define REDIS_DEF_PASSWD ""
#define REDIS_DEF_PORT 6379
#define REDIS_DEF_TIMEOUT 2000
#define MAX_REDIS_NODE_NAME 64
static int redis_init (void) /* {{{ */
{
- redis_node_t rn = { "default", REDIS_DEF_HOST, REDIS_DEF_PORT,
- REDIS_DEF_TIMEOUT, /* next = */ NULL };
+ redis_node_t rn = { "default", REDIS_DEF_HOST, REDIS_DEF_PASSWD,
+ REDIS_DEF_PORT, REDIS_DEF_TIMEOUT, /* next = */ NULL };
if (nodes_head == NULL)
redis_node_add (&rn);
temperature value:GAUGE:-273.15:U
threads value:GAUGE:0:U
time_dispersion value:GAUGE:-1000000:1000000
-timeleft value:GAUGE:0:3600
+timeleft value:GAUGE:0:U
time_offset value:GAUGE:-1000000:1000000
total_bytes value:DERIVE:0:U
total_connections value:DERIVE:0:U
/**
* collectd - src/varnish.c
- * Copyright (C) 2010 Jérôme Renard
- * Copyright (C) 2010 Marc Fournier
- * Copyright (C) 2010 Florian Forster
+ * Copyright (C) 2010 Jérôme Renard
+ * Copyright (C) 2010 Marc Fournier
+ * Copyright (C) 2010-2012 Florian Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Authors:
* Jérôme Renard <jerome.renard at gmail.com>
* Marc Fournier <marc.fournier at camptocamp.com>
- * Florian octo Forster <octo at verplant.org>
+ * Florian octo Forster <octo at collectd.org>
**/
/**
vd = VSM_New();
VSC_Setup(vd);
- if (VSM_n_Arg(vd, conf->instance) == -1)
+
+ if (conf->instance != NULL)
{
- ERROR ("Varnish plugin : unable to load statistics from instance");
- return (-1);
+ int status;
+
+ status = VSM_n_Arg (vd, conf->instance);
+ if (status < 0)
+ {
+ ERROR ("varnish plugin: VSM_n_Arg (\"%s\") failed "
+ "with status %i.",
+ conf->instance, status);
+ return (-1);
+ }
}
+
if (VSC_Open (vd, /* diag = */ 1))
{
ERROR ("varnish plugin: Unable to load statistics.");
za_read_derive (ksp, "stolen", "cache_operation", "stolen");
/* Issue indicators */
- za_read_derive (ksp, "mutex_miss", "mutex_operation", "miss");
+ za_read_derive (ksp, "mutex_miss", "mutex_operations", "miss");
za_read_derive (ksp, "hash_collisions", "hash_collisions", "");
/* Evictions */