check_uptime: New plugin, based on new cache_event callback.
authorPavel Rochnyack <pavel2000@ngs.ru>
Sun, 19 May 2019 10:27:23 +0000 (17:27 +0700)
committerPavel Rochnyack <pavel2000@ngs.ru>
Sun, 19 May 2019 17:47:59 +0000 (00:47 +0700)
Plugin checks for cache events for uptime metric, and send notifications
when metric state changed.

Makefile.am
configure.ac
src/check_uptime.c [new file with mode: 0644]
src/collectd.conf.in
src/collectd.conf.pod
src/daemon/plugin.c
src/daemon/plugin.h
src/daemon/utils_cache.c
src/daemon/utils_cache.h

index 85f8da8..b9a57a8 100644 (file)
@@ -774,6 +774,12 @@ chrony_la_LDFLAGS = $(PLUGIN_LDFLAGS)
 chrony_la_LIBADD = -lm
 endif
 
+if BUILD_PLUGIN_CHECK_UPTIME
+pkglib_LTLIBRARIES += check_uptime.la
+check_uptime_la_SOURCES = src/check_uptime.c
+check_uptime_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+endif
+
 if BUILD_PLUGIN_CONNTRACK
 pkglib_LTLIBRARIES += conntrack.la
 conntrack_la_SOURCES = src/conntrack.c
