src/plugin.c, network, rrdtool: Improved thread shutdown.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sun, 1 Mar 2009 08:43:54 +0000 (09:43 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sun, 1 Mar 2009 08:43:54 +0000 (09:43 +0100)
Just some comments, info messages and not assigning zero to pthread_t.

src/network.c
src/plugin.c
src/rrdtool.c

index c9a92ac..7e55986 100644 (file)
@@ -168,19 +168,27 @@ static pthread_mutex_t       receive_list_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t        receive_list_cond = PTHREAD_COND_INITIALIZER;
 
 static struct pollfd *listen_sockets = NULL;
-static int listen_sockets_num = 0;
-
-static int listen_loop = 0;
-static pthread_t receive_thread_id = 0;
-static pthread_t dispatch_thread_id = 0;
-
-static char         send_buffer[BUFF_SIZE];
-static char        *send_buffer_ptr;
-static int          send_buffer_fill;
-static value_list_t send_buffer_vl = VALUE_LIST_STATIC;
-static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
-
-static c_avl_tree_t      *cache_tree = NULL;
+static int            listen_sockets_num = 0;
+
+/* The receive and dispatch threads will run as long as `listen_loop' is set to
+ * zero. */
+static int       listen_loop = 0;
+static int       receive_thread_running = 0;
+static pthread_t receive_thread_id;
+static int       dispatch_thread_running = 0;
+static pthread_t dispatch_thread_id;
+
+/* Buffer in which to-be-sent network packets are constructed. */
+static char             send_buffer[BUFF_SIZE];
+static char            *send_buffer_ptr;
+static int              send_buffer_fill;
+static value_list_t     send_buffer_vl = VALUE_LIST_STATIC;
+static pthread_mutex_t  send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/* In this cache we store all the values we received, so we can send out only
+ * those values which were *not* received via the network plugin, too. This is
+ * used for the `Forward false' option. */
+static c_avl_tree_t    *cache_tree = NULL;
 static pthread_mutex_t  cache_lock = PTHREAD_MUTEX_INITIALIZER;
 static time_t           cache_flush_last = 0;
 static int              cache_flush_interval = 1800;
@@ -1329,6 +1337,10 @@ static int network_receive (void)
                                return (-1);
                        }
 
+                       /* TODO: Possible performance enhancement: Do not free
+                        * these entries in the dispatch thread but put them in
+                        * another list, so we don't have to allocate more and
+                        * more of these structures. */
                        ent = malloc (sizeof (receive_list_entry_t));
                        if (ent == NULL)
                        {
@@ -1709,16 +1721,23 @@ static int network_shutdown (void)
        listen_loop++;
 
        /* Kill the listening thread */
-       if (receive_thread_id != (pthread_t) 0)
+       if (receive_thread_running != 0)
        {
+               INFO ("network plugin: Stopping receive thread.");
                pthread_kill (receive_thread_id, SIGTERM);
                pthread_join (receive_thread_id, NULL /* no return value */);
-               receive_thread_id = (pthread_t) 0;
+               memset (&receive_thread_id, 0, sizeof (receive_thread_id));
+               receive_thread_running = 0;
        }
 
        /* Shutdown the dispatching thread */
-       if (dispatch_thread_id != (pthread_t) 0)
+       if (dispatch_thread_running != 0)
+       {
+               INFO ("network plugin: Stopping dispatch thread.");
+               pthread_join (dispatch_thread_id, /* ret = */ NULL);
                pthread_cond_broadcast (&receive_list_cond);
+               dispatch_thread_running = 0;
+       }
 
        if (send_buffer_fill > 0)
                flush_buffer ();
@@ -1775,10 +1794,15 @@ static int network_init (void)
                                /* user_data = */ NULL);
        }
 
-       if ((listen_sockets_num != 0) && (receive_thread_id == 0))
+       /* If no threads need to be started, return here. */
+       if ((listen_sockets_num == 0)
+                       || ((dispatch_thread_running != 0)
+                               && (receive_thread_running != 0)))
+               return (0);
+
+       if (dispatch_thread_running == 0)
        {
                int status;
-
                status = pthread_create (&dispatch_thread_id,
                                NULL /* no attributes */,
                                dispatch_thread,
@@ -1790,7 +1814,15 @@ static int network_init (void)
                                        sstrerror (errno, errbuf,
                                                sizeof (errbuf)));
                }
+               else
+               {
+                       dispatch_thread_running = 1;
+               }
+       }
 
+       if (receive_thread_running == 0)
+       {
+               int status;
                status = pthread_create (&receive_thread_id,
                                NULL /* no attributes */,
                                receive_thread,
@@ -1802,7 +1834,12 @@ static int network_init (void)
                                        sstrerror (errno, errbuf,
                                                sizeof (errbuf)));
                }
+               else
+               {
+                       receive_thread_running = 1;
+               }
        }
+
        return (0);
 } /* int network_init */
 
index d185ef2..265bf46 100644 (file)
@@ -404,6 +404,8 @@ static void stop_read_threads (void)
        if (read_threads == NULL)
                return;
 
+       INFO ("collectd: Stopping %i read threads.", read_threads_num);
+
        pthread_mutex_lock (&read_lock);
        read_loop = 0;
        DEBUG ("plugin: stop_read_threads: Signalling `read_cond'");
index 6ddbfc9..b8885e8 100644 (file)
@@ -111,7 +111,8 @@ static rrd_queue_t    *queue_head = NULL;
 static rrd_queue_t    *queue_tail = NULL;
 static rrd_queue_t    *flushq_head = NULL;
 static rrd_queue_t    *flushq_tail = NULL;
-static pthread_t       queue_thread = 0;
+static pthread_t       queue_thread;
+static int             queue_thread_running = 1;
 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
 
@@ -973,14 +974,28 @@ static int rrd_shutdown (void)
        pthread_cond_signal (&queue_cond);
        pthread_mutex_unlock (&queue_lock);
 
+       if ((queue_thread_running != 0)
+                       && ((queue_head != NULL) || (flushq_head != NULL)))
+       {
+               INFO ("rrdtool plugin: Shutting down the queue thread. "
+                               "This may take a while.");
+       }
+       else if (queue_thread_running != 0)
+       {
+               INFO ("rrdtool plugin: Shutting down the queue thread.");
+       }
+
        /* Wait for all the values to be written to disk before returning. */
-       if (queue_thread != 0)
+       if (queue_thread_running != 0)
        {
                pthread_join (queue_thread, NULL);
-               queue_thread = 0;
+               memset (&queue_thread, 0, sizeof (queue_thread));
+               queue_thread_running = 0;
                DEBUG ("rrdtool plugin: queue_thread exited.");
        }
 
+       /* TODO: Maybe it'd be a good idea to free the cache here.. */
+
        return (0);
 } /* int rrd_shutdown */
 
@@ -1025,12 +1040,14 @@ static int rrd_init (void)
 
        pthread_mutex_unlock (&cache_lock);
 
-       status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
+       status = pthread_create (&queue_thread, /* attr = */ NULL,
+                       rrd_queue_thread, /* args = */ NULL);
        if (status != 0)
        {
                ERROR ("rrdtool plugin: Cannot create queue-thread.");
                return (-1);
        }
+       queue_thread_running = 1;
 
        DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
                        " heartbeat = %i; rrarows = %i; xff = %lf;",