X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fplugin.c;h=cd992044b2c199f07483b3807b7157986c411bed;hb=a2e732f8fc1673f0f30ac93aefab32d757ea17c3;hp=6c7aa057cf2d272b126eadf6308a76188d3f3b1a;hpb=1894607dc4c03ce20db7c5256c97890aa70bcaa5;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 6c7aa057..cd992044 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -1,19 +1,24 @@ /** * collectd - src/plugin.c - * Copyright (C) 2005-2013 Florian octo Forster + * Copyright (C) 2005-2014 Florian octo Forster * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; only version 2 of the License is applicable. + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. * * Authors: * Florian octo Forster @@ -99,6 +104,9 @@ static c_avl_tree_t *data_sets; static char *plugindir = NULL; +#ifndef DEFAULT_MAX_READ_INTERVAL +# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T (86400) +#endif static c_heap_t *read_heap = NULL; static llist_t *read_list; static int read_loop = 1; @@ -106,6 +114,7 @@ static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; static int read_threads_num = 0; +static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL; static write_queue_t *write_queue_head; static write_queue_t *write_queue_tail; @@ -122,6 +131,9 @@ static _Bool plugin_ctx_key_initialized = 0; static long write_limit_high = 0; static long write_limit_low = 0; +static derive_t stats_values_dropped = 0; +static _Bool record_statistics = 0; + /* * Static functions */ @@ -135,6 +147,52 @@ static const char *plugin_get_dir (void) return (plugindir); } +static void plugin_update_internal_statistics (void) { /* {{{ */ + derive_t copy_write_queue_length; + value_list_t vl = VALUE_LIST_INIT; + value_t values[2]; + + copy_write_queue_length = write_queue_length; + + /* Initialize `vl' */ + vl.values = values; + vl.values_len = 2; + vl.time = 0; + sstrncpy (vl.host, hostname_g, sizeof (vl.host)); + sstrncpy (vl.plugin, "collectd", sizeof (vl.plugin)); + + vl.type_instance[0] = 0; + vl.values_len = 1; + + /* Write queue */ + sstrncpy (vl.plugin_instance, "write_queue", + sizeof (vl.plugin_instance)); + + /* Write queue : queue length */ + vl.values[0].gauge = (gauge_t) copy_write_queue_length; + sstrncpy (vl.type, "queue_length", sizeof (vl.type)); + vl.type_instance[0] = 0; + plugin_dispatch_values (&vl); + + /* Write queue : Values dropped (queue length > low limit) */ + vl.values[0].derive = (derive_t) stats_values_dropped; + sstrncpy (vl.type, "derive", sizeof (vl.type)); + sstrncpy (vl.type_instance, "dropped", sizeof (vl.type_instance)); + plugin_dispatch_values (&vl); + + /* Cache */ + sstrncpy (vl.plugin_instance, "cache", + sizeof (vl.plugin_instance)); + + /* Cache : Nb entry in cache tree */ + vl.values[0].gauge = (gauge_t) uc_get_size(); + sstrncpy (vl.type, "cache_size", sizeof (vl.type)); + vl.type_instance[0] = 0; + plugin_dispatch_values (&vl); + + return; +} /* }}} void plugin_update_internal_statistics */ + static void destroy_callback (callback_func_t *cf) /* {{{ */ { if (cf == NULL) @@ -478,8 +536,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) if (status != 0) { rf->rf_effective_interval *= 2; - if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400)) - rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400); + if (rf->rf_effective_interval > max_read_interval) + rf->rf_effective_interval = max_read_interval; NOTICE ("read-function of plugin `%s' failed. " "Will suspend it for %.3f seconds.", @@ -1463,6 +1521,9 @@ void plugin_init_all (void) /* Init the value cache */ uc_init (); + if (IS_TRUE (global_option_get ("CollectInternalStats"))) + record_statistics = 1; + chain_name = global_option_get ("PreCacheChain"); pre_cache_chain = fc_chain_get_by_name (chain_name); @@ -1536,11 +1597,15 @@ void plugin_init_all (void) le = le->next; } + max_read_interval = global_option_get_time ("MaxReadInterval", + DEFAULT_MAX_READ_INTERVAL); + /* Start read-threads */ if (read_heap != NULL) { const char *rt; int num; + rt = global_option_get ("ReadThreads"); num = atoi (rt); if (num != -1) @@ -1551,6 +1616,9 @@ void plugin_init_all (void) /* TODO: Rename this function. */ void plugin_read_all (void) { + if(record_statistics) { + plugin_update_internal_statistics (); + } uc_check_timeout (); return; @@ -2072,9 +2140,16 @@ static _Bool check_drop_value (void) /* {{{ */ int plugin_dispatch_values (value_list_t const *vl) { int status; + static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER; - if (check_drop_value ()) + if (check_drop_value ()) { + if(record_statistics) { + pthread_mutex_lock(&statistics_lock); + stats_values_dropped++; + pthread_mutex_unlock(&statistics_lock); + } return (0); + } status = plugin_write_enqueue (vl); if (status != 0) @@ -2089,6 +2164,65 @@ int plugin_dispatch_values (value_list_t const *vl) return (0); } +__attribute__((sentinel)) +int plugin_dispatch_multivalue (value_list_t const *template, /* {{{ */ + _Bool store_percentage, ...) +{ + value_list_t *vl; + int failed = 0; + gauge_t sum = 0.0; + va_list ap; + + assert (template->values_len == 1); + + va_start (ap, store_percentage); + while (42) + { + char const *name; + gauge_t value; + + name = va_arg (ap, char const *); + if (name == NULL) + break; + + value = va_arg (ap, gauge_t); + if (!isnan (value)) + sum += value; + } + va_end (ap); + + vl = plugin_value_list_clone (template); + /* plugin_value_list_clone makes sure vl->time is set to non-zero. */ + if (store_percentage) + sstrncpy (vl->type, "percent", sizeof (vl->type)); + + va_start (ap, store_percentage); + while (42) + { + char const *name; + int status; + + /* Set the type instance. */ + name = va_arg (ap, char const *); + if (name == NULL) + break; + sstrncpy (vl->type_instance, name, sizeof (vl->type_instance)); + + /* Set the value. */ + vl->values[0].gauge = va_arg (ap, gauge_t); + if (store_percentage) + vl->values[0].gauge *= 100.0 / sum; + + status = plugin_write_enqueue (vl); + if (status != 0) + failed++; + } + va_end (ap); + + plugin_value_list_free (vl); + return (failed); +} /* }}} int plugin_dispatch_multivalue */ + int plugin_dispatch_notification (const notification_t *notif) { llentry_t *le;