Support for call the flush callback at regular intervals using
authorManuel Luis Sanmartín Rozada <manuel.luis@gmail.com>
Sun, 7 Jun 2015 22:49:20 +0000 (00:49 +0200)
committerManuel Luis Sanmartín Rozada <manuel.luis@gmail.com>
Sun, 7 Jun 2015 22:49:20 +0000 (00:49 +0200)
the read plugin callback.

src/collectd.conf.pod
src/daemon/configfile.c
src/daemon/plugin.c
src/daemon/plugin.h

index 1884914..05ae8f3 100644 (file)
@@ -130,6 +130,15 @@ Sets a plugin-specific interval for collecting metrics. This overrides the
 global B<Interval> setting. If a plugin provides own support for specifying an
 interval, that setting will take precedence.
 
+=item B<FlushInterval> I<Seconds>
+
+Specifies the the interval, in seconds, to call the flush callback if it's
+defined in this plugin. By default, this is disabled
+
+=item B<FlushTimeout> I<Seconds>
+
+Specifies the value of the timeout argument of the flush callback.
+
 =back
 
 =item B<AutoLoadPlugin> B<false>|B<true>
index 02fd96f..dde16ca 100644 (file)
@@ -287,12 +287,29 @@ static int dispatch_loadplugin (const oconfig_item_t *ci)
        /* default to the global interval set before loading this plugin */
        memset (&ctx, 0, sizeof (ctx));
        ctx.interval = cf_get_default_interval ();
+       ctx.flush_interval = 0;
+       ctx.flush_timeout = 0;
 
-       for (i = 0; i < ci->children_num; ++i) {
-               if (strcasecmp("Globals", ci->children[i].key) == 0)
-                       cf_util_get_flag (ci->children + i, &flags, PLUGIN_FLAGS_GLOBAL);
-               else if (strcasecmp ("Interval", ci->children[i].key) == 0) {
-                       if (cf_util_get_cdtime (ci->children + i, &ctx.interval) != 0) {
+       for (i = 0; i < ci->children_num; ++i)
+       {
+               oconfig_item_t *child = ci->children + i;
+
+               if (strcasecmp("Globals", child->key) == 0)
+                       cf_util_get_flag (child, &flags, PLUGIN_FLAGS_GLOBAL);
+               else if (strcasecmp ("Interval", child->key) == 0) {
+                       if (cf_util_get_cdtime (child, &ctx.interval) != 0) {
+                               /* cf_util_get_cdtime will log an error */
+                               continue;
+                       }
+               }
+               else if (strcasecmp ("FlushInterval", child->key) == 0) {
+                       if (cf_util_get_cdtime (child, &ctx.flush_interval) != 0) {
+                               /* cf_util_get_cdtime will log an error */
+                               continue;
+                       }
+               }
+               else if (strcasecmp ("FlushTimeout", child->key) == 0) {
+                       if (cf_util_get_cdtime (child, &ctx.flush_timeout) != 0) {
                                /* cf_util_get_cdtime will log an error */
                                continue;
                        }
@@ -300,7 +317,7 @@ static int dispatch_loadplugin (const oconfig_item_t *ci)
                else {
                        WARNING("Ignoring unknown LoadPlugin option \"%s\" "
                                        "for plugin \"%s\"",
-                                       ci->children[i].key, ci->values[0].value.string);
+                                       child->key, ci->values[0].value.string);
                }
        }
 
index b3cb97f..0ef6273 100644 (file)
@@ -80,6 +80,12 @@ struct write_queue_s
        write_queue_t *next;
 };
 
+struct flush_callback_s {
+       char *name;
+       cdtime_t timeout;
+};
+typedef struct flush_callback_s flush_callback_t;
+
 /*
  * Private variables
  */
@@ -1295,11 +1301,99 @@ int plugin_register_write (const char *name,
                                (void *) callback, ud));
 } /* int plugin_register_write */
 
