collectd, rrdtool plugin: Add flushing of specific identifiers.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Tue, 13 May 2008 10:53:11 +0000 (12:53 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Tue, 13 May 2008 10:53:57 +0000 (12:53 +0200)
The flush callbacks have been changed to expect an (optional)
`const char *instance'. If not NULL, *only* that value should be flushed.

The network and perl plugins don't follow this rule yet, but will in the
not so far future - hopefully ;)

src/network.c
src/perl.c
src/plugin.c
src/plugin.h
src/rrdtool.c

index 4e1504f..cf01b71 100644 (file)
@@ -1757,7 +1757,8 @@ static int network_init (void)
        return (0);
 } /* int network_init */
 
-static int network_flush (int timeout)
+/* TODO: Implement flushing of single items. */
+static int network_flush (int timeout, const char *itentifier)
 {
        pthread_mutex_lock (&send_buffer_lock);
 
index 66cac7e..43f9197 100644 (file)
@@ -1264,7 +1264,8 @@ static int perl_notify (const notification_t *notif)
        return pplugin_call_all (aTHX_ PLUGIN_NOTIF, notif);
 } /* static int perl_notify (const notification_t *) */
 
-static int perl_flush (const int timeout)
+/* TODO: Implement flushing of single identifiers. */
+static int perl_flush (int timeout, const char *identifier)
 {
        dTHX;
 
index dfab52c..cbd5477 100644 (file)
@@ -441,7 +441,8 @@ int plugin_register_write (const char *name,
        return (register_callback (&list_write, name, (void *) callback));
 } /* int plugin_register_write */
 
-int plugin_register_flush (const char *name, int (*callback) (const int))
+int plugin_register_flush (const char *name,
+               int (*callback) (const int timeout, const char *identifier))
 {
        return (register_callback (&list_flush, name, (void *) callback));
 } /* int plugin_register_flush */
@@ -694,6 +695,28 @@ void plugin_flush_all (int timeout)
        }
 } /* void plugin_flush_all */
 
+int plugin_flush (const char *plugin, int timeout, const char *identifier)
+{
+  int (*callback) (int timeout, const char *identifier);
+  llentry_t *le;
+
+  if (list_flush == NULL)
+    return (0);
+
+  le = llist_head (list_flush);
+  while (le != NULL)
+  {
+    if ((plugin != NULL)
+       && (strcmp (plugin, le->key) != 0))
+      continue;
+
+    callback = (int (*) (int, const char *)) le->value;
+    le = le->next;
+
+    (*callback) (timeout, identifier);
+  }
+} /* int plugin_flush */
+
 void plugin_shutdown_all (void)
 {
        int (*callback) (void);
index 488e041..ac389c6 100644 (file)
@@ -151,10 +151,11 @@ int plugin_load (const char *name);
 
 void plugin_init_all (void);
 void plugin_read_all (void);
-void plugin_flush_all (int timeout);
 void plugin_shutdown_all (void);
 
+void plugin_flush_all (int timeout);
 int plugin_flush_one (int timeout, const char *name);
+int plugin_flush (const char *plugin, int timeout, const char *identifier);
 
 /*
  * The `plugin_register_*' functions are used to make `config', `init',
@@ -173,7 +174,7 @@ int plugin_register_read (const char *name,
 int plugin_register_write (const char *name,
                int (*callback) (const data_set_t *ds, const value_list_t *vl));
 int plugin_register_flush (const char *name,
-               int (*callback) (const int));
+               int (*callback) (const int timeout, const char *identifier));
 int plugin_register_shutdown (char *name,
                int (*callback) (void));
 int plugin_register_data_set (const data_set_t *ds);
index 29e8a7d..6b9c540 100644 (file)
@@ -766,6 +766,55 @@ static void rrd_cache_flush (int timeout)
        cache_flush_last = now;
 } /* void rrd_cache_flush */
 
+static int rrd_cache_flush_identifier (int timeout, const char *identifier)
+{
+  rrd_cache_t *rc;
+  time_t now;
+  int status;
+  char *key;
+  size_t key_size;
+
+  if (identifier == NULL)
+  {
+    rrd_cache_flush (timeout);
+    return (0);
+  }
+
+  now = time (NULL);
+
+  key_size = strlen (identifier + 5) * sizeof (char);
+  key = (char *) malloc (key_size);
+  if (key == NULL)
+  {
+    ERROR ("rrdtool plugin: rrd_cache_flush_identifier: malloc failed.");
+    return (-1);
+  }
+  snprintf (key, key_size, "%s.rrd", identifier);
+  key[key_size - 1] = 0;
+
+  status = c_avl_get (cache, key, (void *) &rc);
+  if (status != 0)
+  {
+    WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
+       "c_avl_get (%s) failed. Does that file really exist?",
+       key);
+    return (status);
+  }
+
+  if (rc->flags == FLAG_QUEUED)
+    status = 0;
+  else if ((now - rc->first_value) < timeout)
+    status = 0;
+  else if (rc->values_num > 0)
+  {
+    status = rrd_queue_cache_entry (key);
+    if (status == 0)
+      rc->flags = FLAG_QUEUED;
+  }
+
+  return (status);
+} /* int rrd_cache_flush_identifier */
+
 static int rrd_cache_insert (const char *filename,
                const char *value, time_t value_time)
 {
@@ -940,7 +989,7 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl)
        return (status);
 } /* int rrd_write */
 
-static int rrd_flush (const int timeout)
+static int rrd_flush (int timeout, const char *identifier)
 {
        pthread_mutex_lock (&cache_lock);
 
@@ -949,7 +998,8 @@ static int rrd_flush (const int timeout)
                return (0);
        }
 
-       rrd_cache_flush (timeout);
+       rrd_cache_flush_identifier (timeout, identifier);
+
        pthread_mutex_unlock (&cache_lock);
        return (0);
 } /* int rrd_flush */