Merge remote-tracking branches 'github/pr/1857', 'github/pr/1859' and 'github/pr...
authorFlorian Forster <octo@collectd.org>
Sat, 6 Aug 2016 19:13:22 +0000 (21:13 +0200)
committerFlorian Forster <octo@collectd.org>
Sat, 6 Aug 2016 19:13:22 +0000 (21:13 +0200)
19 files changed:
configure.ac
src/Makefile.am
src/collectd-python.pod
src/collectd.conf.in
src/collectd.conf.pod
src/daemon/configfile.c
src/daemon/utils_avltree.h
src/daemon/utils_heap.h
src/libcollectdclient/client.c
src/libcollectdclient/collectd/client.h
src/ping.c
src/python.c
src/rrdtool.c
src/utils_db_query.c
src/utils_dns.c
src/utils_format_json.c
src/utils_format_json.h
src/utils_format_json_test.c [new file with mode: 0644]
src/write_http.c

index 96596a0..69824a6 100644 (file)
@@ -3640,7 +3640,7 @@ then
   SAVE_CFLAGS="$CFLAGS"
   SAVE_LIBS="$LIBS"
 dnl ARCHFLAGS="" -> disable multi -arch on OSX (see Config_heavy.pl:fetch_string)
-  PERL_CFLAGS=`ARCHFLAGS="" $perl_interpreter -MExtUtils::Embed -e ccopts`
+  PERL_CFLAGS=`ARCHFLAGS="" $perl_interpreter -MExtUtils::Embed -e perl_inc`
   PERL_LIBS=`ARCHFLAGS="" $perl_interpreter -MExtUtils::Embed -e ldopts`
   CFLAGS="$CFLAGS $PERL_CFLAGS"
   LIBS="$LIBS $PERL_LIBS"
index a4ca01e..c78e479 100644 (file)
@@ -27,6 +27,21 @@ noinst_LTLIBRARIES =
 check_PROGRAMS =
 TESTS =
 
+noinst_LTLIBRARIES += libformat_json.la
+libformat_json_la_SOURCES   = utils_format_json.c utils_format_json.h
+libformat_json_la_CPPFLAGS  = $(AM_CPPFLAGS)
+libformat_json_la_LDFLAGS   = $(AM_LDFLAGS)
+libformat_json_la_LIBADD    =
+if BUILD_WITH_LIBYAJL
+libformat_json_la_CPPFLAGS += $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+libformat_json_la_LDFLAGS  += $(BUILD_WITH_LIBYAJL_LDFLAGS)
+libformat_json_la_LIBADD   += $(BUILD_WITH_LIBYAJL_LIBS)
+check_PROGRAMS += test_format_json
+TESTS += test_format_json
+test_format_json_SOURCES = utils_format_json_test.c testing.h
+test_format_json_LDADD = libformat_json.la daemon/libmetadata.la daemon/libplugin_mock.la -lm
+endif
+
 noinst_LTLIBRARIES += liblatency.la
 liblatency_la_SOURCES = utils_latency.c utils_latency.h
 check_PROGRAMS += test_utils_latency
@@ -141,11 +156,10 @@ pkglib_LTLIBRARIES += amqp.la
 amqp_la_SOURCES = amqp.c \
                  utils_cmd_putval.c utils_cmd_putval.h \
                  utils_parse_option.c utils_parse_option.h \
-                 utils_format_graphite.c utils_format_graphite.h \
-                 utils_format_json.c utils_format_json.h
+                 utils_format_graphite.c utils_format_graphite.h
 amqp_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
 amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
-amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS)
+amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS) libformat_json.la
 endif
 
 if BUILD_PLUGIN_APACHE
@@ -1213,31 +1227,29 @@ endif
 if BUILD_PLUGIN_WRITE_GRAPHITE
 pkglib_LTLIBRARIES += write_graphite.la
 write_graphite_la_SOURCES = write_graphite.c \
-                        utils_format_graphite.c utils_format_graphite.h \
-                        utils_format_json.c utils_format_json.h
+                        utils_format_graphite.c utils_format_graphite.h
 write_graphite_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+write_graphite_la_LIBADD = libformat_json.la
 endif
 
 if BUILD_PLUGIN_WRITE_HTTP
 pkglib_LTLIBRARIES += write_http.la
 write_http_la_SOURCES = write_http.c \
-                       utils_format_json.c utils_format_json.h \
                        utils_format_kairosdb.c utils_format_kairosdb.h
-write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS)
 write_http_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-write_http_la_LIBADD = $(BUILD_WITH_LIBCURL_LIBS)
+write_http_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS)
+write_http_la_LIBADD = $(BUILD_WITH_LIBCURL_LIBS) libformat_json.la
 endif
 
 if BUILD_PLUGIN_WRITE_KAFKA
 pkglib_LTLIBRARIES += write_kafka.la
 write_kafka_la_SOURCES = write_kafka.c \
                         utils_format_graphite.c utils_format_graphite.h \
-                        utils_format_json.c utils_format_json.h \
                         utils_cmd_putval.c utils_cmd_putval.h \
                         utils_crc32.c utils_crc32.h
 write_kafka_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRDKAFKA_CPPFLAGS)
 write_kafka_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBRDKAFKA_LDFLAGS)
-write_kafka_la_LIBADD = $(BUILD_WITH_LIBRDKAFKA_LIBS)
+write_kafka_la_LIBADD = $(BUILD_WITH_LIBRDKAFKA_LIBS) libformat_json.la
 endif
 
 if BUILD_PLUGIN_WRITE_LOG
