Merge branch 'collectd-4.10' into collectd-5.0
[collectd.git] / src / network.c
index 457637b..5fed1b1 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/network.c
- * Copyright (C) 2005-2009  Florian octo Forster
+ * Copyright (C) 2005-2010  Florian octo Forster
  * Copyright (C) 2009       Aman Gupta
  *
  * This program is free software; you can redistribute it and/or modify it
@@ -31,6 +31,7 @@
 #include "utils_fbhash.h"
 #include "utils_avltree.h"
 #include "utils_cache.h"
+#include "utils_complain.h"
 
 #include "network.h"
 
@@ -258,7 +259,8 @@ typedef struct receive_list_entry_s receive_list_entry_t;
  * Private variables
  */
 static int network_config_ttl = 0;
-static size_t network_config_packet_size = 1024;
+/* Ethernet - (IPv6 + UDP) = 1500 - (40 + 8) = 1452 */
+static size_t network_config_packet_size = 1452;
 static int network_config_forward = 0;
 static int network_config_stats = 0;
 
@@ -295,14 +297,14 @@ static pthread_mutex_t  send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
  * example). Only if neither is true, the stats_lock is acquired. The counters
  * are always read without holding a lock in the hope that writing 8 bytes to
  * memory is an atomic operation. */
-static uint64_t stats_octets_rx  = 0;
-static uint64_t stats_octets_tx  = 0;
-static uint64_t stats_packets_rx = 0;
-static uint64_t stats_packets_tx = 0;
-static uint64_t stats_values_dispatched = 0;
-static uint64_t stats_values_not_dispatched = 0;
-static uint64_t stats_values_sent = 0;
-static uint64_t stats_values_not_sent = 0;
+static derive_t stats_octets_rx  = 0;
+static derive_t stats_octets_tx  = 0;
+static derive_t stats_packets_rx = 0;
+static derive_t stats_packets_tx = 0;
+static derive_t stats_values_dispatched = 0;
+static derive_t stats_values_not_dispatched = 0;
+static derive_t stats_values_sent = 0;
+static derive_t stats_values_not_sent = 0;
 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /*
@@ -319,30 +321,30 @@ static _Bool check_receive_okay (const value_list_t *vl) /* {{{ */
   /* This is a value we already sent. Don't allow it to be received again in
    * order to avoid looping. */
   if ((status == 0) && (time_sent >= ((uint64_t) vl->time)))
-    return (false);
+    return (0);
 
-  return (true);
+  return (1);
 } /* }}} _Bool check_receive_okay */
 
 static _Bool check_send_okay (const value_list_t *vl) /* {{{ */
 {
-  _Bool received = false;
+  _Bool received = 0;
   int status;
 
   if (network_config_forward != 0)
-    return (true);
+    return (1);
 
   if (vl->meta == NULL)
-    return (true);
+    return (1);
 
   status = meta_data_get_boolean (vl->meta, "network:received", &received);
   if (status == -ENOENT)
-    return (true);
+    return (1);
   else if (status != 0)
   {
     ERROR ("network plugin: check_send_okay: meta_data_get_boolean failed "
        "with status %i.", status);
-    return (true);
+    return (1);
   }
 
   /* By default, only *send* value lists that were not *received* by the
@@ -350,6 +352,43 @@ static _Bool check_send_okay (const value_list_t *vl) /* {{{ */
   return (!received);
 } /* }}} _Bool check_send_okay */
 
