/**
* collectd - src/plugin.c
- * Copyright (C) 2005-2009 Florian octo Forster
+ * Copyright (C) 2005-2011 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* Authors:
- * Florian octo Forster <octo at verplant.org>
+ * Florian octo Forster <octo at collectd.org>
* Sebastian Harl <sh at tokkee.org>
**/
#include "utils_llist.h"
#include "utils_heap.h"
#include "utils_cache.h"
-#include "utils_threshold.h"
#include "filter_chain.h"
/*
#define rf_callback rf_super.cf_callback
#define rf_udata rf_super.cf_udata
callback_func_t rf_super;
+ char rf_group[DATA_MAX_NAME_LEN];
char rf_name[DATA_MAX_NAME_LEN];
int rf_type;
struct timespec rf_interval;
static llist_t *list_init;
static llist_t *list_write;
static llist_t *list_flush;
+static llist_t *list_missing;
static llist_t *list_shutdown;
static llist_t *list_log;
static llist_t *list_notification;
{
callback_func_t *cf;
- cf = c_head_get_root (read_heap);
+ cf = c_heap_get_root (read_heap);
if (cf == NULL)
break;
*list = llist_create ();
if (*list == NULL)
{
- ERROR ("plugin: create_register_callback: "
+ ERROR ("plugin: register_callback: "
"llist_create failed.");
destroy_callback (cf);
return (-1);
key = strdup (name);
if (key == NULL)
{
- ERROR ("plugin: create_register_callback: strdup failed.");
+ ERROR ("plugin: register_callback: strdup failed.");
destroy_callback (cf);
return (-1);
}
le = llentry_create (key, cf);
if (le == NULL)
{
- ERROR ("plugin: create_register_callback: "
+ ERROR ("plugin: register_callback: "
"llentry_create failed.");
free (key);
destroy_callback (cf);
old_cf = le->value;
le->value = cf;
+ WARNING ("plugin: register_callback: "
+ "a callback named `%s' already exists - "
+ "overwriting the old entry!", name);
+
destroy_callback (old_cf);
sfree (key);
}
* object, but it will bitch about a shared object not having a
* ``module_register'' symbol..
*/
-static int plugin_load_file (char *file)
+static int plugin_load_file (char *file, uint32_t flags)
{
lt_dlhandle dlh;
void (*reg_handle) (void);
lt_dlinit ();
lt_dlerror (); /* clear errors */
- /* XXX BUG FIXME */
- if (strstr(file, "python") != NULL) {
+#if LIBTOOL_VERSION == 2
+ if (flags & PLUGIN_FLAGS_GLOBAL) {
lt_dladvise advise;
lt_dladvise_init(&advise);
lt_dladvise_global(&advise);
} else {
dlh = lt_dlopen (file);
}
+#else /* if LIBTOOL_VERSION == 1 */
+ if (flags & PLUGIN_FLAGS_GLOBAL)
+ ERROR ("plugin_load_file: The global flag is not supported, "
+ "libtool 2 is required for this.");
+ dlh = lt_dlopen (file);
+#endif
+
if (dlh == NULL)
{
const char *error = lt_dlerror ();
return (0);
}
+static _Bool timeout_reached(struct timespec timeout)
+{
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000));
+}
+
static void *plugin_read_thread (void __attribute__((unused)) *args)
{
while (read_loop != 0)
{
read_func_t *rf;
- struct timeval now;
+ cdtime_t now;
int status;
int rf_type;
+ int rc;
/* Get the read function that needs to be read next. */
- rf = c_head_get_root (read_heap);
+ rf = c_heap_get_root (read_heap);
if (rf == NULL)
{
struct timespec abstime;
- gettimeofday (&now, /* timezone = */ NULL);
+ now = cdtime ();
- abstime.tv_sec = now.tv_sec + interval_g;
- abstime.tv_nsec = 1000 * now.tv_usec;
+ CDTIME_T_TO_TIMESPEC (now + interval_g, &abstime);
pthread_mutex_lock (&read_lock);
pthread_cond_timedwait (&read_cond, &read_lock,
if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
{
- gettimeofday (&now, /* timezone = */ NULL);
+ now = cdtime ();
- rf->rf_interval.tv_sec = interval_g;
- rf->rf_interval.tv_nsec = 0;
+ CDTIME_T_TO_TIMESPEC (interval_g, &rf->rf_interval);
rf->rf_effective_interval = rf->rf_interval;
- rf->rf_next_read.tv_sec = now.tv_sec;
- rf->rf_next_read.tv_nsec = 1000 * now.tv_usec;
+ CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
}
/* sleep until this entry is due,
* using pthread_cond_timedwait */
pthread_mutex_lock (&read_lock);
- pthread_cond_timedwait (&read_cond, &read_lock,
+ /* In pthread_cond_timedwait, spurious wakeups are possible
+ * (and really happen, at least on NetBSD with > 1 CPU), thus
+ * we need to re-evaluate the condition every time
+ * pthread_cond_timedwait returns. */
+ rc = 0;
+ while ((read_loop != 0)
+ && !timeout_reached(rf->rf_next_read)
+ && rc == 0)
+ {
+ rc = pthread_cond_timedwait (&read_cond, &read_lock,
&rf->rf_next_read);
- /* Must hold `real_lock' when accessing `rf->rf_type'. */
+ }
+
+ /* Must hold `read_lock' when accessing `rf->rf_type'. */
rf_type = rf->rf_type;
pthread_mutex_unlock (&read_lock);
}
/* update the ``next read due'' field */
- gettimeofday (&now, /* timezone = */ NULL);
+ now = cdtime ();
DEBUG ("plugin_read_thread: Effective interval of the "
"%s plugin is %i.%09i.",
NORMALIZE_TIMESPEC (rf->rf_next_read);
/* Check, if `rf_next_read' is in the past. */
- if ((rf->rf_next_read.tv_sec < now.tv_sec)
- || ((rf->rf_next_read.tv_sec == now.tv_sec)
- && (rf->rf_next_read.tv_nsec < (1000 * now.tv_usec))))
+ if (TIMESPEC_TO_CDTIME_T (&rf->rf_next_read) < now)
{
/* `rf_next_read' is in the past. Insert `now'
* so this value doesn't trail off into the
* past too much. */
- rf->rf_next_read.tv_sec = now.tv_sec;
- rf->rf_next_read.tv_nsec = 1000 * now.tv_usec;
+ CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read);
}
DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.",
}
#define BUFSIZE 512
-int plugin_load (const char *type)
+int plugin_load (const char *type, uint32_t flags)
{
DIR *dh;
const char *dir;
continue;
}
- if (plugin_load_file (filename) == 0)
+ if (plugin_load_file (filename, flags) == 0)
{
/* success */
ret = 0;
}
}
+ le = llist_search (read_list, rf->rf_name);
+ if (le != NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ WARNING ("The read function \"%s\" is already registered. "
+ "Check for duplicate \"LoadPlugin\" lines "
+ "in your configuration!",
+ rf->rf_name);
+ return (EINVAL);
+ }
+
le = llentry_create (rf->rf_name, rf);
if (le == NULL)
{
int (*callback) (void))
{
read_func_t *rf;
+ int status;
- rf = (read_func_t *) malloc (sizeof (read_func_t));
+ rf = malloc (sizeof (*rf));
if (rf == NULL)
{
- char errbuf[1024];
- ERROR ("plugin_register_read: malloc failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- return (-1);
+ ERROR ("plugin_register_read: malloc failed.");
+ return (ENOMEM);
}
memset (rf, 0, sizeof (read_func_t));
rf->rf_callback = (void *) callback;
rf->rf_udata.data = NULL;
rf->rf_udata.free_func = NULL;
+ rf->rf_group[0] = '\0';
sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
rf->rf_type = RF_SIMPLE;
rf->rf_interval.tv_sec = 0;
rf->rf_interval.tv_nsec = 0;
rf->rf_effective_interval = rf->rf_interval;
- return (plugin_insert_read (rf));
+ status = plugin_insert_read (rf);
+ if (status != 0)
+ sfree (rf);
+
+ return (status);
} /* int plugin_register_read */
-int plugin_register_complex_read (const char *name,
+int plugin_register_complex_read (const char *group, const char *name,
plugin_read_cb callback,
const struct timespec *interval,
user_data_t *user_data)
{
read_func_t *rf;
+ int status;
- rf = (read_func_t *) malloc (sizeof (read_func_t));
+ rf = malloc (sizeof (*rf));
if (rf == NULL)
{
ERROR ("plugin_register_complex_read: malloc failed.");
- return (-1);
+ return (ENOMEM);
}
memset (rf, 0, sizeof (read_func_t));
rf->rf_callback = (void *) callback;
+ if (group != NULL)
+ sstrncpy (rf->rf_group, group, sizeof (rf->rf_group));
+ else
+ rf->rf_group[0] = '\0';
sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
rf->rf_type = RF_COMPLEX;
if (interval != NULL)
rf->rf_udata = *user_data;
}
- return (plugin_insert_read (rf));
+ status = plugin_insert_read (rf);
+ if (status != 0)
+ sfree (rf);
+
+ return (status);
} /* int plugin_register_complex_read */
int plugin_register_write (const char *name,
(void *) callback, ud));
} /* int plugin_register_flush */
+int plugin_register_missing (const char *name,
+ plugin_missing_cb callback, user_data_t *ud)
+{
+ return (create_register_callback (&list_missing, name,
+ (void *) callback, ud));
+} /* int plugin_register_missing */
+
int plugin_register_shutdown (char *name,
int (*callback) (void))
{
return (0);
} /* }}} int plugin_unregister_read */
+static int compare_read_func_group (llentry_t *e, void *ud) /* {{{ */
+{
+ read_func_t *rf = e->value;
+ char *group = ud;
+
+ return strcmp (rf->rf_group, (const char *)group);
+} /* }}} int compare_read_func_group */
+
+int plugin_unregister_read_group (const char *group) /* {{{ */
+{
+ llentry_t *le;
+ read_func_t *rf;
+
+ int found = 0;
+
+ if (group == NULL)
+ return (-ENOENT);
+
+ pthread_mutex_lock (&read_lock);
+
+ if (read_list == NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ return (-ENOENT);
+ }
+
+ while (42)
+ {
+ le = llist_search_custom (read_list,
+ compare_read_func_group, (void *)group);
+
+ if (le == NULL)
+ break;
+
+ ++found;
+
+ llist_remove (read_list, le);
+
+ rf = le->value;
+ assert (rf != NULL);
+ rf->rf_type = RF_REMOVE;
+
+ llentry_destroy (le);
+
+ DEBUG ("plugin_unregister_read_group: "
+ "Marked `%s' (group `%s') for removal.",
+ rf->rf_name, group);
+ }
+
+ pthread_mutex_unlock (&read_lock);
+
+ if (found == 0)
+ {
+ WARNING ("plugin_unregister_read_group: No such "
+ "group of read function: %s", group);
+ return (-ENOENT);
+ }
+
+ return (0);
+} /* }}} int plugin_unregister_read_group */
+
int plugin_unregister_write (const char *name)
{
return (plugin_unregister (list_write, name));
return (plugin_unregister (list_flush, name));
}
+int plugin_unregister_missing (const char *name)
+{
+ return (plugin_unregister (list_missing, name));
+}
+
int plugin_unregister_shutdown (const char *name)
{
return (plugin_unregister (list_shutdown, name));
{
read_func_t *rf;
- rf = c_head_get_root (read_heap);
+ rf = c_heap_get_root (read_heap);
if (rf == NULL)
break;
return (status);
} /* }}} int plugin_write */
-int plugin_flush (const char *plugin, int timeout, const char *identifier)
+int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
{
llentry_t *le;
destroy_read_heap ();
- plugin_flush (/* plugin = */ NULL, /* timeout = */ -1,
+ plugin_flush (/* plugin = */ NULL,
+ /* timeout = */ 0,
/* identifier = */ NULL);
le = NULL;
* the real free function when registering the write callback. This way
* the data isn't freed twice. */
destroy_all_callbacks (&list_flush);
+ destroy_all_callbacks (&list_missing);
destroy_all_callbacks (&list_write);
destroy_all_callbacks (&list_notification);
destroy_all_callbacks (&list_log);
} /* void plugin_shutdown_all */
+int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
+{
+ llentry_t *le;
+
+ if (list_missing == NULL)
+ return (0);
+
+ le = llist_head (list_missing);
+ while (le != NULL)
+ {
+ callback_func_t *cf;
+ plugin_missing_cb callback;
+ int status;
+
+ cf = le->value;
+ callback = cf->cf_callback;
+
+ status = (*callback) (vl, &cf->cf_udata);
+ if (status != 0)
+ {
+ if (status < 0)
+ {
+ ERROR ("plugin_dispatch_missing: Callback function \"%s\" "
+ "failed with status %i.",
+ le->key, status);
+ return (status);
+ }
+ else
+ {
+ return (0);
+ }
+ }
+
+ le = le->next;
+ }
+ return (0);
+} /* int }}} plugin_dispatch_missing */
+
int plugin_dispatch_values (value_list_t *vl)
{
int status;
if (c_avl_get (data_sets, vl->type, (void *) &ds) != 0)
{
- INFO ("plugin_dispatch_values: Dataset not found: %s", vl->type);
+ char ident[6 * DATA_MAX_NAME_LEN];
+
+ FORMAT_VL (ident, sizeof (ident), vl);
+ INFO ("plugin_dispatch_values: Dataset not found: %s "
+ "(from \"%s\"), check your types.db!",
+ vl->type, ident);
return (-1);
}
if (vl->time == 0)
- vl->time = time (NULL);
+ vl->time = cdtime ();
if (vl->interval <= 0)
vl->interval = interval_g;
- DEBUG ("plugin_dispatch_values: time = %u; interval = %i; "
+ DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
"host = %s; "
"plugin = %s; plugin_instance = %s; "
"type = %s; type_instance = %s;",
- (unsigned int) vl->time, vl->interval,
+ CDTIME_T_TO_DOUBLE (vl->time),
+ CDTIME_T_TO_DOUBLE (vl->interval),
vl->host,
vl->plugin, vl->plugin_instance,
vl->type, vl->type_instance);
/* Update the value cache */
uc_update (ds, vl);
- /* Initiate threshold checking */
- ut_check_threshold (ds, vl);
-
if (post_cache_chain != NULL)
{
status = fc_process_chain (ds, vl, post_cache_chain);
/* Possible TODO: Add flap detection here */
DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; "
- "time = %u; host = %s;",
+ "time = %.3f; host = %s;",
notif->severity, notif->message,
- (unsigned int) notif->time, notif->host);
+ CDTIME_T_TO_DOUBLE (notif->time), notif->host);
/* Nobody cares for notifications */
if (list_notification == NULL)
va_list ap;
llentry_t *le;
- if (list_log == NULL)
- return;
-
#if !COLLECT_DEBUG
if (level >= LOG_DEBUG)
return;
msg[sizeof (msg) - 1] = '\0';
va_end (ap);
+ if (list_log == NULL)
+ {
+ fprintf (stderr, "%s\n", msg);
+ return;
+ }
+
le = llist_head (list_log);
while (le != NULL)
{
}
case NM_TYPE_BOOLEAN:
{
- meta->nm_value.nm_boolean = *((bool *) value);
+ meta->nm_value.nm_boolean = *((_Bool *) value);
break;
}
default:
int plugin_notification_meta_add_boolean (notification_t *n,
const char *name,
- bool value)
+ _Bool value)
{
return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value));
}