index 4647a11..e0851d9 100644 (file)
@@ -94,11 +94,12 @@ way of entering your commands. The daemonized collectd won't do that.
 
 =item *
 
-B<2.> collectd will block I<SIGINT>. Pressing I<Ctrl+C> will usually cause
+B<2.> Python will be handling I<SIGINT>. Pressing I<Ctrl+C> will usually cause
 collectd to shut down. This would be problematic in an interactive session,
-therefore this signal will be blocked. You can still use it to interrupt
-syscalls like sleep and pause but it won't generate a I<KeyboardInterrupt>
-exception either.
+therefore Python will be handling it in interactive sessions. This allows you
+to use I<Ctrl+C> to interrupt Python code without killing collectd. This also
+means you can catch I<KeyboardInterrupt> exceptions which does not work during
+normal operation.
 
 To quit collectd send I<EOF> (press I<Ctrl+D> at the beginning of a new line).
 
index 1e8994e..244fd67 100644 (file)
 #              Header "X-Custom-Header: custom_value"
 #              SSLVersion "TLSv1"
 #              Format "Command"
+#              Metrics true
+#              Notifications false
 #              StoreRates false
 #              BufferSize 4096
 #              LowSpeedLimit 0
index 2b726dd..1e564cf 100644 (file)
@@ -7062,7 +7062,7 @@ port in numeric form.
 =item B<AllPortsSummary> I<true>|I<false>
 
 If this option is set to I<true> a summary of statistics from all connections
-are collectd. This option defaults to I<false>.
+are collected. This option defaults to I<false>.
 
 =back
 
@@ -7843,6 +7843,14 @@ create output in the I<JavaScript Object Notation> (JSON). When set to KAIROSDB
 
 Defaults to B<Command>.
 
+=item B<Metrics> B<true>|B<false>
+
+Controls whether I<metrics> are POSTed to this location. Defaults to B<true>.
+
+=item B<Notifications> B<false>|B<true>
+
+Controls whether I<notifications> are POSTed to this location. Defaults to B<false>.
+
 =item B<StoreRates> B<true|false>
 
 If set to B<true>, convert counter values to rates. If set to B<false> (the
index ff5e034..a31fb64 100644 (file)
@@ -792,7 +792,7 @@ static oconfig_item_t *cf_read_dir (const char *dir,
  *
  * There are two versions of this function: If `wordexp' exists shell wildcards
  * will be expanded and the function will include all matches found. If
- * `wordexp' (or, more precisely, it's header file) is not available the
+ * `wordexp' (or, more precisely, its header file) is not available the
  * simpler function is used which does not do any such expansion.
  */
 #if HAVE_WORDEXP_H
index 1e0f271..43e94cf 100644 (file)
@@ -42,7 +42,7 @@ typedef struct c_avl_iterator_s c_avl_iterator_t;
  *
  * PARAMETERS
  *   `compare'  The function-pointer `compare' is used to compare two keys. It
- *              has to return less than zero if it's first argument is smaller
+ *              has to return less than zero if its first argument is smaller
  *              then the second argument, more than zero if the first argument
  *              is bigger than the second argument and zero if they are equal.
  *              If your keys are char-pointers, you can use the `strcmp'
@@ -131,7 +131,7 @@ int c_avl_get (c_avl_tree_t *t, const void *key, void **value);
  *   c_avl_pick
  *
  * DESCRIPTION
- *   Remove a (pseudo-)random element from the tree and return it's `key' and
+ *   Remove a (pseudo-)random element from the tree and return its `key' and
  *   `value'. Entries are not returned in any particular order. This function
  *   is intended for cache-flushes that don't care about the order but simply
  *   want to remove all elements, one at a time.
index 6d71c43..243554c 100644 (file)
@@ -39,7 +39,7 @@ typedef struct c_heap_s c_heap_t;
  *
  * PARAMETERS
  *   `compare'  The function-pointer `compare' is used to compare two keys. It
- *              has to return less than zero if it's first argument is smaller
+ *              has to return less than zero if its first argument is smaller
  *              then the second argument, more than zero if the first argument
  *              is bigger than the second argument and zero if they are equal.
  *              If your keys are char-pointers, you can use the `strcmp'
index 1c118e3..a15e0aa 100644 (file)
@@ -36,6 +36,7 @@
 
 #include <stdlib.h>
 #include <stdio.h>
+#include <stdarg.h>
 #include <unistd.h>
 #include <sys/types.h>
 #include <sys/socket.h>
   (c)->errbuf[sizeof ((c)->errbuf) - 1] = 0; \
 } while (0)
 
-#if COLLECT_DEBUG
-# define LCC_DEBUG(...) printf (__VA_ARGS__)
-#else
-# define LCC_DEBUG(...) /**/
-#endif
-
 /*
  * Types
  */
@@ -118,6 +113,22 @@ typedef struct lcc_response_s lcc_response_t;
 /*
  * Private functions
  */
+static int lcc_tracef(char const *format, ...)
+{
+  va_list ap;
+  int status;
+
+  char const *trace = getenv (LCC_TRACE_ENV);
+  if (!trace || (strcmp ("", trace) == 0) || (strcmp ("0", trace) == 0))
+    return 0;
+
+  va_start (ap, format);
+  status = vprintf (format, ap);
+  va_end (ap);
+
+  return status;
+}
+
 /* Even though Posix requires "strerror_r" to return an "int",
  * some systems (e.g. the GNU libc) return a "char *" _and_
  * ignore the second argument ... -tokkee */
@@ -248,7 +259,7 @@ static int lcc_send (lcc_connection_t *c, const char *command) /* {{{ */
 {
   int status;
 
-  LCC_DEBUG ("send:    --> %s\n", command);
+  lcc_tracef ("send:    --> %s\n", command);
 
   status = fprintf (c->fh, "%s\r\n", command);
   if (status < 0)
@@ -277,7 +288,7 @@ static int lcc_receive (lcc_connection_t *c, /* {{{ */
     return (-1);
   }
   lcc_chomp (buffer);
-  LCC_DEBUG ("receive: <-- %s\n", buffer);
+  lcc_tracef ("receive: <-- %s\n", buffer);
 
   /* Convert the leading status to an integer and make `ptr' to point to the
    * beginning of the message. */
@@ -325,7 +336,7 @@ static int lcc_receive (lcc_connection_t *c, /* {{{ */
       break;
     }
     lcc_chomp (buffer);
-    LCC_DEBUG ("receive: <-- %s\n", buffer);
+    lcc_tracef ("receive: <-- %s\n", buffer);
 
     res.lines[i] = strdup (buffer);
     if (res.lines[i] == NULL)
index ac26b84..47462a6 100644 (file)
 
 #include "lcc_features.h"
 
+/* COLLECTD_TRACE is the environment variable used to control trace output. When
+ * set to something non-zero, all lines sent to / received from the daemon are
+ * printed to STDOUT. */
+#ifndef LCC_TRACE_ENV
+# define LCC_TRACE_ENV "COLLECTD_TRACE"
+#endif
+
 /*
  * Includes (for data types)
  */
index 978cb49..4932bae 100644 (file)
@@ -383,7 +383,7 @@ static int start_thread (void) /* {{{ */
   if (ping_thread_loop != 0)
   {
     pthread_mutex_unlock (&ping_lock);
-    return (-1);
+    return (0);
   }
 
   ping_thread_loop = 1;
@@ -448,10 +448,7 @@ static int ping_init (void) /* {{{ */
         "Will use a timeout of %gs.", ping_timeout);
   }
 