+static _Bool check_notify_received (const notification_t *n) /* {{{ */
+{
+  notification_meta_t *ptr;
+
+  for (ptr = n->meta; ptr != NULL; ptr = ptr->next)
+    if ((strcmp ("network:received", ptr->name) == 0)
+        && (ptr->type == NM_TYPE_BOOLEAN))
+      return ((_Bool) ptr->nm_value.nm_boolean);
+
+  return (0);
+} /* }}} _Bool check_notify_received */
+
+static _Bool check_send_notify_okay (const notification_t *n) /* {{{ */
+{
+  static c_complain_t complain_forwarding = C_COMPLAIN_INIT_STATIC;
+  _Bool received = 0;
+
+  if (n->meta == NULL)
+    return (1);
+
+  received = check_notify_received (n);
+
+  if (network_config_forward && received)
+  {
+    c_complain_once (LOG_ERR, &complain_forwarding,
+        "network plugin: A notification has been received via the network "
+        "forwarding if enabled. Forwarding of notifications is currently "
+        "not supported, because there is not loop-deteciton available. "
+        "Please contact the collectd mailing list if you need this "
+        "feature.");
+  }
+
+  /* By default, only *send* value lists that were not *received* by the
+   * network plugin. */
+  return (!received);
+} /* }}} _Bool check_send_notify_okay */
+
 static int network_dispatch_values (value_list_t *vl, /* {{{ */
     const char *username)
 {
@@ -383,7 +422,7 @@ static int network_dispatch_values (value_list_t *vl, /* {{{ */
     return (-ENOMEM);
   }
 
-  status = meta_data_add_boolean (vl->meta, "network:received", true);
+  status = meta_data_add_boolean (vl->meta, "network:received", 1);
   if (status != 0)
   {
     ERROR ("network plugin: meta_data_add_boolean failed.");
@@ -404,7 +443,7 @@ static int network_dispatch_values (value_list_t *vl, /* {{{ */
     }
   }
 
-  plugin_dispatch_values (vl);
+  plugin_dispatch_values_secure (vl);
   stats_values_dispatched++;
 
   meta_data_destroy (vl->meta);
@@ -413,6 +452,29 @@ static int network_dispatch_values (value_list_t *vl, /* {{{ */
   return (0);
 } /* }}} int network_dispatch_values */
 
+static int network_dispatch_notification (notification_t *n) /* {{{ */
+{
+  int status;
+
+  assert (n->meta == NULL);
+
+  status = plugin_notification_meta_add_boolean (n, "network:received", 1);
+  if (status != 0)
+  {
+    ERROR ("network plugin: plugin_notification_meta_add_boolean failed.");
+    plugin_notification_meta_free (n->meta);
+    n->meta = NULL;
+    return (status);
+  }
+
+  status = plugin_dispatch_notification (n);
+
+  plugin_notification_meta_free (n->meta);
+  n->meta = NULL;
+
+  return (status);
+} /* }}} int network_dispatch_notification */
+
 #if HAVE_LIBGCRYPT
 static gcry_cipher_hd_t network_get_aes256_cypher (sockent_t *se, /* {{{ */
     const void *iv, size_t iv_size, const char *username)
@@ -703,7 +765,7 @@ static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len,
 
        exp_size = 3 * sizeof (uint16_t)
                + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
-       if ((buffer_len < 0) || (buffer_len < exp_size))
+       if (buffer_len < exp_size)
        {
                WARNING ("network plugin: parse_part_values: "
                                "Packet too short: "
@@ -787,9 +849,8 @@ static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
        size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
 
        uint16_t pkg_length;
-       uint16_t pkg_type;
 
-       if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
+       if (buffer_len < exp_size)
        {
                WARNING ("network plugin: parse_part_number: "
                                "Packet too short: "
@@ -801,7 +862,7 @@ static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
 
        memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
        buffer += sizeof (tmp16);
-       pkg_type = ntohs (tmp16);
+       /* pkg_type = ntohs (tmp16); */
 
        memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
        buffer += sizeof (tmp16);
@@ -827,9 +888,8 @@ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
        size_t header_size = 2 * sizeof (uint16_t);
 
        uint16_t pkg_length;
-       uint16_t pkg_type;
 
-       if ((buffer_len < 0) || (buffer_len < header_size))
+       if (buffer_len < header_size)
        {
                WARNING ("network plugin: parse_part_string: "
                                "Packet too short: "
@@ -841,7 +901,7 @@ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
 
        memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
        buffer += sizeof (tmp16);
-       pkg_type = ntohs (tmp16);
+       /* pkg_type = ntohs (tmp16); */
 
        memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
        buffer += sizeof (tmp16);
@@ -917,6 +977,8 @@ static int parse_packet (sockent_t *se,
 static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */
     void **ret_buffer, size_t *ret_buffer_len, int flags)
 {
+  static c_complain_t complain_no_users = C_COMPLAIN_INIT_STATIC;
+
   char *buffer;
   size_t buffer_len;
   size_t buffer_offset;
@@ -938,8 +1000,9 @@ static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */
 
   if (se->data.server.userdb == NULL)
   {
-    NOTICE ("network plugin: Received signed network packet but can't verify "
-        "it because no user DB has been configured. Will accept it.");
+    c_complain (LOG_NOTICE, &complain_no_users,
+        "network plugin: Received signed network packet but can't verify it "
+        "because no user DB has been configured. Will accept it.");
     return (0);
   }
 
@@ -1377,8 +1440,19 @@ static int parse_packet (sockent_t *se, /* {{{ */
                                        &tmp);
                        if (status == 0)
                        {
-                               vl.time = (time_t) tmp;
-                               n.time = (time_t) tmp;
+                               vl.time = TIME_T_TO_CDTIME_T (tmp);
+                               n.time  = TIME_T_TO_CDTIME_T (tmp);
+                       }
+               }
+               else if (pkg_type == TYPE_TIME_HR)
+               {
+                       uint64_t tmp = 0;
+                       status = parse_part_number (&buffer, &buffer_size,
+                                       &tmp);
+                       if (status == 0)
+                       {
+                               vl.time = (cdtime_t) tmp;
+                               n.time  = (cdtime_t) tmp;
                        }
                }
                else if (pkg_type == TYPE_INTERVAL)
@@ -1387,7 +1461,15 @@ static int parse_packet (sockent_t *se, /* {{{ */
                        status = parse_part_number (&buffer, &buffer_size,
                                        &tmp);
                        if (status == 0)
-                               vl.interval = (int) tmp;
+                               vl.interval = TIME_T_TO_CDTIME_T (tmp);
+               }
+               else if (pkg_type == TYPE_INTERVAL_HR)
+               {
+                       uint64_t tmp = 0;
+                       status = parse_part_number (&buffer, &buffer_size,
+                                       &tmp);
+                       if (status == 0)
+                               vl.interval = (cdtime_t) tmp;
                }
                else if (pkg_type == TYPE_HOST)
                {
@@ -1462,7 +1544,7 @@ static int parse_packet (sockent_t *se, /* {{{ */
                        }
                        else
                        {
-                               plugin_dispatch_notification (&n);
+                               network_dispatch_notification (&n);
                        }
                }
                else if (pkg_type == TYPE_SEVERITY)
@@ -1681,9 +1763,9 @@ static int network_set_interface (const sockent_t *se, const struct addrinfo *ai
        }
 
        /* else: Not a multicast interface. */
-#if defined(HAVE_IF_INDEXTONAME) && HAVE_IF_INDEXTONAME && defined(SO_BINDTODEVICE)
        if (se->interface != 0)
        {
+#if defined(HAVE_IF_INDEXTONAME) && HAVE_IF_INDEXTONAME && defined(SO_BINDTODEVICE)
                char interface_name[IFNAMSIZ];
 
                if (if_indextoname (se->interface, interface_name) == NULL)
@@ -1700,20 +1782,21 @@ static int network_set_interface (const sockent_t *se, const struct addrinfo *ai
                                        sstrerror (errno, errbuf, sizeof (errbuf)));
                        return (-1);
                }
-       }
 /* #endif HAVE_IF_INDEXTONAME && SO_BINDTODEVICE */
 
 #else
-       WARNING ("network plugin: Cannot set the interface on a unicast "
+               WARNING ("network plugin: Cannot set the interface on a unicast "
                        "socket because "
 # if !defined(SO_BINDTODEVICE)
-                       "the the \"SO_BINDTODEVICE\" socket option "
+                       "the \"SO_BINDTODEVICE\" socket option "
 # else
                        "the \"if_indextoname\" function "
 # endif
                        "is not available on your system.");
 #endif
 
+       }
+
        return (0);
 } /* }}} network_set_interface */
 
@@ -2583,7 +2666,7 @@ static int add_to_buffer (char *buffer, int buffer_size, /* {{{ */
 
        if (vl_def->time != vl->time)
        {
-               if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
+               if (write_part_number (&buffer, &buffer_size, TYPE_TIME_HR,
                                        (uint64_t) vl->time))
                        return (-1);
                vl_def->time = vl->time;
@@ -2591,7 +2674,7 @@ static int add_to_buffer (char *buffer, int buffer_size, /* {{{ */
 
        if (vl_def->interval != vl->interval)
        {
-               if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL,
+               if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL_HR,
                                        (uint64_t) vl->interval))
                        return (-1);
                vl_def->interval = vl->interval;
@@ -3053,8 +3136,6 @@ static int network_config (oconfig_item_t *ci) /* {{{ */
       network_config_set_boolean (child, &network_config_forward);
     else if (strcasecmp ("ReportStats", child->key) == 0)
       network_config_set_boolean (child, &network_config_stats);
-    else if (strcasecmp ("CacheFlush", child->key) == 0)
-      /* no op for backwards compatibility only */;
     else
     {
       WARNING ("network plugin: Option `%s' is not allowed here.",
@@ -3066,17 +3147,19 @@ static int network_config (oconfig_item_t *ci) /* {{{ */
 } /* }}} int network_config */
 
 static int network_notification (const notification_t *n,
-               user_data_t __attribute__((unused)) *user_data)
+    user_data_t __attribute__((unused)) *user_data)
 {
   char  buffer[network_config_packet_size];
   char *buffer_ptr = buffer;
   int   buffer_free = sizeof (buffer);
   int   status;
 
-  memset (buffer, '\0', sizeof (buffer));
+  if (!check_send_notify_okay (n))
+    return (0);
 
+  memset (buffer, 0, sizeof (buffer));
 
-  status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME,
+  status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME_HR,
       (uint64_t) n->time);
   if (status != 0)
     return (-1);
@@ -3089,7 +3172,7 @@ static int network_notification (const notification_t *n,
   if (strlen (n->host) > 0)
   {
     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST,
-       n->host, strlen (n->host));
+        n->host, strlen (n->host));
     if (status != 0)
       return (-1);
   }
@@ -3097,7 +3180,7 @@ static int network_notification (const notification_t *n,
   if (strlen (n->plugin) > 0)
   {
     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN,
-       n->plugin, strlen (n->plugin));
+        n->plugin, strlen (n->plugin));
     if (status != 0)
       return (-1);
   }
@@ -3105,8 +3188,8 @@ static int network_notification (const notification_t *n,
   if (strlen (n->plugin_instance) > 0)
   {
     status = write_part_string (&buffer_ptr, &buffer_free,
-       TYPE_PLUGIN_INSTANCE,
-       n->plugin_instance, strlen (n->plugin_instance));
+        TYPE_PLUGIN_INSTANCE,
+        n->plugin_instance, strlen (n->plugin_instance));
     if (status != 0)
       return (-1);
   }
@@ -3114,7 +3197,7 @@ static int network_notification (const notification_t *n,
   if (strlen (n->type) > 0)
   {
     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE,
-       n->type, strlen (n->type));
+        n->type, strlen (n->type));
     if (status != 0)
       return (-1);
   }
@@ -3122,7 +3205,7 @@ static int network_notification (const notification_t *n,
   if (strlen (n->type_instance) > 0)
   {
     status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE,
-       n->type_instance, strlen (n->type_instance));
+        n->type_instance, strlen (n->type_instance));
     if (status != 0)
       return (-1);
   }
@@ -3181,15 +3264,15 @@ static int network_shutdown (void)
 
 static int network_stats_read (void) /* {{{ */
 {
-       uint64_t copy_octets_rx;
-       uint64_t copy_octets_tx;
-       uint64_t copy_packets_rx;
-       uint64_t copy_packets_tx;
-       uint64_t copy_values_dispatched;
-       uint64_t copy_values_not_dispatched;
-       uint64_t copy_values_sent;
-       uint64_t copy_values_not_sent;
-       uint64_t copy_receive_list_length;
+       derive_t copy_octets_rx;
+       derive_t copy_octets_tx;
+       derive_t copy_packets_rx;
+       derive_t copy_packets_tx;
+       derive_t copy_values_dispatched;
+       derive_t copy_values_not_dispatched;
+       derive_t copy_values_sent;
+       derive_t copy_values_not_sent;
+       derive_t copy_receive_list_length;
        value_list_t vl = VALUE_LIST_INIT;
        value_t values[2];
 
@@ -3212,16 +3295,16 @@ static int network_stats_read (void) /* {{{ */
        sstrncpy (vl.plugin, "network", sizeof (vl.plugin));
 
        /* Octets received / sent */
-       vl.values[0].counter = (counter_t) copy_octets_rx;
-       vl.values[1].counter = (counter_t) copy_octets_tx;
+       vl.values[0].derive = (derive_t) copy_octets_rx;
+       vl.values[1].derive = (derive_t) copy_octets_tx;
        sstrncpy (vl.type, "if_octets", sizeof (vl.type));
-       plugin_dispatch_values (&vl);
+       plugin_dispatch_values_secure (&vl);
 
        /* Packets received / send */
-       vl.values[0].counter = (counter_t) copy_packets_rx;
-       vl.values[1].counter = (counter_t) copy_packets_tx;
+       vl.values[0].derive = (derive_t) copy_packets_rx;
+       vl.values[1].derive = (derive_t) copy_packets_tx;
        sstrncpy (vl.type, "if_packets", sizeof (vl.type));
-       plugin_dispatch_values (&vl);
+       plugin_dispatch_values_secure (&vl);
 
        /* Values (not) dispatched and (not) send */
        sstrncpy (vl.type, "total_values", sizeof (vl.type));
@@ -3230,41 +3313,41 @@ static int network_stats_read (void) /* {{{ */
        vl.values[0].derive = (derive_t) copy_values_dispatched;
        sstrncpy (vl.type_instance, "dispatch-accepted",
                        sizeof (vl.type_instance));
-       plugin_dispatch_values (&vl);
+       plugin_dispatch_values_secure (&vl);
 
        vl.values[0].derive = (derive_t) copy_values_not_dispatched;
        sstrncpy (vl.type_instance, "dispatch-rejected",
                        sizeof (vl.type_instance));
-       plugin_dispatch_values (&vl);
+       plugin_dispatch_values_secure (&vl);
 
        vl.values[0].derive = (derive_t) copy_values_sent;
        sstrncpy (vl.type_instance, "send-accepted",
                        sizeof (vl.type_instance));
-       plugin_dispatch_values (&vl);
+       plugin_dispatch_values_secure (&vl);
 
        vl.values[0].derive = (derive_t) copy_values_not_sent;
        sstrncpy (vl.type_instance, "send-rejected",
                        sizeof (vl.type_instance));
-       plugin_dispatch_values (&vl);
+       plugin_dispatch_values_secure (&vl);
 
        /* Receive queue length */
        vl.values[0].gauge = (gauge_t) copy_receive_list_length;
        sstrncpy (vl.type, "queue_length", sizeof (vl.type));
        vl.type_instance[0] = 0;
-       plugin_dispatch_values (&vl);
+       plugin_dispatch_values_secure (&vl);
 
        return (0);
 } /* }}} int network_stats_read */
 
 static int network_init (void)
 {
-       static _Bool have_init = false;
+       static _Bool have_init = 0;
 
        /* Check if we were already initialized. If so, just return - there's
         * nothing more to do (for now, that is). */
        if (have_init)
                return (0);
-       have_init = true;
+       have_init = 1;
 
 #if HAVE_LIBGCRYPT
        gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
@@ -3350,9 +3433,9 @@ static int network_init (void)
  * just send the buffer if `flush'  is called - if the requested value was in
  * there, good. If not, well, then there is nothing to flush.. -octo
  */
-static int network_flush (int timeout,
-               const char __attribute__((unused)) *identifier,
-               user_data_t __attribute__((unused)) *user_data)
+static int network_flush (__attribute__((unused)) cdtime_t timeout,
+               __attribute__((unused)) const char *identifier,
+               __attribute__((unused)) user_data_t *user_data)
 {
        pthread_mutex_lock (&send_buffer_lock);