src/rrd_daemon.c: Implemented removal of unused tree nodes.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 /*
95  * Types
96  */
97 struct listen_socket_s
98 {
99   int fd;
100   char path[PATH_MAX + 1];
101 };
102 typedef struct listen_socket_s listen_socket_t;
103
104 struct cache_item_s;
105 typedef struct cache_item_s cache_item_t;
106 struct cache_item_s
107 {
108   char *file;
109   char **values;
110   int values_num;
111   time_t last_flush_time;
112 #define CI_FLAGS_IN_TREE  0x01
113 #define CI_FLAGS_IN_QUEUE 0x02
114   int flags;
115
116   cache_item_t *next;
117 };
118
119 struct callback_flush_data_s
120 {
121   time_t now;
122   char **keys;
123   size_t keys_num;
124 };
125 typedef struct callback_flush_data_s callback_flush_data_t;
126
127 enum queue_side_e
128 {
129   HEAD,
130   TAIL
131 };
132 typedef enum queue_side_e queue_side_t;
133
134 /*
135  * Variables
136  */
137 static listen_socket_t *listen_fds = NULL;
138 static size_t listen_fds_num = 0;
139
140 static int do_shutdown = 0;
141
142 static pthread_t queue_thread;
143
144 static pthread_t *connetion_threads = NULL;
145 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
146 static int connetion_threads_num = 0;
147
148 /* Cache stuff */
149 static GTree          *cache_tree = NULL;
150 static cache_item_t   *cache_queue_head = NULL;
151 static cache_item_t   *cache_queue_tail = NULL;
152 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
153 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
154
155 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
156
157 static int config_write_interval = 300;
158 static int config_flush_interval = 3600;
159 static char *config_pid_file = NULL;
160 static char *config_base_dir = NULL;
161
162 static char **config_listen_address_list = NULL;
163 static int config_listen_address_list_len = 0;
164
165 /* 
166  * Functions
167  */
168 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
169 {
170   do_shutdown++;
171 } /* }}} void sig_int_handler */
172
173 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
174 {
175   do_shutdown++;
176 } /* }}} void sig_term_handler */
177
178 static int write_pidfile (void) /* {{{ */
179 {
180   pid_t pid;
181   char *file;
182   FILE *fh;
183
184   pid = getpid ();
185   
186   file = (config_pid_file != NULL)
187     ? config_pid_file
188     : LOCALSTATEDIR "/run/rrdcached.pid";
189
190   fh = fopen (file, "w");
191   if (fh == NULL)
192   {
193     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
194     return (-1);
195   }
196
197   fprintf (fh, "%i\n", (int) pid);
198   fclose (fh);
199
200   return (0);
201 } /* }}} int write_pidfile */
202
203 static int remove_pidfile (void) /* {{{ */
204 {
205   char *file;
206   int status;
207
208   file = (config_pid_file != NULL)
209     ? config_pid_file
210     : LOCALSTATEDIR "/run/rrdcached.pid";
211
212   status = unlink (file);
213   if (status == 0)
214     return (0);
215   return (errno);
216 } /* }}} int remove_pidfile */
217
218 /*
219  * enqueue_cache_item:
220  * `cache_lock' must be acquired before calling this function!
221  */
222 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
223     queue_side_t side)
224 {
225   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
226       ci->file);
227
228   if (ci == NULL)
229     return (-1);
230
231   if (ci->values_num == 0)
232     return (0);
233
234   if (side == HEAD)
235   {
236     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
237     {
238       assert (ci->next == NULL);
239       ci->next = cache_queue_head;
240       cache_queue_head = ci;
241
242       if (cache_queue_tail == NULL)
243         cache_queue_tail = cache_queue_head;
244     }
245     else if (cache_queue_head == ci)
246     {
247       /* do nothing */
248     }
249     else /* enqueued, but not first entry */
250     {
251       cache_item_t *prev;
252
253       /* find previous entry */
254       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
255         if (prev->next == ci)
256           break;
257       assert (prev != NULL);
258
259       /* move to the front */
260       prev->next = ci->next;
261       ci->next = cache_queue_head;
262       cache_queue_head = ci;
263
264       /* check if we need to adapt the tail */
265       if (cache_queue_tail == ci)
266         cache_queue_tail = prev;
267     }
268   }
269   else /* (side == TAIL) */
270   {
271     /* We don't move values back in the list.. */
272     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
273       return (0);
274
275     assert (ci->next == NULL);
276
277     if (cache_queue_tail == NULL)
278       cache_queue_head = ci;
279     else
280       cache_queue_tail->next = ci;
281     cache_queue_tail = ci;
282   }
283
284   ci->flags |= CI_FLAGS_IN_QUEUE;
285
286   return (0);
287 } /* }}} int enqueue_cache_item */
288
289 /*
290  * tree_callback_flush:
291  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
292  * while this is in progress.
293  */
294 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
295     gpointer data)
296 {
297   cache_item_t *ci;
298   callback_flush_data_t *cfd;
299
300   ci = (cache_item_t *) value;
301   cfd = (callback_flush_data_t *) data;
302
303   if (((cfd->now - ci->last_flush_time) >= config_write_interval)
304       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
305       && (ci->values_num > 0))
306   {
307     enqueue_cache_item (ci, TAIL);
308   }
309   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
310       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
311       && (ci->values_num <= 0))
312   {
313     char **temp;
314
315     temp = (char **) realloc (cfd->keys,
316         sizeof (char *) * (cfd->keys_num + 1));
317     if (temp == NULL)
318     {
319       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
320       return (FALSE);
321     }
322     cfd->keys = temp;
323     /* Make really sure this points to the _same_ place */
324     assert ((char *) key == ci->file);
325     cfd->keys[cfd->keys_num] = (char *) key;
326     cfd->keys_num++;
327   }
328
329   return (FALSE);
330 } /* }}} gboolean tree_callback_flush */
331
332 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
333 {
334   struct timeval now;
335   struct timespec next_flush;
336
337   gettimeofday (&now, NULL);
338   next_flush.tv_sec = now.tv_sec + config_flush_interval;
339   next_flush.tv_nsec = 1000 * now.tv_usec;
340
341   pthread_mutex_lock (&cache_lock);
342   while ((do_shutdown == 0) || (cache_queue_head != NULL))
343   {
344     cache_item_t *ci;
345     char *file;
346     char **values;
347     int values_num;
348     int status;
349     int i;
350
351     /* First, check if it's time to do the cache flush. */
352     gettimeofday (&now, NULL);
353     if ((now.tv_sec > next_flush.tv_sec)
354         || ((now.tv_sec == next_flush.tv_sec)
355           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
356     {
357       callback_flush_data_t cfd;
358       size_t k;
359
360       memset (&cfd, 0, sizeof (cfd));
361       /* Pass the current time as user data so that we don't need to call
362        * `time' for each node. */
363       cfd.now = time (NULL);
364       cfd.keys = NULL;
365       cfd.keys_num = 0;
366
367       /* `tree_callback_flush' will return the keys of all values that haven't
368        * been touched in the last `config_flush_interval' seconds in `cfd'.
369        * The char*'s in this array point to the same memory as ci->file, so we
370        * don't need to free them separately. */
371       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
372
373       for (k = 0; k < cfd.keys_num; k++)
374       {
375         /* This must not fail. */
376         ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
377         assert (ci != NULL);
378
379         /* If we end up here with values available, something's seriously
380          * messed up. */
381         assert (ci->values_num == 0);
382
383         /* Remove the node from the tree */
384         g_tree_remove (cache_tree, cfd.keys[k]);
385         cfd.keys[k] = NULL;
386
387         /* Now free and clean up `ci'. */
388         free (ci->file);
389         ci->file = NULL;
390         free (ci);
391         ci = NULL;
392       } /* for (k = 0; k < cfd.keys_num; k++) */
393
394       if (cfd.keys != NULL)
395       {
396         free (cfd.keys);
397         cfd.keys = NULL;
398       }
399
400       /* Determine the time of the next cache flush. */
401       while (next_flush.tv_sec < now.tv_sec)
402         next_flush.tv_sec += config_flush_interval;
403     }
404
405     /* Now, check if there's something to store away. If not, wait until
406      * something comes in or it's time to do the cache flush. */
407     if (cache_queue_head == NULL)
408     {
409       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
410       if ((status != 0) && (status != ETIMEDOUT))
411       {
412         RRDD_LOG (LOG_ERR, "queue_thread_main: "
413             "pthread_cond_timedwait returned %i.", status);
414       }
415     }
416
417     /* Check if a value has arrived. This may be NULL if we timed out or there
418      * was an interrupt such as a signal. */
419     if (cache_queue_head == NULL)
420       continue;
421
422     ci = cache_queue_head;
423
424     /* copy the relevant parts */
425     file = strdup (ci->file);
426     if (file == NULL)
427     {
428       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
429       continue;
430     }
431
432     values = ci->values;
433     values_num = ci->values_num;
434
435     ci->values = NULL;
436     ci->values_num = 0;
437
438     ci->last_flush_time = time (NULL);
439     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
440
441     cache_queue_head = ci->next;
442     if (cache_queue_head == NULL)
443       cache_queue_tail = NULL;
444     ci->next = NULL;
445
446     pthread_mutex_unlock (&cache_lock);
447
448     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
449         file, values_num, (void *) values);
450
451     status = rrd_update_r (file, NULL, values_num, (void *) values);
452     if (status != 0)
453     {
454       RRDD_LOG (LOG_ERR, "queue_thread_main: "
455           "rrd_update_r failed with status %i.",
456           status);
457     }
458
459     free (file);
460     for (i = 0; i < values_num; i++)
461       free (values[i]);
462
463     pthread_mutex_lock (&cache_lock);
464     pthread_cond_broadcast (&flush_cond);
465   } /* while (do_shutdown == 0) */
466   pthread_mutex_unlock (&cache_lock);
467
468   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
469
470   return (NULL);
471 } /* }}} void *queue_thread_main */
472
473 static int buffer_get_field (char **buffer_ret, /* {{{ */
474     size_t *buffer_size_ret, char **field_ret)
475 {
476   char *buffer;
477   size_t buffer_pos;
478   size_t buffer_size;
479   char *field;
480   size_t field_size;
481   int status;
482
483   buffer = *buffer_ret;
484   buffer_pos = 0;
485   buffer_size = *buffer_size_ret;
486   field = *buffer_ret;
487   field_size = 0;
488
489   /* This is ensured by `handle_request'. */
490   assert (buffer[buffer_size - 1] == ' ');
491
492   status = -1;
493   while (buffer_pos < buffer_size)
494   {
495     /* Check for end-of-field or end-of-buffer */
496     if (buffer[buffer_pos] == ' ')
497     {
498       field[field_size] = 0;
499       field_size++;
500       buffer_pos++;
501       status = 0;
502       break;
503     }
504     /* Handle escaped characters. */
505     else if (buffer[buffer_pos] == '\\')
506     {
507       if (buffer_pos >= (buffer_size - 1))
508         break;
509       buffer_pos++;
510       field[field_size] = buffer[buffer_pos];
511       field_size++;
512       buffer_pos++;
513     }
514     /* Normal operation */ 
515     else
516     {
517       field[field_size] = buffer[buffer_pos];
518       field_size++;
519       buffer_pos++;
520     }
521   } /* while (buffer_pos < buffer_size) */
522
523   if (status != 0)
524     return (status);
525
526   *buffer_ret = buffer + buffer_pos;
527   *buffer_size_ret = buffer_size - buffer_pos;
528   *field_ret = field;
529
530   return (0);
531 } /* }}} int buffer_get_field */
532
533 static int flush_file (const char *filename) /* {{{ */
534 {
535   cache_item_t *ci;
536
537   pthread_mutex_lock (&cache_lock);
538
539   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
540   if (ci == NULL)
541   {
542     pthread_mutex_unlock (&cache_lock);
543     return (ENOENT);
544   }
545
546   /* Enqueue at head */
547   enqueue_cache_item (ci, HEAD);
548   pthread_cond_signal (&cache_cond);
549
550   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
551   {
552     ci = NULL;
553
554     pthread_cond_wait (&flush_cond, &cache_lock);
555
556     ci = g_tree_lookup (cache_tree, filename);
557     if (ci == NULL)
558     {
559       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
560           "while waiting for flush.");
561       pthread_mutex_unlock (&cache_lock);
562       return (-1);
563     }
564   }
565
566   pthread_mutex_unlock (&cache_lock);
567   return (0);
568 } /* }}} int flush_file */
569
570 static int handle_request_flush (int fd, /* {{{ */
571     char *buffer, size_t buffer_size)
572 {
573   char *file;
574   int status;
575   char result[4096];
576
577   status = buffer_get_field (&buffer, &buffer_size, &file);
578   if (status != 0)
579   {
580     RRDD_LOG (LOG_INFO, "handle_request_flush: Cannot get file name.");
581     return (-1);
582   }
583
584   status = flush_file (file);
585   if (status == 0)
586     snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
587   else if (status == ENOENT)
588     snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
589   else if (status < 0)
590     strncpy (result, "-1 Internal error.\n", sizeof (result));
591   else
592     snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
593   result[sizeof (result) - 1] = 0;
594
595   status = write (fd, result, strlen (result));
596   if (status < 0)
597   {
598     status = errno;
599     RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error.");
600     return (status);
601   }
602
603   return (0);
604 } /* }}} int handle_request_flush */
605
606 static int handle_request_update (int fd, /* {{{ */
607     char *buffer, size_t buffer_size)
608 {
609   char *file;
610   int values_num = 0;
611   int status;
612
613   time_t now;
614
615   cache_item_t *ci;
616   char answer[4096];
617
618   now = time (NULL);
619
620   status = buffer_get_field (&buffer, &buffer_size, &file);
621   if (status != 0)
622   {
623     RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name.");
624     return (-1);
625   }
626
627   pthread_mutex_lock (&cache_lock);
628
629   ci = g_tree_lookup (cache_tree, file);
630   if (ci == NULL) /* {{{ */
631   {
632     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
633     if (ci == NULL)
634     {
635       pthread_mutex_unlock (&cache_lock);
636       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
637       return (-1);
638     }
639     memset (ci, 0, sizeof (cache_item_t));
640
641     ci->file = strdup (file);
642     if (ci->file == NULL)
643     {
644       pthread_mutex_unlock (&cache_lock);
645       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
646       free (ci);
647       return (-1);
648     }
649
650     ci->values = NULL;
651     ci->values_num = 0;
652     ci->last_flush_time = now;
653     ci->flags = CI_FLAGS_IN_TREE;
654
655     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
656
657     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
658         ci->file);
659   } /* }}} */
660   assert (ci != NULL);
661
662   while (buffer_size > 0)
663   {
664     char **temp;
665     char *value;
666
667     status = buffer_get_field (&buffer, &buffer_size, &value);
668     if (status != 0)
669     {
670       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
671       break;
672     }
673
674     temp = (char **) realloc (ci->values,
675         sizeof (char *) * (ci->values_num + 1));
676     if (temp == NULL)
677     {
678       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
679       continue;
680     }
681     ci->values = temp;
682
683     ci->values[ci->values_num] = strdup (value);
684     if (ci->values[ci->values_num] == NULL)
685     {
686       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
687       continue;
688     }
689     ci->values_num++;
690
691     values_num++;
692   }
693
694   if (((now - ci->last_flush_time) >= config_write_interval)
695       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
696       && (ci->values_num > 0))
697   {
698     enqueue_cache_item (ci, TAIL);
699     pthread_cond_signal (&cache_cond);
700   }
701
702   pthread_mutex_unlock (&cache_lock);
703
704   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
705   answer[sizeof (answer) - 1] = 0;
706
707   status = write (fd, answer, strlen (answer));
708   if (status < 0)
709   {
710     status = errno;
711     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
712     return (status);
713   }
714
715   return (0);
716 } /* }}} int handle_request_update */
717
718 static int handle_request (int fd) /* {{{ */
719 {
720   char buffer[4096];
721   size_t buffer_size;
722   char *buffer_ptr;
723   char *command;
724   int status;
725
726   status = read (fd, buffer, sizeof (buffer));
727   if (status == 0)
728   {
729     return (1);
730   }
731   else if (status < 0)
732   {
733     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
734     return (-1);
735   }
736   buffer_size = status;
737   assert (((size_t) buffer_size) <= sizeof (buffer));
738
739   if (buffer[buffer_size - 1] != '\n')
740   {
741     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
742     return (-1);
743   }
744
745   /* Accept Windows style line endings, too */
746   if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r'))
747   {
748     buffer_size--;
749     buffer[buffer_size - 1] = '\n';
750   }
751
752   /* Place the normal field separator at the end to simplify
753    * `buffer_get_field's work. */
754   buffer[buffer_size - 1] = ' ';
755
756   buffer_ptr = buffer;
757   command = NULL;
758   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
759   if (status != 0)
760   {
761     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
762     return (-1);
763   }
764
765   if (strcmp (command, "update") == 0)
766   {
767     return (handle_request_update (fd, buffer_ptr, buffer_size));
768   }
769   else if (strcmp (command, "flush") == 0)
770   {
771     return (handle_request_flush (fd, buffer_ptr, buffer_size));
772   }
773   else
774   {
775     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
776     return (-1);
777   }
778 } /* }}} int handle_request */
779
780 static void *connection_thread_main (void *args /* {{{ */
781     __attribute__((unused)))
782 {
783   pthread_t self;
784   int i;
785   int fd;
786   
787   fd = *((int *) args);
788
789   pthread_mutex_lock (&connetion_threads_lock);
790   {
791     pthread_t *temp;
792
793     temp = (pthread_t *) realloc (connetion_threads,
794         sizeof (pthread_t) * (connetion_threads_num + 1));
795     if (temp == NULL)
796     {
797       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
798     }
799     else
800     {
801       connetion_threads = temp;
802       connetion_threads[connetion_threads_num] = pthread_self ();
803       connetion_threads_num++;
804     }
805   }
806   pthread_mutex_unlock (&connetion_threads_lock);
807
808   while (do_shutdown == 0)
809   {
810     struct pollfd pollfd;
811     int status;
812
813     pollfd.fd = fd;
814     pollfd.events = POLLIN | POLLPRI;
815     pollfd.revents = 0;
816
817     status = poll (&pollfd, 1, /* timeout = */ 500);
818     if (status == 0) /* timeout */
819       continue;
820     else if (status < 0) /* error */
821     {
822       status = errno;
823       if (status == EINTR)
824         continue;
825       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
826       continue;
827     }
828
829     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
830     {
831       close (fd);
832       break;
833     }
834     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
835     {
836       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
837           "poll(2) returned something unexpected: %#04hx",
838           pollfd.revents);
839       close (fd);
840       break;
841     }
842
843     status = handle_request (fd);
844     if (status != 0)
845     {
846       close (fd);
847       break;
848     }
849   }
850
851   self = pthread_self ();
852   /* Remove this thread from the connection threads list */
853   pthread_mutex_lock (&connetion_threads_lock);
854   /* Find out own index in the array */
855   for (i = 0; i < connetion_threads_num; i++)
856     if (pthread_equal (connetion_threads[i], self) != 0)
857       break;
858   assert (i < connetion_threads_num);
859
860   /* Move the trailing threads forward. */
861   if (i < (connetion_threads_num - 1))
862   {
863     memmove (connetion_threads + i,
864         connetion_threads + i + 1,
865         sizeof (pthread_t) * (connetion_threads_num - i - 1));
866   }
867
868   connetion_threads_num--;
869   pthread_mutex_unlock (&connetion_threads_lock);
870
871   free (args);
872   return (NULL);
873 } /* }}} void *connection_thread_main */
874
875 static int open_listen_socket_unix (const char *path) /* {{{ */
876 {
877   int fd;
878   struct sockaddr_un sa;
879   listen_socket_t *temp;
880   int status;
881
882   temp = (listen_socket_t *) realloc (listen_fds,
883       sizeof (listen_fds[0]) * (listen_fds_num + 1));
884   if (temp == NULL)
885   {
886     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
887     return (-1);
888   }
889   listen_fds = temp;
890   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
891
892   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
893   if (fd < 0)
894   {
895     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
896     return (-1);
897   }
898
899   memset (&sa, 0, sizeof (sa));
900   sa.sun_family = AF_UNIX;
901   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
902
903   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
904   if (status != 0)
905   {
906     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
907     close (fd);
908     unlink (path);
909     return (-1);
910   }
911
912   status = listen (fd, /* backlog = */ 10);
913   if (status != 0)
914   {
915     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
916     close (fd);
917     unlink (path);
918     return (-1);
919   }
920   
921   listen_fds[listen_fds_num].fd = fd;
922   snprintf (listen_fds[listen_fds_num].path,
923       sizeof (listen_fds[listen_fds_num].path) - 1,
924       "unix:%s", path);
925   listen_fds_num++;
926
927   return (0);
928 } /* }}} int open_listen_socket_unix */
929
930 static int open_listen_socket (const char *addr) /* {{{ */
931 {
932   struct addrinfo ai_hints;
933   struct addrinfo *ai_res;
934   struct addrinfo *ai_ptr;
935   int status;
936
937   assert (addr != NULL);
938
939   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
940     return (open_listen_socket_unix (addr + strlen ("unix:")));
941   else if (addr[0] == '/')
942     return (open_listen_socket_unix (addr));
943
944   memset (&ai_hints, 0, sizeof (ai_hints));
945   ai_hints.ai_flags = 0;
946 #ifdef AI_ADDRCONFIG
947   ai_hints.ai_flags |= AI_ADDRCONFIG;
948 #endif
949   ai_hints.ai_family = AF_UNSPEC;
950   ai_hints.ai_socktype = SOCK_STREAM;
951
952   ai_res = NULL;
953   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
954   if (status != 0)
955   {
956     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
957         "%s", addr, gai_strerror (status));
958     return (-1);
959   }
960
961   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
962   {
963     int fd;
964     listen_socket_t *temp;
965
966     temp = (listen_socket_t *) realloc (listen_fds,
967         sizeof (listen_fds[0]) * (listen_fds_num + 1));
968     if (temp == NULL)
969     {
970       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
971       continue;
972     }
973     listen_fds = temp;
974     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
975
976     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
977     if (fd < 0)
978     {
979       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
980       continue;
981     }
982
983     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
984     if (status != 0)
985     {
986       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
987       close (fd);
988       continue;
989     }
990
991     status = listen (fd, /* backlog = */ 10);
992     if (status != 0)
993     {
994       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
995       close (fd);
996       return (-1);
997     }
998
999     listen_fds[listen_fds_num].fd = fd;
1000     strncpy (listen_fds[listen_fds_num].path, addr,
1001         sizeof (listen_fds[listen_fds_num].path) - 1);
1002     listen_fds_num++;
1003   } /* for (ai_ptr) */
1004
1005   return (0);
1006 } /* }}} int open_listen_socket */
1007
1008 static int close_listen_sockets (void) /* {{{ */
1009 {
1010   size_t i;
1011
1012   for (i = 0; i < listen_fds_num; i++)
1013   {
1014     close (listen_fds[i].fd);
1015     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1016       unlink (listen_fds[i].path + strlen ("unix:"));
1017   }
1018
1019   free (listen_fds);
1020   listen_fds = NULL;
1021   listen_fds_num = 0;
1022
1023   return (0);
1024 } /* }}} int close_listen_sockets */
1025
1026 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1027 {
1028   struct pollfd *pollfds;
1029   int pollfds_num;
1030   int status;
1031   int i;
1032
1033   for (i = 0; i < config_listen_address_list_len; i++)
1034   {
1035     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
1036         "= %s", i, config_listen_address_list[i]);
1037     open_listen_socket (config_listen_address_list[i]);
1038   }
1039
1040   if (config_listen_address_list_len < 1)
1041     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1042
1043   if (listen_fds_num < 1)
1044   {
1045     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1046         "could be opened. Sorry.");
1047     return (NULL);
1048   }
1049
1050   pollfds_num = listen_fds_num;
1051   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1052   if (pollfds == NULL)
1053   {
1054     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1055     return (NULL);
1056   }
1057   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1058
1059   while (do_shutdown == 0)
1060   {
1061     assert (pollfds_num == ((int) listen_fds_num));
1062     for (i = 0; i < pollfds_num; i++)
1063     {
1064       pollfds[i].fd = listen_fds[i].fd;
1065       pollfds[i].events = POLLIN | POLLPRI;
1066       pollfds[i].revents = 0;
1067     }
1068
1069     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1070     if (status < 1)
1071     {
1072       status = errno;
1073       if (status != EINTR)
1074       {
1075         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1076       }
1077       continue;
1078     }
1079
1080     for (i = 0; i < pollfds_num; i++)
1081     {
1082       int *client_sd;
1083       struct sockaddr_storage client_sa;
1084       socklen_t client_sa_size;
1085       pthread_t tid;
1086
1087       if (pollfds[i].revents == 0)
1088         continue;
1089
1090       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1091       {
1092         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1093             "poll(2) returned something unexpected for listen FD #%i.",
1094             pollfds[i].fd);
1095         continue;
1096       }
1097
1098       client_sd = (int *) malloc (sizeof (int));
1099       if (client_sd == NULL)
1100       {
1101         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1102         continue;
1103       }
1104
1105       client_sa_size = sizeof (client_sa);
1106       *client_sd = accept (pollfds[i].fd,
1107           (struct sockaddr *) &client_sa, &client_sa_size);
1108       if (*client_sd < 0)
1109       {
1110         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1111         continue;
1112       }
1113
1114       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
1115           /* args = */ (void *) client_sd);
1116       if (status != 0)
1117       {
1118         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1119         close (*client_sd);
1120         free (client_sd);
1121         continue;
1122       }
1123     } /* for (pollfds_num) */
1124   } /* while (do_shutdown == 0) */
1125
1126   close_listen_sockets ();
1127
1128   pthread_mutex_lock (&connetion_threads_lock);
1129   while (connetion_threads_num > 0)
1130   {
1131     pthread_t wait_for;
1132
1133     wait_for = connetion_threads[0];
1134
1135     pthread_mutex_unlock (&connetion_threads_lock);
1136     pthread_join (wait_for, /* retval = */ NULL);
1137     pthread_mutex_lock (&connetion_threads_lock);
1138   }
1139   pthread_mutex_unlock (&connetion_threads_lock);
1140
1141   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
1142
1143   return (NULL);
1144 } /* }}} void *listen_thread_main */
1145
1146 static int daemonize (void) /* {{{ */
1147 {
1148   pid_t child;
1149   int status;
1150   char *base_dir;
1151
1152   /* These structures are static, because `sigaction' behaves weird if the are
1153    * overwritten.. */
1154   static struct sigaction sa_int;
1155   static struct sigaction sa_term;
1156   static struct sigaction sa_pipe;
1157
1158   child = fork ();
1159   if (child < 0)
1160   {
1161     fprintf (stderr, "daemonize: fork(2) failed.\n");
1162     return (-1);
1163   }
1164   else if (child > 0)
1165   {
1166     return (1);
1167   }
1168
1169   /* Change into the /tmp directory. */
1170   base_dir = (config_base_dir != NULL)
1171     ? config_base_dir
1172     : "/tmp";
1173   status = chdir (base_dir);
1174   if (status != 0)
1175   {
1176     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1177     return (-1);
1178   }
1179
1180   /* Become session leader */
1181   setsid ();
1182
1183   /* Open the first three file descriptors to /dev/null */
1184   close (2);
1185   close (1);
1186   close (0);
1187
1188   open ("/dev/null", O_RDWR);
1189   dup (0);
1190   dup (0);
1191
1192   /* Install signal handlers */
1193   memset (&sa_int, 0, sizeof (sa_int));
1194   sa_int.sa_handler = sig_int_handler;
1195   sigaction (SIGINT, &sa_int, NULL);
1196
1197   memset (&sa_term, 0, sizeof (sa_term));
1198   sa_term.sa_handler = sig_term_handler;
1199   sigaction (SIGINT, &sa_term, NULL);
1200
1201   memset (&sa_pipe, 0, sizeof (sa_pipe));
1202   sa_pipe.sa_handler = SIG_IGN;
1203   sigaction (SIGPIPE, &sa_pipe, NULL);
1204
1205   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1206
1207   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1208   if (cache_tree == NULL)
1209   {
1210     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1211     return (-1);
1212   }
1213
1214   memset (&queue_thread, 0, sizeof (queue_thread));
1215   status = pthread_create (&queue_thread, /* attr = */ NULL,
1216       queue_thread_main, /* args = */ NULL);
1217   if (status != 0)
1218   {
1219     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
1220     return (-1);
1221   }
1222
1223   write_pidfile ();
1224
1225   return (0);
1226 } /* }}} int daemonize */
1227
1228 static int cleanup (void) /* {{{ */
1229 {
1230   RRDD_LOG (LOG_DEBUG, "cleanup ()");
1231
1232   do_shutdown++;
1233
1234   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
1235   pthread_cond_signal (&cache_cond);
1236   pthread_join (queue_thread, /* return = */ NULL);
1237   RRDD_LOG (LOG_DEBUG, "cleanup: done");
1238
1239   remove_pidfile ();
1240
1241   closelog ();
1242
1243   return (0);
1244 } /* }}} int cleanup */
1245
1246 static int read_options (int argc, char **argv) /* {{{ */
1247 {
1248   int option;
1249   int status = 0;
1250
1251   while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1)
1252   {
1253     switch (option)
1254     {
1255       case 'l':
1256       {
1257         char **temp;
1258
1259         temp = (char **) realloc (config_listen_address_list,
1260             sizeof (char *) * (config_listen_address_list_len + 1));
1261         if (temp == NULL)
1262         {
1263           fprintf (stderr, "read_options: realloc failed.\n");
1264           return (2);
1265         }
1266         config_listen_address_list = temp;
1267
1268         temp[config_listen_address_list_len] = strdup (optarg);
1269         if (temp[config_listen_address_list_len] == NULL)
1270         {
1271           fprintf (stderr, "read_options: strdup failed.\n");
1272           return (2);
1273         }
1274         config_listen_address_list_len++;
1275       }
1276       break;
1277
1278       case 'f':
1279       {
1280         int temp;
1281
1282         temp = atoi (optarg);
1283         if (temp > 0)
1284           config_flush_interval = temp;
1285         else
1286         {
1287           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1288           status = 3;
1289         }
1290       }
1291       break;
1292
1293       case 'w':
1294       {
1295         int temp;
1296
1297         temp = atoi (optarg);
1298         if (temp > 0)
1299           config_write_interval = temp;
1300         else
1301         {
1302           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1303           status = 2;
1304         }
1305       }
1306       break;
1307
1308       case 'b':
1309       {
1310         size_t len;
1311
1312         if (config_base_dir != NULL)
1313           free (config_base_dir);
1314         config_base_dir = strdup (optarg);
1315         if (config_base_dir == NULL)
1316         {
1317           fprintf (stderr, "read_options: strdup failed.\n");
1318           return (3);
1319         }
1320
1321         len = strlen (config_base_dir);
1322         while ((len > 0) && (config_base_dir[len - 1] == '/'))
1323         {
1324           config_base_dir[len - 1] = 0;
1325           len--;
1326         }
1327
1328         if (len < 1)
1329         {
1330           fprintf (stderr, "Invalid base directory: %s\n", optarg);
1331           return (4);
1332         }
1333       }
1334       break;
1335
1336       case 'p':
1337       {
1338         if (config_pid_file != NULL)
1339           free (config_pid_file);
1340         config_pid_file = strdup (optarg);
1341         if (config_pid_file == NULL)
1342         {
1343           fprintf (stderr, "read_options: strdup failed.\n");
1344           return (3);
1345         }
1346       }
1347       break;
1348
1349       case 'h':
1350       case '?':
1351         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
1352             "\n"
1353             "Usage: rrdcached [options]\n"
1354             "\n"
1355             "Valid options are:\n"
1356             "  -l <address>  Socket address to listen to.\n"
1357             "  -w <seconds>  Interval in which to write data.\n"
1358             "  -f <seconds>  Interval in which to flush dead data.\n"
1359             "  -p <file>     Location of the PID-file.\n"
1360             "  -b <dir>      Base directory to change to.\n"
1361             "\n"
1362             "For more information and a detailed description of all options "
1363             "please refer\n"
1364             "to the rrdcached(1) manual page.\n",
1365             VERSION);
1366         status = -1;
1367         break;
1368     } /* switch (option) */
1369   } /* while (getopt) */
1370
1371   return (status);
1372 } /* }}} int read_options */
1373
1374 int main (int argc, char **argv)
1375 {
1376   int status;
1377
1378   status = read_options (argc, argv);
1379   if (status != 0)
1380   {
1381     if (status < 0)
1382       status = 0;
1383     return (status);
1384   }
1385
1386   status = daemonize ();
1387   if (status == 1)
1388   {
1389     struct sigaction sigchld;
1390
1391     memset (&sigchld, 0, sizeof (sigchld));
1392     sigchld.sa_handler = SIG_IGN;
1393     sigaction (SIGCHLD, &sigchld, NULL);
1394
1395     return (0);
1396   }
1397   else if (status != 0)
1398   {
1399     fprintf (stderr, "daemonize failed, exiting.\n");
1400     return (1);
1401   }
1402
1403   listen_thread_main (NULL);
1404
1405   cleanup ();
1406
1407   return (0);
1408 } /* int main */
1409
1410 /*
1411  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1412  */