+static int plugin_flush_timeout_callback (user_data_t *ud)
+{
+       flush_callback_t *cb = ud->data;
+
+       return plugin_flush (cb->name, cb->timeout, /* identifier = */ NULL);
+} /* static int plugin_flush_callback */
+
+static void plugin_flush_timeout_callback_free (void *data)
+{
+       flush_callback_t *cb = data;
+
+       if (cb == NULL) return;
+
+       sfree(cb->name);
+       sfree(cb);
+} /* static void plugin_flush_callback_free */
+
 int plugin_register_flush (const char *name,
                plugin_flush_cb callback, user_data_t *ud)
 {
-       return (create_register_callback (&list_flush, name,
-                               (void *) callback, ud));
+       int status;
+       plugin_ctx_t ctx = plugin_get_ctx ();
+
+       status = create_register_callback (&list_flush, name,
+               (void *) callback, ud);
+       if (status != 0)
+               return status;
+
+       if (ctx.flush_interval != 0)
+       {
+               char *flush_prefix = "flush/";
+               size_t prefix_size;
+               char *flush_name;
+               size_t name_size;
+               user_data_t ud;
+               flush_callback_t *cb;
+
+               prefix_size = strlen(flush_prefix);
+               name_size = strlen(name);
+
+               flush_name = (char *) malloc (sizeof (char) *
+                       (name_size + prefix_size + 1));
+               if (flush_name == NULL)
+               {
+                       ERROR ("plugin_register_flush: malloc failed.");
+                       plugin_unregister (list_flush, name);
+                       return (-1);
+               }
+
+               sstrncpy (flush_name, flush_prefix, prefix_size + 1);
+               sstrncpy (flush_name + prefix_size, name, name_size + 1);
+
+               cb = (flush_callback_t *)malloc(sizeof(flush_callback_t));
+               if (cb == NULL)
+               {
+                       ERROR ("plugin_register_flush: malloc failed.");
+                       sfree(flush_name);
+                       plugin_unregister (list_flush, name);
+                       return (-1);
+               }
+
+               cb->name = strdup (name);
+               if (cb->name == NULL)
+               {
+                       ERROR ("plugin_register_flush: strdup failed.");
+                       sfree(cb);
+                       sfree(flush_name);
+                       plugin_unregister (list_flush, name);
+                       return (-1);
+               }
+               cb->timeout = ctx.flush_timeout;
+
+               ud.data = cb;
+               ud.free_func = plugin_flush_timeout_callback_free;
+
+               status = plugin_register_complex_read (
+                       /* group     = */ "flush",
+                       /* name      = */ flush_name,
+                       /* callback  = */ plugin_flush_timeout_callback,
+                       /* interval  = */ ctx.flush_interval,
+                       /* user data = */ &ud);
+
+               sfree(flush_name);
+               if (status != 0)
+               {
+                       sfree(cb->name);
+                       sfree(cb);
+                       plugin_unregister (list_flush, name);
+                       return status;
+               }
+       }
+
+       return 0;
 } /* int plugin_register_flush */
 
 int plugin_register_missing (const char *name,
@@ -1518,7 +1612,34 @@ int plugin_unregister_write (const char *name)
 
 int plugin_unregister_flush (const char *name)
 {
-       return (plugin_unregister (list_flush, name));
+       plugin_ctx_t ctx = plugin_get_ctx ();
+
+       if (ctx.flush_interval != 0)
+        {
+               char *flush_prefix = "flush/";
+               size_t prefix_size;
+               char *flush_name;
+               size_t name_size;
+
+               prefix_size = strlen(flush_prefix);
+               name_size = strlen(name);
+
+               flush_name = (char *) malloc (sizeof (char) *
+                       (name_size + prefix_size + 1));
+               if (flush_name == NULL)
+               {
+                       ERROR ("plugin_unregister_flush: malloc failed.");
+                       return (-1);
+               }
+
+               sstrncpy (flush_name, flush_prefix, prefix_size + 1);
+               sstrncpy (flush_name + prefix_size, name, name_size + 1);
+
+               plugin_unregister_read(flush_name);
+               sfree(flush_name);
+       }
+
+       return plugin_unregister (list_flush, name);
 }
 
 int plugin_unregister_missing (const char *name)
index 2e20da4..daea4fc 100644 (file)
@@ -177,6 +177,8 @@ typedef struct user_data_s user_data_t;
 struct plugin_ctx_s
 {
        cdtime_t interval;
+       cdtime_t flush_interval;
+       cdtime_t flush_timeout;
 };
 typedef struct plugin_ctx_s plugin_ctx_t;