Merge branch 'collectd-4.2'
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 23 Jan 2008 19:54:10 +0000 (20:54 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 23 Jan 2008 19:54:10 +0000 (20:54 +0100)
1  2 
src/network.c

diff --combined src/network.c
@@@ -1,6 -1,6 +1,6 @@@
  /**
   * collectd - src/network.c
 - * Copyright (C) 2005-2007  Florian octo Forster
 + * Copyright (C) 2005-2008  Florian octo Forster
   *
   * This program is free software; you can redistribute it and/or modify it
   * under the terms of the GNU General Public License as published by the
@@@ -136,6 -136,14 +136,14 @@@ struct part_values_
  };
  typedef struct part_values_s part_values_t;
  
+ struct receive_list_entry_s
+ {
+   char data[BUFF_SIZE];
+   int  data_len;
+   struct receive_list_entry_s *next;
+ };
+ typedef struct receive_list_entry_s receive_list_entry_t;
  /*
   * Private variables
   */
@@@ -154,10 -162,17 +162,17 @@@ static int network_config_forward = 0
  
  static sockent_t *sending_sockets = NULL;
  
+ static receive_list_entry_t *receive_list_head = NULL;
+ static receive_list_entry_t *receive_list_tail = NULL;
+ static pthread_mutex_t       receive_list_lock = PTHREAD_MUTEX_INITIALIZER;
+ static pthread_cond_t        receive_list_cond = PTHREAD_COND_INITIALIZER;
  static struct pollfd *listen_sockets = NULL;
  static int listen_sockets_num = 0;
- static pthread_t listen_thread = 0;
  static int listen_loop = 0;
+ static pthread_t receive_thread_id = 0;
+ static pthread_t dispatch_thread_id = 0;
  
  static char         send_buffer[BUFF_SIZE];
  static char        *send_buffer_ptr;
@@@ -474,8 -489,7 +489,8 @@@ static int parse_part_string (void **re
                        || (h_type == TYPE_PLUGIN)
                        || (h_type == TYPE_PLUGIN_INSTANCE)
                        || (h_type == TYPE_TYPE)
 -                      || (h_type == TYPE_TYPE_INSTANCE));
 +                      || (h_type == TYPE_TYPE_INSTANCE)
 +                      || (h_type == TYPE_MESSAGE));
  
        ps.value = buffer + 4;
        if (ps.value[h_length - 5] != '\0')