-  if (start_thread () != 0)
-    return (-1);
-
-  return (0);
+  return (start_thread ());
 } /* }}} int ping_init */
 
 static int config_set_string (const char *name, /* {{{ */
index b991f45..24046de 100644 (file)
@@ -219,6 +219,8 @@ static char reg_shutdown_doc[] = "register_shutdown(callback[, data][, name]) ->
                "    data if it was supplied.";
 
 
+static pthread_t main_thread;
+static PyOS_sighandler_t python_sigint_handler;
 static _Bool do_interactive = 0;
 
 /* This is our global thread state. Python saves some stuff in thread-local
@@ -910,13 +912,8 @@ static int cpy_shutdown(void) {
        return 0;
 }
 
-static void cpy_int_handler(int sig) {
-       return;
-}
-
 static void *cpy_interactive(void *data) {
-       sigset_t sigset;
-       struct sigaction old;
+       PyOS_sighandler_t cur_sig;
 
        /* Signal handler in a plugin? Bad stuff, but the best way to
         * handle it I guess. In an interactive session people will
@@ -926,46 +923,40 @@ static void *cpy_interactive(void *data) {
         * mess. Chances are, this isn't what the user wanted to do.
         *
         * So this is the plan:
-        * 1. Block SIGINT in the main thread.
-        * 2. Install our own signal handler that does nothing.
-        * 3. Unblock SIGINT in the interactive thread.
+        * 1. Restore Python's own signal handler
+        * 2. Tell Python we just forked so it will accept this thread
+        *    as the main one. No version of Python will ever handle
+        *    interrupts anywhere but in the main thread.
+        * 3. After the interactive loop is done, restore collectd's
+        *    SIGINT handler.
+        * 4. Raise SIGINT for a clean shutdown. The signal is sent to
+        *    the main thread to ensure it wakes up the main interval
+        *    sleep so that collectd shuts down immediately not in 10
+        *    seconds.
         *
         * This will make sure that SIGINT won't kill collectd but
-        * still interrupt syscalls like sleep and pause.
-        * It does not raise a KeyboardInterrupt exception because so
-        * far nobody managed to figure out how to do that. */
-       struct sigaction sig_int_action = {
-               .sa_handler = cpy_int_handler
-       };
-       sigaction (SIGINT, &sig_int_action, &old);
-
-       sigemptyset(&sigset);
-       sigaddset(&sigset, SIGINT);
-       pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+        * still interrupt syscalls like sleep and pause. */
+
        PyEval_AcquireThread(state);
        if (PyImport_ImportModule("readline") == NULL) {
                /* This interactive session will suck. */
                cpy_log_exception("interactive session init");
-       }
+       }
+       cur_sig = PyOS_setsig(SIGINT, python_sigint_handler);
+       /* We totally forked just now. Everyone saw that, right? */
+       PyOS_AfterFork();
        PyRun_InteractiveLoop(stdin, "<stdin>");
+       PyOS_setsig(SIGINT, cur_sig);
        PyErr_Print();
        PyEval_ReleaseThread(state);
        NOTICE("python: Interactive interpreter exited, stopping collectd ...");