index c95422f..1863389 100644 (file)
@@ -6793,6 +6793,7 @@ AC_PLUGIN([bind],                [$plugin_bind],              [ISC Bind nameserv
 AC_PLUGIN([ceph],                [$plugin_ceph],              [Ceph daemon statistics])
 AC_PLUGIN([cgroups],             [$plugin_cgroups],           [CGroups CPU usage accounting])
 AC_PLUGIN([chrony],              [yes],                       [Chrony statistics])
+AC_PLUGIN([check_uptime],        [yes],                       [Notify about uptime reset])
 AC_PLUGIN([conntrack],           [$plugin_conntrack],         [nf_conntrack statistics])
 AC_PLUGIN([contextswitch],       [$plugin_contextswitch],     [context switch statistics])
 AC_PLUGIN([cpu],                 [$plugin_cpu],               [CPU usage statistics])
@@ -7220,6 +7221,7 @@ AC_MSG_RESULT([    bind  . . . . . . . . $enable_bind])
 AC_MSG_RESULT([    ceph  . . . . . . . . $enable_ceph])
 AC_MSG_RESULT([    cgroups . . . . . . . $enable_cgroups])
 AC_MSG_RESULT([    chrony. . . . . . . . $enable_chrony])
+AC_MSG_RESULT([    check_uptime. . . . . $enable_check_uptime])
 AC_MSG_RESULT([    conntrack . . . . . . $enable_conntrack])
 AC_MSG_RESULT([    contextswitch . . . . $enable_contextswitch])
 AC_MSG_RESULT([    cpu . . . . . . . . . $enable_cpu])
diff --git a/src/check_uptime.c b/src/check_uptime.c
new file mode 100644 (file)
index 0000000..33363b5
--- /dev/null
@@ -0,0 +1,273 @@
+/**
+ * collectd - src/check_uptime.c
+ * Copyright (C) 2007-2019  Florian Forster
+ * Copyright (C) 2019  Pavel V. Rochnyack
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Pavel Rochnyak <pavel2000 ngs.ru>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "utils/avltree/avltree.h"
+#include "utils/common/common.h"
+#include "utils_cache.h"
+
+/* Types are registered only in `config` phase, so access is not protected by
+ * locks */
+c_avl_tree_t *types_tree = NULL;
+
+static int format_uptime(unsigned long uptime_sec, char *buf, size_t bufsize) {
+
+  unsigned int uptime_days = uptime_sec / 24 / 3600;
+  uptime_sec -= uptime_days * 24 * 3600;
+  unsigned int uptime_hours = uptime_sec / 3600;
+  uptime_sec -= uptime_hours * 3600;
+  unsigned int uptime_mins = uptime_sec / 60;
+  uptime_sec -= uptime_mins * 60;
+
+  int ret = 0;
+  if (uptime_days) {
+    ret += snprintf(buf + ret, bufsize - ret, " %u day(s)", uptime_days);
+  }
+  if (uptime_days || uptime_hours) {
+    ret += snprintf(buf + ret, bufsize - ret, " %u hour(s)", uptime_hours);
+  }
+  if (uptime_days || uptime_hours || uptime_mins) {
+    ret += snprintf(buf + ret, bufsize - ret, " %u min", uptime_mins);
+  }
+  ret += snprintf(buf + ret, bufsize - ret, " %lu sec.", uptime_sec);
+  return ret;
+}
+
+static int cu_notify(enum cache_event_type_e event_type, const value_list_t *vl,
+                     gauge_t old_uptime, gauge_t new_uptime) {
+  notification_t n;
+  NOTIFICATION_INIT_VL(&n, vl);
+
+  int status;
+  char *buf = n.message;
+  size_t bufsize = sizeof(n.message);
+
+  n.time = vl->time;
+
+  const char *service = "Service";
+  if (strcmp(vl->plugin, "uptime") == 0)
+    service = "Host";
+
+  switch (event_type) {
+  case CE_VALUE_NEW:
+    n.severity = NOTIF_OKAY;
+    status = snprintf(buf, bufsize, "%s is running.", service);
+    buf += status;
+    bufsize -= status;
+    break;
+  case CE_VALUE_UPDATE:
+    n.severity = NOTIF_WARNING;
+    status = snprintf(buf, bufsize, "%s just restarted.", service);
+    buf += status;
+    bufsize -= status;
+    break;
+  case CE_VALUE_EXPIRED:
+    n.severity = NOTIF_FAILURE;
+    status = snprintf(buf, bufsize, "%s is unreachable.", service);
+    buf += status;
+    bufsize -= status;
+    break;
+  }
+
+  if (!isnan(old_uptime)) {
+    status = snprintf(buf, bufsize, " Uptime was:");
+    buf += status;
+    bufsize -= status;
+
+    status = format_uptime(old_uptime, buf, bufsize);
+    buf += status;
+    bufsize -= status;
+
+    plugin_notification_meta_add_double(&n, "LastValue", old_uptime);
+  }
+
+  if (!isnan(new_uptime)) {
+    status = snprintf(buf, bufsize, " Uptime now:");
+    buf += status;
+    bufsize -= status;
+
+    status = format_uptime(new_uptime, buf, bufsize);
+    buf += status;
+    bufsize -= status;
+
+    plugin_notification_meta_add_double(&n, "CurrentValue", new_uptime);
+  }
+
+  plugin_dispatch_notification(&n);
+
+  plugin_notification_meta_free(n.meta);
+  return 0;
+}
+
+static int cu_cache_event(cache_event_t *event,
+                          __attribute__((unused)) user_data_t *ud) {
+  gauge_t values_history[2];
+
+  /* For CE_VALUE_EXPIRED */
+  int ret;
+  value_t *values;
+  size_t values_num;
+  gauge_t old_uptime = NAN;
+
+  switch (event->type) {
+  case CE_VALUE_NEW:
+    DEBUG("check_uptime: CE_VALUE_NEW, %s", event->value_list_name);
+    if (c_avl_get(types_tree, event->value_list->type, NULL) == 0) {
+      event->ret = 1;
+      assert(event->value_list->values_len > 0);
+      cu_notify(CE_VALUE_NEW, event->value_list, NAN /* old */,
+                event->value_list->values[0].gauge /* new */);
+    }
+    break;
+  case CE_VALUE_UPDATE:
+    DEBUG("check_uptime: CE_VALUE_UPDATE, %s", event->value_list_name);
+    if (uc_get_history_by_name(event->value_list_name, values_history, 2, 1)) {
+      ERROR("check_uptime plugin: Failed to get value history for %s.",
+            event->value_list_name);
+    } else {
+      if (!isnan(values_history[0]) && !isnan(values_history[1]) &&
+          values_history[0] < values_history[1]) {
+        cu_notify(CE_VALUE_UPDATE, event->value_list,
+                  values_history[1] /* old */, values_history[0] /* new */);
+      }
+    }
+    break;
+  case CE_VALUE_EXPIRED:
+    DEBUG("check_uptime: CE_VALUE_EXPIRED, %s", event->value_list_name);
+    ret = uc_get_value_by_name(event->value_list_name, &values, &values_num);
+    if (ret == 0) {
+      old_uptime = values[0].gauge;
+      sfree(values);
+    }
+
+    cu_notify(CE_VALUE_EXPIRED, event->value_list, old_uptime, NAN /* new */);
+    break;
+  }
+  return 0;
+}
+
+static int cu_config(oconfig_item_t *ci) {
+  if (types_tree == NULL) {
+    types_tree = c_avl_create((int (*)(const void *, const void *))strcmp);
+    if (types_tree == NULL) {
+      ERROR("check_uptime plugin: c_avl_create failed.");
+      return -1;
+    }
+  }
+
+  for (int i = 0; i < ci->children_num; ++i) {
+    oconfig_item_t *child = ci->children + i;
+    if (strcasecmp("Type", child->key) == 0) {
+      if ((child->values_num != 1) ||
+          (child->values[0].type != OCONFIG_TYPE_STRING)) {
+        WARNING("check_uptime plugin: The `Type' option needs exactly one "
+                "string argument.");
+        return -1;
+      }
+      char *type = child->values[0].value.string;
+
+      if (c_avl_get(types_tree, type, NULL) == 0) {
+        ERROR("check_uptime plugin: Type `%s' already added.", type);
+        return -1;
+      }
+
+      char *type_copy = strdup(type);
+      if (type_copy == NULL) {
+        ERROR("check_uptime plugin: strdup failed.");
+        return -1;
+      }
+
+      int status = c_avl_insert(types_tree, type_copy, NULL);
+      if (status != 0) {
+        ERROR("check_uptime plugin: c_avl_insert failed.");
+        sfree(type_copy);
+        return -1;
+      }
+    } else
+      WARNING("check_uptime plugin: Ignore unknown config option `%s'.",
+              child->key);
+  }
+
+  return 0;
+}
+
+static int cu_init(void) {
+  if (types_tree == NULL) {
+    types_tree = c_avl_create((int (*)(const void *, const void *))strcmp);
+    if (types_tree == NULL) {
+      ERROR("check_uptime plugin: c_avl_create failed.");
+      return -1;
+    }
+    /* Default configuration */
+    char *type = strdup("uptime");
+    if (type == NULL) {
+      ERROR("check_uptime plugin: strdup failed.");
+      return -1;
+    }
+    int status = c_avl_insert(types_tree, type, NULL);
+    if (status != 0) {
+      ERROR("check_uptime plugin: c_avl_insert failed.");
+      sfree(type);
+      return -1;
+    }
+  }
+
+  int ret = 0;
+  char *type;
+  void *val;
+  c_avl_iterator_t *iter = c_avl_get_iterator(types_tree);
+  while (c_avl_iterator_next(iter, (void *)&type, (void *)&val) == 0) {
+    data_set_t const *ds = plugin_get_ds(type);
+    if (ds == NULL) {
+      ERROR("check_uptime plugin: Failed to look up type \"%s\".", type);
+      ret = -1;
+      continue;
+    }
+    if (ds->ds_num != 1) {
+      ERROR("check_uptime plugin: The type \"%s\" has %" PRIsz " data sources. "
+            "Only types with a single GAUGE data source are supported.",
+            ds->type, ds->ds_num);
+      ret = -1;
+      continue;
+    }
+    if (ds->ds[0].type != DS_TYPE_GAUGE) {
+      ERROR("check_uptime plugin: The type \"%s\" has wrong data source type. "
+            "Only types with a single GAUGE data source are supported.",
+            ds->type);
+      ret = -1;
+      continue;
+    }
+  }
+  c_avl_iterator_destroy(iter);
+
+  if (ret == 0)
+    plugin_register_cache_event("check_uptime", cu_cache_event, NULL);
+
+  return ret;
+}
+
+void module_register(void) {
+  plugin_register_complex_config("check_uptime", cu_config);
+  plugin_register_init("check_uptime", cu_init);
+}
index 7f09c5c..4011155 100644 (file)
 #@BUILD_PLUGIN_CEPH_TRUE@LoadPlugin ceph
 #@BUILD_PLUGIN_CGROUPS_TRUE@LoadPlugin cgroups
 #@BUILD_PLUGIN_CHRONY_TRUE@LoadPlugin chrony
+#@BUILD_PLUGIN_CHECK_UPTIME_TRUE@LoadPlugin check_uptime
 #@BUILD_PLUGIN_CONNTRACK_TRUE@LoadPlugin conntrack
 #@BUILD_PLUGIN_CONTEXTSWITCH_TRUE@LoadPlugin contextswitch
 @BUILD_PLUGIN_CPU_TRUE@@BUILD_PLUGIN_CPU_TRUE@LoadPlugin cpu
index 71931c2..1190895 100644 (file)
@@ -1548,6 +1548,35 @@ at all, B<all> cgroups are selected.
 
 =back
 
+=head2 Plugin C<check_uptime>
+
+The I<check_uptime plugin> designed to check and notify about host or service
+status based on I<uptime> metric.
+
+When new metric of I<uptime> type appears in cache, OK notification is sent.
+When new value for metric is less than previous value, WARNING notification is
+sent about host/service restart.
+When no new updates comes for metric and cache entry expires, then FAILURE
+notification is sent about unreachable host or service.
+
+By default (when no explicit configuration), plugin checks for I<uptime> metric.
+
+B<Synopsis:>
+
+ <Plugin "check_uptime">
+   Type "uptime"
+   Type "my_uptime_type"
+ </Plugin>
+
+=over 4
+
+=item B<Type> I<Type>
+
+Metric type to check for status/values. The type should consist single GAUGE
+data source.
+
+=back
+
 =head2 Plugin C<chrony>
 
 The C<chrony> plugin collects ntp data from a B<chronyd> server, such as clock
index b4e5ae7..10a2064 100644 (file)
@@ -85,6 +85,14 @@ struct read_func_s {
 };
 typedef struct read_func_s read_func_t;
 
+struct cache_event_func_s {
+  plugin_cache_event_cb callback;
+  char *name;
+  user_data_t user_data;
+  plugin_ctx_t plugin_ctx;
+};
+typedef struct cache_event_func_s cache_event_func_t;
+
 struct write_queue_s;
 typedef struct write_queue_s write_queue_t;
 struct write_queue_s {
@@ -112,6 +120,9 @@ static llist_t *list_shutdown;
 static llist_t *list_log;
 static llist_t *list_notification;
 
+static size_t list_cache_event_num;
+static cache_event_func_t list_cache_event[32];
+
 static fc_chain_t *pre_cache_chain;
 static fc_chain_t *post_cache_chain;
 
@@ -263,8 +274,6 @@ static void destroy_read_heap(void) /* {{{ */
 
 static int register_callback(llist_t **list, /* {{{ */
                              const char *name, callback_func_t *cf) {
-  llentry_t *le;
-  char *key;
 
   if (*list == NULL) {
     *list = llist_create();
@@ -276,14 +285,14 @@ static int register_callback(llist_t **list, /* {{{ */
     }
   }
 
-  key = strdup(name);
+  char *key = strdup(name);
   if (key == NULL) {
     ERROR("plugin: register_callback: strdup failed.");
     destroy_callback(cf);
     return -1;
   }
 
-  le = llist_search(*list, name);
+  llentry_t *le = llist_search(*list, name);
   if (le == NULL) {
     le = llentry_create(key, cf);
     if (le == NULL) {
@@ -296,9 +305,7 @@ static int register_callback(llist_t **list, /* {{{ */
 
     llist_append(*list, le);
   } else {
-    callback_func_t *old_cf;
-
-    old_cf = le->value;
+    callback_func_t *old_cf = le->value;
     le->value = cf;
 
     P_WARNING("register_callback: "
@@ -1310,6 +1317,60 @@ EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback,
   return create_register_callback(&list_missing, name, (void *)callback, ud);
 } /* int plugin_register_missing */
 
+EXPORT int plugin_register_cache_event(const char *name,
+                                       plugin_cache_event_cb callback,
+                                       user_data_t const *ud) {
+
+  if (name == NULL || callback == NULL)
+    return EINVAL;
+
+  char *name_copy = strdup(name);
+  if (name_copy == NULL) {
+    P_ERROR("plugin_register_cache_event: strdup failed.");
+    free_userdata(ud);
+    return ENOMEM;
+  }
+
+  if (list_cache_event_num >= 32) {
+    P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried "
+            "to be registered.");
+    free_userdata(ud);
+    return ENOMEM;
+  }
+
+  for (size_t i = 0; i < list_cache_event_num; i++) {
+    cache_event_func_t *cef = &list_cache_event[i];
+    if (!cef->callback)
+      continue;
+
+    if (strcmp(name, cef->name) == 0) {
+      P_ERROR("plugin_register_cache_event: a callback named `%s' already "
+              "registered!",
+              name);
+      free_userdata(ud);
+      return -1;
+    }
+  }
+
+  user_data_t user_data;
+  if (ud == NULL) {
+    user_data = (user_data_t){
+        .data = NULL, .free_func = NULL,
+    };
+  } else {
+    user_data = *ud;
+  }
+
+  list_cache_event[list_cache_event_num] =
+      (cache_event_func_t){.callback = callback,
+                           .name = name_copy,
+                           .user_data = user_data,
+                           .plugin_ctx = plugin_get_ctx()};
+  list_cache_event_num++;
+
+  return 0;
+} /* int plugin_register_cache_event */
+
 EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) {
   return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
 } /* int plugin_register_shutdown */
@@ -1511,6 +1572,32 @@ EXPORT int plugin_unregister_missing(const char *name) {
   return plugin_unregister(list_missing, name);
 }
 
+EXPORT int plugin_unregister_cache_event(const char *name) {
+  for (size_t i = 0; i < list_cache_event_num; i++) {
+    cache_event_func_t *cef = &list_cache_event[i];
+    if (!cef->callback)
+      continue;
+    if (strcmp(name, cef->name) == 0) {
+      /* Mark callback as inactive, so mask in cache entries remains actual */
+      cef->callback = NULL;
+      sfree(cef->name);
+      free_userdata(&cef->user_data);
+    }
+  }
+  return 0;
+}
+
+static void destroy_cache_event_callbacks() {
+  for (size_t i = 0; i < list_cache_event_num; i++) {
+    cache_event_func_t *cef = &list_cache_event[i];
+    if (!cef->callback)
+      continue;
+    cef->callback = NULL;
+    sfree(cef->name);
+    free_userdata(&cef->user_data);
+  }
+}
+
 EXPORT int plugin_unregister_shutdown(const char *name) {
   return plugin_unregister(list_shutdown, name);
 }
@@ -1855,6 +1942,7 @@ EXPORT int plugin_shutdown_all(void) {
    * the data isn't freed twice. */
   destroy_all_callbacks(&list_flush);
   destroy_all_callbacks(&list_missing);
+  destroy_cache_event_callbacks();
   destroy_all_callbacks(&list_write);
 
   destroy_all_callbacks(&list_notification);
@@ -1895,6 +1983,82 @@ EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
   return 0;
 } /* int }}} plugin_dispatch_missing */
 
+void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
+                                 unsigned long callbacks_mask, const char *name,
+                                 const value_list_t *vl) {
+  switch (event_type) {
+  case CE_VALUE_NEW:
+    callbacks_mask = 0;
+    for (size_t i = 0; i < list_cache_event_num; i++) {
+      cache_event_func_t *cef = &list_cache_event[i];
+      plugin_cache_event_cb callback = cef->callback;
+
+      if (!callback)
+        continue;
+
+      cache_event_t event = (cache_event_t){.type = event_type,
+                                            .value_list = vl,
+                                            .value_list_name = name,
+                                            .ret = 0};
+
+      plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+      int status = (*callback)(&event, &cef->user_data);
+      plugin_set_ctx(old_ctx);
+
+      if (status != 0) {
+        ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+              "%i for event NEW.",
+              cef->name, status);
+      } else {
+        if (event.ret) {
+          DEBUG(
+              "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.",
+              cef->name, name);
+          callbacks_mask |= (1 << (i));
+        } else {
+          DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.",
+                cef->name, name);
+        }
+      }
+    }
+
+    if (callbacks_mask)
+      uc_set_callbacks_mask(name, callbacks_mask);
+
+    break;
+  case CE_VALUE_UPDATE:
+  case CE_VALUE_EXPIRED:
+    for (size_t i = 0; i < list_cache_event_num; i++) {
+      cache_event_func_t *cef = &list_cache_event[i];
+      plugin_cache_event_cb callback = cef->callback;
+
+      if (!callback)
+        continue;
+
+      if (callbacks_mask && (1 << (i)) == 0)
+        continue;
+
+      cache_event_t event = (cache_event_t){.type = event_type,
+                                            .value_list = vl,
+                                            .value_list_name = name,
+                                            .ret = 0};
+
+      plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+      int status = (*callback)(&event, &cef->user_data);
+      plugin_set_ctx(old_ctx);
+
+      if (status != 0) {
+        ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+              "%i for event %s.",
+              cef->name, status,
+              ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED"));
+      }
+    }
+    break;
+  }
+  return;
+}
+
 static int plugin_dispatch_values_internal(value_list_t *vl) {
   int status;
   static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
index 6b3a030..c3534e8 100644 (file)
@@ -171,6 +171,15 @@ struct user_data_s {
 };
 typedef struct user_data_s user_data_t;
 
+enum cache_event_type_e { CE_VALUE_NEW, CE_VALUE_UPDATE, CE_VALUE_EXPIRED };
+
+typedef struct cache_event_s {
+  enum cache_event_type_e type;
+  const value_list_t *value_list;
+  const char *value_list_name;
+  int ret;
+} cache_event_t;
+
 struct plugin_ctx_s {
   char *name;
   cdtime_t interval;
@@ -192,6 +201,11 @@ typedef int (*plugin_flush_cb)(cdtime_t timeout, const char *identifier,
  * callbacks should be called, greater than zero if no more callbacks should be
  * called. */
 typedef int (*plugin_missing_cb)(const value_list_t *, user_data_t *);
+/* "cache event" callback. CE_VALUE_NEW events are sent to all registered
+ * callbacks. Callback should check if it interested in further CE_VALUE_UPDATE
+ * and CE_VALUE_EXPIRED events for metric and set event->ret = 1 if so.
+ */
+typedef int (*plugin_cache_event_cb)(cache_event_t *, user_data_t *);
 typedef void (*plugin_log_cb)(int severity, const char *message, user_data_t *);
 typedef int (*plugin_shutdown_cb)(void);
 typedef int (*plugin_notification_cb)(const notification_t *, user_data_t *);
@@ -294,6 +308,9 @@ int plugin_register_flush(const char *name, plugin_flush_cb callback,
                           user_data_t const *user_data);
 int plugin_register_missing(const char *name, plugin_missing_cb callback,
                             user_data_t const *user_data);
+int plugin_register_cache_event(const char *name,
+                                plugin_cache_event_cb callback,
+                                user_data_t const *ud);
 int plugin_register_shutdown(const char *name, plugin_shutdown_cb callback);
 int plugin_register_data_set(const data_set_t *ds);
 int plugin_register_log(const char *name, plugin_log_cb callback,
@@ -310,6 +327,7 @@ int plugin_unregister_read_group(const char *group);
 int plugin_unregister_write(const char *name);
 int plugin_unregister_flush(const char *name);
 int plugin_unregister_missing(const char *name);
+int plugin_unregister_cache_event(const char *name);
 int plugin_unregister_shutdown(const char *name);
 int plugin_unregister_data_set(const char *name);
 int plugin_unregister_log(const char *name);
@@ -380,6 +398,9 @@ __attribute__((sentinel)) int plugin_dispatch_multivalue(value_list_t const *vl,
                                                          int store_type, ...);
 
 int plugin_dispatch_missing(const value_list_t *vl);
+void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
+                                 unsigned long callbacks_mask, const char *name,
+                                 const value_list_t *vl);
 
 int plugin_dispatch_notification(const notification_t *notif);
 
index c53e5d1..722fa2d 100644 (file)
@@ -67,6 +67,7 @@ typedef struct cache_entry_s {
   size_t history_length;
 
   meta_data_t *meta;
+  unsigned long callbacks_mask;
 } cache_entry_t;
 
 struct uc_iter_s {
@@ -140,18 +141,15 @@ static void uc_check_range(const data_set_t *ds, cache_entry_t *ce) {
 
 static int uc_insert(const data_set_t *ds, const value_list_t *vl,
                      const char *key) {
-  char *key_copy;
-  cache_entry_t *ce;
-
   /* `cache_lock' has been locked by `uc_update' */
 
-  key_copy = strdup(key);
+  char *key_copy = strdup(key);
   if (key_copy == NULL) {
     ERROR("uc_insert: strdup failed.");
     return -1;
   }
 
-  ce = cache_alloc(ds->ds_num);
+  cache_entry_t *ce = cache_alloc(ds->ds_num);
   if (ce == NULL) {
     sfree(key_copy);
     ERROR("uc_insert: cache_alloc (%" PRIsz ") failed.", ds->ds_num);
@@ -226,6 +224,7 @@ int uc_check_timeout(void) {
     char *key;
     cdtime_t time;
     cdtime_t interval;
+    unsigned long callbacks_mask;
   } *expired = NULL;
   size_t expired_num = 0;
 
@@ -251,6 +250,7 @@ int uc_check_timeout(void) {
     expired[expired_num].key = strdup(key);
     expired[expired_num].time = ce->last_time;
     expired[expired_num].interval = ce->interval;
+    expired[expired_num].callbacks_mask = ce->callbacks_mask;
 
     if (expired[expired_num].key == NULL) {
       ERROR("uc_check_timeout: strdup failed.");
@@ -285,6 +285,10 @@ int uc_check_timeout(void) {
     }
 
     plugin_dispatch_missing(&vl);
+
+    if (expired[i].callbacks_mask)
+      plugin_dispatch_cache_event(CE_VALUE_EXPIRED, expired[i].callbacks_mask,
+                                  expired[i].key, &vl);
   } /* for (i = 0; i < expired_num; i++) */
 
   /* Now actually remove all the values from the cache. We don't re-evaluate
@@ -314,8 +318,6 @@ int uc_check_timeout(void) {
 
 int uc_update(const data_set_t *ds, const value_list_t *vl) {
   char name[6 * DATA_MAX_NAME_LEN];
-  cache_entry_t *ce = NULL;
-  int status;
 
   if (FORMAT_VL(name, sizeof(name), vl) != 0) {
     ERROR("uc_update: FORMAT_VL failed.");
@@ -324,11 +326,16 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) {
 
   pthread_mutex_lock(&cache_lock);
 
-  status = c_avl_get(cache_tree, name, (void *)&ce);
+  cache_entry_t *ce = NULL;
+  int status = c_avl_get(cache_tree, name, (void *)&ce);
   if (status != 0) /* entry does not yet exist */
   {
     status = uc_insert(ds, vl, name);
     pthread_mutex_unlock(&cache_lock);
+
+    if (status == 0)
+      plugin_dispatch_cache_event(CE_VALUE_NEW, 0 /* mask */, name, vl);
+
     return status;
   }
 
@@ -403,11 +410,32 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) {
   ce->last_update = cdtime();
   ce->interval = vl->interval;
 
+  /* Check if cache entry has registered callbacks */
+  unsigned long callbacks_mask = ce->callbacks_mask;
+
   pthread_mutex_unlock(&cache_lock);
 
+  if (callbacks_mask)
+    plugin_dispatch_cache_event(CE_VALUE_UPDATE, callbacks_mask, name, vl);
+
   return 0;
 } /* int uc_update */
 
+int uc_set_callbacks_mask(const char *name, unsigned long mask) {
+  pthread_mutex_lock(&cache_lock);
+  cache_entry_t *ce = NULL;
+  int status = c_avl_get(cache_tree, name, (void *)&ce);
+  if (status != 0) { /* Ouch, just created entry disappeared ?! */
+    ERROR("uc_set_callbacks_mask: Couldn't find %s entry!", name);
+    pthread_mutex_unlock(&cache_lock);
+    return -1;
+  }
+  DEBUG("uc_set_callbacks_mask: set mask for \"%s\" to %lu.", name, mask);
+  ce->callbacks_mask = mask;
+  pthread_mutex_unlock(&cache_lock);
+  return 0;
+}
+
 int uc_get_rate_by_name(const char *name, gauge_t **ret_values,
                         size_t *ret_values_num) {
   gauge_t *ret = NULL;
index d3ea936..a069221 100644 (file)
@@ -56,6 +56,8 @@ int uc_get_hits(const data_set_t *ds, const value_list_t *vl);
 int uc_set_hits(const data_set_t *ds, const value_list_t *vl, int hits);
 int uc_inc_hits(const data_set_t *ds, const value_list_t *vl, int step);
 
+int uc_set_callbacks_mask(const char *name, unsigned long callbacks_mask);
+
 int uc_get_history(const data_set_t *ds, const value_list_t *vl,
                    gauge_t *ret_history, size_t num_steps, size_t num_ds);
 int uc_get_history_by_name(const char *name, gauge_t *ret_history,