@@@ -506,18 -520,15 +521,18 @@@ static int parse_packet (void *buffer, 
  
        value_list_t vl = VALUE_LIST_INIT;
        char type[DATA_MAX_NAME_LEN];
 +      notification_t n;
  
        DEBUG ("network plugin: parse_packet: buffer = %p; buffer_len = %i;",
                        buffer, buffer_len);
  
        memset (&vl, '\0', sizeof (vl));
        memset (&type, '\0', sizeof (type));
 +      memset (&n, '\0', sizeof (n));
        status = 0;
  
 -      while ((status == 0) && (buffer_len > sizeof (part_header_t)))
 +      while ((status == 0) && (0 < buffer_len)
 +                      && ((unsigned int) buffer_len > sizeof (part_header_t)))
        {
                header = (part_header_t *) buffer;
  
                        uint64_t tmp = 0;
                        status = parse_part_number (&buffer, &buffer_len, &tmp);
                        if (status == 0)
 +                      {
                                vl.time = (time_t) tmp;
 +                              n.time = (time_t) tmp;
 +                      }
                }
                else if (ntohs (header->type) == TYPE_INTERVAL)
                {
                {
                        status = parse_part_string (&buffer, &buffer_len,
                                        vl.host, sizeof (vl.host));
 -                      DEBUG ("network plugin: parse_packet: vl.host = %s", vl.host);
 +                      strncpy (n.host, vl.host, sizeof (n.host));
 +                      n.host[sizeof (n.host) - 1] = '\0';
 +                      DEBUG ("network plugin: parse_packet: vl.host = %s",
 +                                      vl.host);
                }
                else if (ntohs (header->type) == TYPE_PLUGIN)
                {
                        status = parse_part_string (&buffer, &buffer_len,
                                        vl.plugin, sizeof (vl.plugin));
 -                      DEBUG ("network plugin: parse_packet: vl.plugin = %s", vl.plugin);
 +                      strncpy (n.plugin, vl.plugin, sizeof (n.plugin));
 +                      n.plugin[sizeof (n.plugin) - 1] = '\0';
 +                      DEBUG ("network plugin: parse_packet: vl.plugin = %s",
 +                                      vl.plugin);
                }
                else if (ntohs (header->type) == TYPE_PLUGIN_INSTANCE)
                {
                        status = parse_part_string (&buffer, &buffer_len,
 -                                      vl.plugin_instance, sizeof (vl.plugin_instance));
 -                      DEBUG ("network plugin: parse_packet: vl.plugin_instance = %s", vl.plugin_instance);
 +                                      vl.plugin_instance,
 +                                      sizeof (vl.plugin_instance));
 +                      strncpy (n.plugin_instance, vl.plugin_instance,
 +                                      sizeof (n.plugin_instance));
 +                      n.plugin_instance[sizeof (n.plugin_instance) - 1] = '\0';
 +                      DEBUG ("network plugin: parse_packet: "
 +                                      "vl.plugin_instance = %s",
 +                                      vl.plugin_instance);
                }
                else if (ntohs (header->type) == TYPE_TYPE)
                {
                        status = parse_part_string (&buffer, &buffer_len,
                                        type, sizeof (type));
 -                      DEBUG ("network plugin: parse_packet: type = %s", type);
 +                      strncpy (n.type, type, sizeof (n.type));
 +                      n.type[sizeof (n.type) - 1] = '\0';
 +                      DEBUG ("network plugin: parse_packet: type = %s",
 +                                      type);
                }
                else if (ntohs (header->type) == TYPE_TYPE_INSTANCE)
                {
                        status = parse_part_string (&buffer, &buffer_len,
 -                                      vl.type_instance, sizeof (vl.type_instance));
 -                      DEBUG ("network plugin: parse_packet: vl.type_instance = %s", vl.type_instance);
 +                                      vl.type_instance,
 +                                      sizeof (vl.type_instance));
 +                      strncpy (n.type_instance, vl.type_instance,
 +                                      sizeof (n.type_instance));
 +                      n.type_instance[sizeof (n.type_instance) - 1] = '\0';
 +                      DEBUG ("network plugin: parse_packet: "
 +                                      "vl.type_instance = %s",
 +                                      vl.type_instance);
 +              }
 +              else if (ntohs (header->type) == TYPE_MESSAGE)
 +              {
 +                      status = parse_part_string (&buffer, &buffer_len,
 +                                      n.message, sizeof (n.message));
 +                      DEBUG ("network plugin: parse_packet: n.message = %s",
 +                                      n.message);
 +
 +                      if ((n.severity != NOTIF_FAILURE)
 +                                      && (n.severity != NOTIF_WARNING)
 +                                      && (n.severity != NOTIF_OKAY))
 +                      {
 +                              INFO ("network plugin: "
 +                                              "Ignoring notification with "
 +                                              "unknown severity %s.",
 +                                              n.severity);
 +                      }
 +                      else if (n.time <= 0)
 +                      {
 +                              INFO ("network plugin: "
 +                                              "Ignoring notification with "
 +                                              "time == 0.");
 +                      }
 +                      else if (strlen (n.message) <= 0)
 +                      {
 +                              INFO ("network plugin: "
 +                                              "Ignoring notification with "
 +                                              "an empty message.");
 +                      }
 +                      else
 +                      {
 +                              /*
 +                               * TODO: Let this do a separate thread so that
 +                               * no packets are lost if this takes too long.
 +                               */
 +                              plugin_dispatch_notification (&n);
 +                      }
 +              }
 +              else if (ntohs (header->type) == TYPE_SEVERITY)
 +              {
 +                      uint64_t tmp = 0;
 +                      status = parse_part_number (&buffer, &buffer_len, &tmp);
 +                      if (status == 0)
 +                              n.severity = (int) tmp;
                }
                else
                {
@@@ -754,16 -697,6 +769,16 @@@ static int network_set_ttl (const socke
  static int network_bind_socket (const sockent_t *se, const struct addrinfo *ai)
  {
        int loop = 0;
 +      int yes  = 1;
 +
 +      /* allow multiple sockets to use the same PORT number */
 +      if (setsockopt(se->fd, SOL_SOCKET, SO_REUSEADDR,
 +                              &yes, sizeof(yes)) == -1) {
 +                char errbuf[1024];
 +                ERROR ("setsockopt: %s", 
 +                                sstrerror (errno, errbuf, sizeof (errbuf)));
 +              return (-1);
 +      }
  
        DEBUG ("fd = %i; calling `bind'", se->fd);
  
@@@ -1071,6 -1004,37 +1086,37 @@@ static int network_add_sending_socket (
        return (0);
  } /* int network_get_listen_socket */
  
+ static void *dispatch_thread (void *arg)
+ {
+   while (42)
+   {
+     receive_list_entry_t *ent;
+     /* Lock and wait for more data to come in */
+     pthread_mutex_lock (&receive_list_lock);
+     while ((listen_loop == 0)
+       && (receive_list_head == NULL))
+       pthread_cond_wait (&receive_list_cond, &receive_list_lock);
+     /* Remove the head entry and unlock */
+     ent = receive_list_head;
+     if (ent != NULL)
+       receive_list_head = ent->next;
+     pthread_mutex_unlock (&receive_list_lock);
+     /* Check whether we are supposed to exit. We do NOT check `listen_loop'
+      * because we dispatch all missing packets before shutting down. */
+     if (ent == NULL)
+       break;
+     parse_packet (ent->data, ent->data_len);
+     sfree (ent);
+   } /* while (42) */
+   return (NULL);
+ } /* void *receive_thread */
  static int network_receive (void)
  {
        char buffer[BUFF_SIZE];
  
                for (i = 0; (i < listen_sockets_num) && (status > 0); i++)
                {
+                       receive_list_entry_t *ent;
                        if ((listen_sockets[i].revents & (POLLIN | POLLPRI)) == 0)
                                continue;
                        status--;
                                return (-1);
                        }
  
-                       parse_packet (buffer, buffer_len);
+                       ent = malloc (sizeof (receive_list_entry_t));
+                       if (ent == NULL)
+                       {
+                               ERROR ("network plugin: malloc failed.");
+                               return (-1);
+                       }
+                       memset (ent, '\0', sizeof (receive_list_entry_t));
+                       /* Hopefully this be optimized out by the compiler. It
+                        * might help prevent stupid bugs in the future though.
+                        */
+                       assert (sizeof (ent->data) == sizeof (buffer));
+                       memcpy (ent->data, buffer, buffer_len);
+                       ent->data_len = buffer_len;
+                       pthread_mutex_lock (&receive_list_lock);
+                       if (receive_list_head == NULL)
+                       {
+                               receive_list_head = ent;
+                               receive_list_tail = ent;
+                       }
+                       else
+                       {
+                               receive_list_tail->next = ent;
+                               receive_list_tail = ent;
+                       }
+                       pthread_cond_signal (&receive_list_cond);
+                       pthread_mutex_unlock (&receive_list_lock);
                } /* for (listen_sockets) */
        } /* while (listen_loop == 0) */
  
@@@ -1362,88 -1356,22 +1438,93 @@@ static int network_config (const char *
        return (0);
  } /* int network_config */
  
 +static int network_notification (const notification_t *n)
 +{
 +  char  buffer[BUFF_SIZE];
 +  char *buffer_ptr = buffer;
 +  int   buffer_free = sizeof (buffer);
 +  int   status;
 +
 +  memset (buffer, '\0', sizeof (buffer));
 +
 +
 +  status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME,
 +      (uint64_t) n->time);
 +  if (status != 0)
 +    return (-1);
 +
 +  status = write_part_number (&buffer_ptr, &buffer_free, TYPE_SEVERITY,
 +      (uint64_t) n->severity);
 +  if (status != 0)
 +    return (-1);
 +
 +  if (strlen (n->host) > 0)
 +  {
 +    status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST,
 +      n->host, strlen (n->host));
 +    if (status != 0)
 +      return (-1);
 +  }
 +
 +  if (strlen (n->plugin) > 0)
 +  {
 +    status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN,
 +      n->plugin, strlen (n->plugin));
 +    if (status != 0)
 +      return (-1);
 +  }
 +
 +  if (strlen (n->plugin_instance) > 0)
 +  {
 +    status = write_part_string (&buffer_ptr, &buffer_free,
 +      TYPE_PLUGIN_INSTANCE,
 +      n->plugin_instance, strlen (n->plugin_instance));
 +    if (status != 0)
 +      return (-1);
 +  }
 +
 +  if (strlen (n->type) > 0)
 +  {
 +    status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE,
 +      n->type, strlen (n->type));
 +    if (status != 0)
 +      return (-1);
 +  }
 +
 +  if (strlen (n->type_instance) > 0)
 +  {
 +    status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE,
 +      n->type_instance, strlen (n->type_instance));
 +    if (status != 0)
 +      return (-1);
 +  }
 +
 +  status = write_part_string (&buffer_ptr, &buffer_free, TYPE_MESSAGE,
 +      n->message, strlen (n->message));
 +  if (status != 0)
 +    return (-1);
 +
 +  network_send_buffer (buffer, sizeof (buffer) - buffer_free);
 +
 +  return (0);
 +} /* int network_notification */
 +
  static int network_shutdown (void)
  {
        listen_loop++;
  
-       if (listen_thread != (pthread_t) 0)
+       /* Kill the listening thread */
+       if (receive_thread_id != (pthread_t) 0)
        {
-               pthread_kill (listen_thread, SIGTERM);
-               pthread_join (listen_thread, NULL /* no return value */);
-               listen_thread = (pthread_t) 0;
+               pthread_kill (receive_thread_id, SIGTERM);
+               pthread_join (receive_thread_id, NULL /* no return value */);
+               receive_thread_id = (pthread_t) 0;
        }
  
+       /* Shutdown the dispatching thread */
+       if (dispatch_thread_id != (pthread_t) 0)
+               pthread_cond_broadcast (&receive_list_cond);
        if (send_buffer_fill > 0)
                flush_buffer ();
  
@@@ -1485,18 -1413,28 +1566,31 @@@ static int network_init (void
  
        /* setup socket(s) and so on */
        if (sending_sockets != NULL)
 +      {
                plugin_register_write ("network", network_write);
 +              plugin_register_notification ("network", network_notification);
 +      }
  
-       if ((listen_sockets_num != 0) && (listen_thread == 0))
+       if ((listen_sockets_num != 0) && (receive_thread_id == 0))
        {
                int status;
  
-               status = pthread_create (&listen_thread, NULL /* no attributes */,
-                               receive_thread, NULL /* no argument */);
+               status = pthread_create (&dispatch_thread_id,
+                               NULL /* no attributes */,
+                               dispatch_thread,
+                               NULL /* no argument */);
+               if (status != 0)
+               {
+                       char errbuf[1024];
+                       ERROR ("network: pthread_create failed: %s",
+                                       sstrerror (errno, errbuf,
+                                               sizeof (errbuf)));
+               }
  
+               status = pthread_create (&receive_thread_id,
+                               NULL /* no attributes */,
+                               receive_thread,
+                               NULL /* no argument */);
                if (status != 0)
                {
                        char errbuf[1024];