-       /* Restore the original collectd SIGINT handler and raise SIGINT.
-        * The main thread still has SIGINT blocked and there's nothing we
-        * can do about that so this thread will handle it. But that's not
-        * important, except that it won't interrupt the main loop and so
-        * it might take a few seconds before collectd really shuts down. */
-       sigaction (SIGINT, &old, NULL);
-       raise(SIGINT);
-       pause();
+       pthread_kill(main_thread, SIGINT);
        return NULL;
 }
 
 static int cpy_init(void) {
        PyObject *ret;
        static pthread_t thread;
-       sigset_t sigset;
 
        if (!Py_IsInitialized()) {
                WARNING("python: Plugin loaded but not configured.");
@@ -981,10 +972,8 @@ static int cpy_init(void) {
                else
                        Py_DECREF(ret);
        }
-       sigemptyset(&sigset);
-       sigaddset(&sigset, SIGINT);
-       pthread_sigmask(SIG_BLOCK, &sigset, NULL);
        state = PyEval_SaveThread();
+       main_thread = pthread_self();
        if (do_interactive) {
                if (plugin_thread_create(&thread, NULL, cpy_interactive, NULL)) {
                        ERROR("python: Error creating thread for interactive interpreter.");
@@ -1040,6 +1029,7 @@ PyMODINIT_FUNC PyInit_collectd(void) {
 #endif
 
 static int cpy_init_python(void) {
+       PyOS_sighandler_t cur_sig;
        PyObject *sys;
        PyObject *module;
 
@@ -1051,7 +1041,10 @@ static int cpy_init_python(void) {
        char *argv = "";
 #endif
 
+       /* Chances are the current signal handler is already SIG_DFL, but let's make sure. */
+       cur_sig = PyOS_setsig(SIGINT, SIG_DFL);
        Py_Initialize();
+       python_sigint_handler = PyOS_setsig(SIGINT, cur_sig);
 
        PyType_Ready(&ConfigType);
        PyType_Ready(&PluginDataType);
index c63db52..094dd42 100644 (file)
@@ -411,7 +411,7 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data)
                pthread_mutex_unlock (&queue_lock);
 
                /* We now need the cache lock so the entry isn't updated while
-                * we make a copy of it's values */
+                * we make a copy of its values */
                pthread_mutex_lock (&cache_lock);
 
                status = c_avl_get (cache, queue_entry->filename,
index 26ebbf3..8eae6b8 100644 (file)
@@ -256,15 +256,27 @@ static int udb_result_submit (udb_result_t *r, /* {{{ */
   {
     if (r->instance_prefix == NULL)
     {
-      strjoin (vl.type_instance, sizeof (vl.type_instance),
+      int status = strjoin (vl.type_instance, sizeof (vl.type_instance),
           r_area->instances_buffer, r->instances_num, "-");
+      if (status != 0)
+      {
+        ERROR ("udb_result_submit: creating type_instance failed with status %d.",
+            status);
+        return (status);
+      }
     }
     else
     {
       char tmp[DATA_MAX_NAME_LEN];
 
-      strjoin (tmp, sizeof (tmp), r_area->instances_buffer,
+      int status = strjoin (tmp, sizeof (tmp), r_area->instances_buffer,
           r->instances_num, "-");
+      if (status != 0)
+      {
+        ERROR ("udb_result_submit: creating type_instance failed with status %d.",
+            status);
+        return (status);
+      }
       tmp[sizeof (tmp) - 1] = 0;
 
       snprintf (vl.type_instance, sizeof (vl.type_instance), "%s-%s",
index 2a1e2e5..e0b8452 100644 (file)
@@ -749,22 +749,41 @@ const char *qtype_str(int t)
            case ns_t_srv:      return ("SRV");
            case ns_t_atma:     return ("ATMA");
            case ns_t_naptr:    return ("NAPTR");
+           case ns_t_opt:      return ("OPT");
+# if __NAMESER >= 19991006
            case ns_t_kx:       return ("KX");
            case ns_t_cert:     return ("CERT");
            case ns_t_a6:       return ("A6");
            case ns_t_dname:    return ("DNAME");
            case ns_t_sink:     return ("SINK");
-           case ns_t_opt:      return ("OPT");
-# if __NAMESER >= 19991006
+           case ns_t_apl:      return ("APL");
            case ns_t_tsig:     return ("TSIG");
 # endif
+# if __NAMESER >= 20090302
+           case ns_t_ds:       return ("DS");
+           case ns_t_sshfp:    return ("SSHFP");
+           case ns_t_ipseckey: return ("IPSECKEY");
+           case ns_t_rrsig:    return ("RRSIG");
+           case ns_t_nsec:     return ("NSEC");
+           case ns_t_dnskey:   return ("DNSKEY");
+           case ns_t_dhcid:    return ("DHCID");
+           case ns_t_nsec3:    return ("NSEC3");
+           case ns_t_nsec3param: return ("NSEC3PARAM");
+           case ns_t_hip:      return ("HIP");
+           case ns_t_spf:      return ("SPF");
            case ns_t_ixfr:     return ("IXFR");
+# endif
            case ns_t_axfr:     return ("AXFR");
            case ns_t_mailb:    return ("MAILB");
            case ns_t_maila:    return ("MAILA");
            case ns_t_any:      return ("ANY");
+# if __NAMESER >= 19991006
            case ns_t_zxfr:     return ("ZXFR");
-/* #endif __NAMESER >= 19991006 */
+# endif
+# if __NAMESER >= 20090302
+           case ns_t_dlv:       return ("DLV");
+# endif
+/* #endif __NAMESER >= 19991001 */
 #elif (defined (__BIND)) && (__BIND >= 19950621)
            case T_A:           return ("A"); /* 1 ... */
            case T_NS:          return ("NS");
index 5ca7fc8..63f03da 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/utils_format_json.c
- * Copyright (C) 2009       Florian octo Forster
+ * Copyright (C) 2009-2015  Florian octo Forster
  *
  * Permission is hereby granted, free of charge, to any person obtaining a
  * copy of this software and associated documentation files (the "Software"),
 
 #include "collectd.h"
 
+#include "utils_format_json.h"
+
 #include "plugin.h"
 #include "common.h"
-
 #include "utils_cache.h"
-#include "utils_format_json.h"
+
+#if HAVE_LIBYAJL
+# include <yajl/yajl_common.h>
+# include <yajl/yajl_gen.h>
+# if HAVE_YAJL_YAJL_VERSION_H
+#  include <yajl/yajl_version.h>
+# endif
+# if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
+#  define HAVE_YAJL_V2 1
+# endif
+#endif
 
 static int json_escape_string (char *buffer, size_t buffer_size, /* {{{ */
     const char *string)
@@ -493,4 +504,215 @@ int format_json_value_list (char *buffer, /* {{{ */
         store_rates, (*ret_buffer_free) - 2));
 } /* }}} int format_json_value_list */
 
+#if HAVE_LIBYAJL
+static int json_add_string (yajl_gen g, char const *str) /* {{{ */
+{
+  if (str == NULL)
+    return (int) yajl_gen_null (g);
+
+  return (int) yajl_gen_string (g, (unsigned char const *) str, (unsigned int) strlen (str));
+} /* }}} int json_add_string */
+
+#define JSON_ADD(g, str) do {                        \
+  yajl_gen_status status = json_add_string (g, str); \
+  if (status != yajl_gen_status_ok) { return -1; }   \
+} while (0)
+
+#define JSON_ADDF(g, format, ...) do {               \
+  char *str = ssnprintf_alloc (format, __VA_ARGS__); \
+  yajl_gen_status status = json_add_string (g, str); \
+  free (str);                                        \
+  if (status != yajl_gen_status_ok) { return -1; }   \
+} while (0)
+
+static int format_json_meta (yajl_gen g, notification_meta_t *meta) /* {{{ */
+{
+  if (meta == NULL)
+    return 0;
+
+  JSON_ADD (g, meta->name);
+  switch (meta->type)
+  {
+    case NM_TYPE_STRING:
+      JSON_ADD (g, meta->nm_value.nm_string);
+      break;
+    case NM_TYPE_SIGNED_INT:
+      JSON_ADDF (g, "%"PRIi64, meta->nm_value.nm_signed_int);
+      break;
+    case NM_TYPE_UNSIGNED_INT:
+      JSON_ADDF (g, "%"PRIu64, meta->nm_value.nm_unsigned_int);
+      break;
+    case NM_TYPE_DOUBLE:
+      JSON_ADDF (g, JSON_GAUGE_FORMAT, meta->nm_value.nm_double);
+      break;
+    case NM_TYPE_BOOLEAN:
+      JSON_ADD (g, meta->nm_value.nm_boolean ? "true" : "false");
+      break;
+    default:
+      ERROR ("format_json_meta: unknown meta data type %d (name \"%s\")", meta->type, meta->name);
+      yajl_gen_null (g);
+  }
+
+  return format_json_meta (g, meta->next);
+} /* }}} int format_json_meta */
+
+static int format_time (yajl_gen g, cdtime_t t) /* {{{ */
+{
+  char buffer[RFC3339NANO_SIZE] = "";
+
+  if (rfc3339nano (buffer, sizeof (buffer), t) != 0)
+    return -1;
+
+  JSON_ADD (g, buffer);
+  return 0;
+} /* }}} int format_time */
+
+static int format_alert (yajl_gen g, notification_t const *n) /* {{{ */
+{
+  yajl_gen_array_open (g);
+  yajl_gen_map_open (g); /* BEGIN alert */
+
+  /*
+   * labels
+   */
+  JSON_ADD (g, "labels");
+  yajl_gen_map_open (g); /* BEGIN labels */
+
+  JSON_ADD (g, "alertname");
+  if (strncmp (n->plugin, n->type, strlen (n->plugin)) == 0)
+    JSON_ADDF (g, "collectd_%s", n->type);
+  else
+    JSON_ADDF (g, "collectd_%s_%s", n->plugin, n->type);
+
+  JSON_ADD (g, "instance");
+  JSON_ADD (g, n->host);
+
+  /* mangling of plugin instance and type instance into labels is copied from
+   * the Prometheus collectd exporter. */
+  if (strlen (n->plugin_instance) > 0)
+  {
+    JSON_ADD (g, n->plugin);
+    JSON_ADD (g, n->plugin_instance);
+  }
+  if (strlen (n->type_instance) > 0)
+  {
+    if (strlen (n->plugin_instance) > 0)
+      JSON_ADD (g, "type");
+    else
+      JSON_ADD (g, n->plugin);
+    JSON_ADD (g, n->type_instance);
+  }
+
+  JSON_ADD (g, "severity");
+  JSON_ADD (g, (n->severity == NOTIF_FAILURE) ? "FAILURE"
+                   : (n->severity == NOTIF_WARNING) ? "WARNING"
+                   : (n->severity == NOTIF_OKAY) ? "OKAY"
+                   : "UNKNOWN");
+
+  JSON_ADD (g, "service");
+  JSON_ADD (g, "collectd");
+
+  yajl_gen_map_close (g); /* END labels */
+
+  /*
+   * annotations
+   */
+  JSON_ADD (g, "annotations");
+  yajl_gen_map_open (g); /* BEGIN annotations */
+
+  JSON_ADD (g, "summary");
+  JSON_ADD (g, n->message);
+
+  if (format_json_meta (g, n->meta) != 0)
+    return -1;
+
+  yajl_gen_map_close (g); /* END annotations */
+
+  JSON_ADD (g, "startsAt");
+  format_time (g, n->time);
+
+  yajl_gen_map_close (g); /* END alert */
+  yajl_gen_array_close (g);
+
+  return 0;
+} /* }}} format_alert */
+
+/*
+ * Format (prometheus/alertmanager v1):
+ *
+ * [{
+ *   "labels": {
+ *     "alertname": "collectd_cpu",
+ *     "instance":  "host.example.com",
+ *     "severity":  "FAILURE",
+ *     "service":   "collectd",
+ *     "cpu":       "0",
+ *     "type":      "wait"
+ *   },
+ *   "annotations": {
+ *     "summary": "...",
+ *     // meta
+ *   },
+ *   "startsAt": <rfc3339 time>,
+ *   "endsAt": <rfc3339 time>, // not used
+ * }]
+ */
+int format_json_notification (char *buffer, size_t buffer_size, /* {{{ */
+                              notification_t const *n)
+{
+  yajl_gen g;
+  unsigned char const *out;
+#if HAVE_YAJL_V2
+  size_t unused_out_len;
+#else
+  unsigned int unused_out_len;
+#endif
+
+  if ((buffer == NULL) || (n == NULL))
+    return EINVAL;
+
+#if HAVE_YAJL_V2
+  g = yajl_gen_alloc (NULL);
+  if (g == NULL)
+    return -1;
+# if COLLECT_DEBUG
+  yajl_gen_config (g, yajl_gen_beautify);
+  yajl_gen_config (g, yajl_gen_validate_utf8);
+# endif
+
+#else /* !HAVE_YAJL_V2 */
+  yajl_gen_config conf = { 0 };
+# if COLLECT_DEBUG
+  conf.beautify = 1;
+  conf.indentString = "  ";
+# endif
+  g = yajl_gen_alloc (&conf, NULL);
+  if (g == NULL)
+    return -1;
+#endif
+
+  if (format_alert (g, n) != 0)
+  {
+    yajl_gen_clear (g);
+    yajl_gen_free (g);
+    return -1;
+  }
+
+  /* copy to output buffer */
+  yajl_gen_get_buf (g, &out, &unused_out_len);
+  sstrncpy (buffer, (void *) out, buffer_size);
+
+  yajl_gen_clear (g);
+  yajl_gen_free (g);
+  return 0;
+} /* }}} format_json_notification */
+#else
+int format_json_notification (char *buffer, size_t buffer_size, /* {{{ */
+                              notification_t const *n)
+{
+  ERROR ("format_json_notification: Not available (requires libyajl).");
+  return ENOTSUP;
+} /* }}} int format_json_notification */
+#endif
+
 /* vim: set sw=2 sts=2 et fdm=marker : */
index 3584dec..a3eda30 100644 (file)
@@ -42,5 +42,7 @@ int format_json_value_list (char *buffer,
     const data_set_t *ds, const value_list_t *vl, int store_rates);
 int format_json_finalize (char *buffer,
     size_t *ret_buffer_fill, size_t *ret_buffer_free);
+int format_json_notification (char *buffer, size_t buffer_size,
+    notification_t const *n);
 
 #endif /* UTILS_FORMAT_JSON_H */
diff --git a/src/utils_format_json_test.c b/src/utils_format_json_test.c
new file mode 100644 (file)
index 0000000..aa275fd
--- /dev/null
@@ -0,0 +1,169 @@
+/**
+ * collectd - src/utils_format_json_test.c
+ * Copyright (C) 2015       Florian octo Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ */
+
+#include "testing.h"
+#include "collectd.h"
+#include "utils_format_json.h"
+#include "common.h" /* for STATIC_ARRAY_SIZE */
+
+#include <yajl/yajl_common.h>
+#include <yajl/yajl_parse.h>
+#if HAVE_YAJL_YAJL_VERSION_H
+# include <yajl/yajl_version.h>
+#endif
+#if YAJL_MAJOR > 1
+# define HAVE_YAJL_V2 1
+#endif
+
+typedef struct
+{
+  char const *key;
+  char const *value;
+} label_t;
+
+typedef struct
+{
+  label_t *expected_labels;
+  size_t   expected_labels_num;
+
+  label_t *current_label;
+} test_case_t;
+
+#if HAVE_YAJL_V2
+static int test_map_key (void *ctx, unsigned char const *key, size_t key_len)
+#else
+static int test_map_key (void *ctx, unsigned char const *key, unsigned int key_len)
+#endif
+{
+  test_case_t *c = ctx;
+  size_t i;
+
+  c->current_label = NULL;
+  for (i = 0; i < c->expected_labels_num; i++)
+  {
+    label_t *l = c->expected_labels + i;
+    if ((strlen (l->key) == key_len)
+        && (strncmp (l->key, (char const *) key, key_len) == 0))
+    {
+      c->current_label = l;
+      break;
+    }
+  }
+
+  return 1; /* continue */
+}
+
+static int expect_label (char const *name, char const *got, char const *want)
+{
+  _Bool ok = (strcmp (got, want) == 0);
+  char msg[1024];
+
+  if (ok)
+    snprintf (msg, sizeof (msg), "label[\"%s\"] = \"%s\"", name, got);
+  else
+    snprintf (msg, sizeof (msg), "label[\"%s\"] = \"%s\", want \"%s\"", name, got, want);
+
+  OK1 (ok, msg);
+  return 0;
+}
+
+#if HAVE_YAJL_V2
+static int test_string (void *ctx, unsigned char const *value, size_t value_len)
+#else
+static int test_string (void *ctx, unsigned char const *value, unsigned int value_len)
+#endif
+{
+  test_case_t *c = ctx;
+
+  if (c->current_label != NULL)
+  {
+    label_t *l = c->current_label;
+    char *got;
+    int status;
+
+    got = malloc (value_len + 1);
+    memmove (got, value, value_len);
+    got[value_len] = 0;
+
+    status = expect_label (l->key, got, l->value);
+
+    free (got);
+
+    if (status != 0)
+      return 0; /* abort */
+  }
+
+  return 1; /* continue */
+}
+
+static int expect_json_labels (char *json, label_t *labels, size_t labels_num)
+{
+  yajl_callbacks funcs = {
+    .yajl_string = test_string,
+    .yajl_map_key = test_map_key,
+  };
+
+  test_case_t c = { labels, labels_num, NULL };
+
+  yajl_handle hndl;
+#if HAVE_YAJL_V2
+  CHECK_NOT_NULL (hndl = yajl_alloc (&funcs, /* alloc = */ NULL, &c));
+#else
+  CHECK_NOT_NULL (hndl = yajl_alloc (&funcs, /* config = */ NULL, /* alloc = */ NULL, &c));
+#endif
+  OK (yajl_parse (hndl, (unsigned char *) json, strlen (json)) == yajl_status_ok);
+
+  yajl_free (hndl);
+  return 0;
+}
+
+DEF_TEST(notification)
+{
+  label_t labels[] = {
+    {"summary", "this is a message"},
+    {"alertname", "collectd_unit_test"},
+    {"instance", "example.com"},
+    {"service", "collectd"},
+    {"unit", "case"},
+  };
+
+  /* 1448284606.125 ^= 1555083754651779072 */
+  notification_t n = { NOTIF_WARNING, 1555083754651779072ULL, "this is a message",
+    "example.com", "unit", "", "test", "case", NULL };
+
+  char got[1024];
+  CHECK_ZERO (format_json_notification (got, sizeof (got), &n));
+  // printf ("got = \"%s\";\n", got);
+
+  return expect_json_labels (got, labels, STATIC_ARRAY_SIZE (labels));
+}
+
+int main (void)
+{
+  RUN_TEST(notification);
+
+  END_TEST;
+}
index a12a7f5..d8d8c34 100644 (file)
@@ -65,6 +65,8 @@ struct wh_callback_s
 #define WH_FORMAT_JSON     1
 #define WH_FORMAT_KAIROSDB 2
         int format;
+        _Bool send_metrics;
+        _Bool send_notifications;
 
         CURL *curl;
         struct curl_slist *headers;
@@ -111,11 +113,12 @@ static void wh_reset_buffer (wh_callback_t *cb)  /* {{{ */
         }
 } /* }}} wh_reset_buffer */
 
-static int wh_send_buffer (wh_callback_t *cb) /* {{{ */
+/* must hold cb->send_lock when calling */
+static int wh_post_nolock (wh_callback_t *cb, char const *data) /* {{{ */
 {
         int status = 0;
 
-        curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, cb->send_buffer);
+        curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, data);
         status = curl_easy_perform (cb->curl);
 
         wh_log_http_error (cb);
@@ -127,7 +130,7 @@ static int wh_send_buffer (wh_callback_t *cb) /* {{{ */
                                 status, cb->curl_errbuf);
         }
         return (status);
-} /* }}} wh_send_buffer */
+} /* }}} wh_post_nolock */
 
 static int wh_callback_init (wh_callback_t *cb) /* {{{ */
 {
@@ -247,7 +250,7 @@ static int wh_flush_nolock (cdtime_t timeout, wh_callback_t *cb) /* {{{ */
                         return (0);
                 }
 
-                status = wh_send_buffer (cb);
+                status = wh_post_nolock (cb, cb->send_buffer);
                 wh_reset_buffer (cb);
         }
         else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
@@ -269,7 +272,7 @@ static int wh_flush_nolock (cdtime_t timeout, wh_callback_t *cb) /* {{{ */
                         return (status);
                 }
 
-                status = wh_send_buffer (cb);
+                status = wh_post_nolock (cb, cb->send_buffer);
                 wh_reset_buffer (cb);
         }
         else
@@ -297,15 +300,11 @@ static int wh_flush (cdtime_t timeout, /* {{{ */
 
         pthread_mutex_lock (&cb->send_lock);
 
-        if (cb->curl == NULL)
+        if (wh_callback_init (cb) != 0)
         {
-                status = wh_callback_init (cb);
-                if (status != 0)
-                {
-                        ERROR ("write_http plugin: wh_callback_init failed.");
-                        pthread_mutex_unlock (&cb->send_lock);
-                        return (-1);
-                }
+                ERROR ("write_http plugin: wh_callback_init failed.");
+                pthread_mutex_unlock (&cb->send_lock);
+                return (-1);
         }
 
         status = wh_flush_nolock (timeout, cb);
@@ -398,16 +397,11 @@ static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{
         }
 
         pthread_mutex_lock (&cb->send_lock);
-
-        if (cb->curl == NULL)
+        if (wh_callback_init (cb) != 0)
         {
-                status = wh_callback_init (cb);
-                if (status != 0)
-                {
-                        ERROR ("write_http plugin: wh_callback_init failed.");
-                        pthread_mutex_unlock (&cb->send_lock);
-                        return (-1);
-                }
+                ERROR ("write_http plugin: wh_callback_init failed.");
+                pthread_mutex_unlock (&cb->send_lock);
+                return (-1);
         }
 
         if (command_len >= cb->send_buffer_free)
@@ -446,16 +440,11 @@ static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ *
         int status;
 
         pthread_mutex_lock (&cb->send_lock);
-
-        if (cb->curl == NULL)
+        if (wh_callback_init (cb) != 0)
         {
-                status = wh_callback_init (cb);
-                if (status != 0)
-                {
-                        ERROR ("write_http plugin: wh_callback_init failed.");
-                        pthread_mutex_unlock (&cb->send_lock);
-                        return (-1);
-                }
+                ERROR ("write_http plugin: wh_callback_init failed.");
+                pthread_mutex_unlock (&cb->send_lock);
+                return (-1);
         }
 
         status = format_json_value_list (cb->send_buffer,
@@ -558,6 +547,7 @@ static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
                 return (-EINVAL);
 
         cb = user_data->data;
+        assert (cb->send_metrics);
 
         switch(cb->format) {
             case WH_FORMAT_JSON:
@@ -573,6 +563,39 @@ static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
         return (status);
 } /* }}} int wh_write */
 
+static int wh_notify (notification_t const *n, user_data_t *ud) /* {{{ */
+{
+        wh_callback_t *cb;
+        char alert[4096];
+        int status;
+
+        if ((ud == NULL) || (ud->data == NULL))
+                return (EINVAL);
+
+        cb = ud->data;
+        assert (cb->send_notifications);
+
+        status = format_json_notification (alert, sizeof (alert), n);
+        if (status != 0)
+        {
+                ERROR ("write_http plugin: formatting notification failed");
+                return status;
+        }
+
+        pthread_mutex_lock (&cb->send_lock);
+        if (wh_callback_init (cb) != 0)
+        {
+                ERROR ("write_http plugin: wh_callback_init failed.");
+                pthread_mutex_unlock (&cb->send_lock);
+                return (-1);
+        }
+
+        status = wh_post_nolock (cb, alert);
+        pthread_mutex_unlock (&cb->send_lock);
+
+        return (status);
+} /* }}} int wh_notify */
+
 static int config_set_format (wh_callback_t *cb, /* {{{ */
                 oconfig_item_t *ci)
 {
@@ -644,7 +667,8 @@ static int wh_config_node (oconfig_item_t *ci) /* {{{ */
         cb->timeout = 0;
         cb->log_http_error = 0;
         cb->headers = NULL;
-
+        cb->send_metrics = 1;
+        cb->send_notifications = 0;
 
         pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
 
@@ -713,6 +737,10 @@ static int wh_config_node (oconfig_item_t *ci) /* {{{ */
                 }
                 else if (strcasecmp ("Format", child->key) == 0)
                         status = config_set_format (cb, child);
+                else if (strcasecmp ("Metrics", child->key) == 0)
+                        cf_util_get_boolean (child, &cb->send_metrics);
+                else if (strcasecmp ("Notifications", child->key) == 0)
+                        cf_util_get_boolean (child, &cb->send_notifications);
                 else if (strcasecmp ("StoreRates", child->key) == 0)
                         status = cf_util_get_boolean (child, &cb->store_rates);
                 else if (strcasecmp ("BufferSize", child->key) == 0)
@@ -750,6 +778,14 @@ static int wh_config_node (oconfig_item_t *ci) /* {{{ */
                 return (-1);
         }
 
+        if (!cb->send_metrics && !cb->send_notifications)
+        {
+                ERROR ("write_http plugin: Neither metrics nor notifications "
+                       "are enabled for \"%s\".", cb->name);
+                wh_callback_free (cb);
+                return (-1);
+        }
+
         if (cb->low_speed_limit > 0)
                 cb->low_speed_time = CDTIME_T_TO_TIME_T(plugin_get_interval());
 
@@ -781,7 +817,20 @@ static int wh_config_node (oconfig_item_t *ci) /* {{{ */
         plugin_register_flush (callback_name, wh_flush, &user_data);
 
         user_data.free_func = wh_callback_free;
-        plugin_register_write (callback_name, wh_write, &user_data);
+
+        if (cb->send_metrics)
+        {
+                plugin_register_write (callback_name, wh_write, &user_data);
+                user_data.free_func = NULL;
+
+                plugin_register_flush (callback_name, wh_flush, &user_data);
+        }
+
+        if (cb->send_notifications)
+        {
+                plugin_register_notification (callback_name, wh_notify, &user_data);
+                user_data.free_func = NULL;
+        }
 
         return (0);
 } /* }}} int wh_config_node */