024738b7a30bc0b2d3c14ac1122763fd92c5ac27
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69 #include <stdint.h>
70 #include <inttypes.h>
71
72 #include <sys/types.h>
73 #include <sys/stat.h>
74 #include <fcntl.h>
75 #include <signal.h>
76 #include <sys/socket.h>
77 #include <sys/un.h>
78 #include <netdb.h>
79 #include <poll.h>
80 #include <syslog.h>
81 #include <pthread.h>
82 #include <errno.h>
83 #include <assert.h>
84 #include <sys/time.h>
85 #include <time.h>
86
87 #include <glib-2.0/glib.h>
88 /* }}} */
89
90 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
91
92 #ifndef __GNUC__
93 # define __attribute__(x) /**/
94 #endif
95
96 /*
97  * Types
98  */
99 struct listen_socket_s
100 {
101   int fd;
102   char path[PATH_MAX + 1];
103 };
104 typedef struct listen_socket_s listen_socket_t;
105
106 struct cache_item_s;
107 typedef struct cache_item_s cache_item_t;
108 struct cache_item_s
109 {
110   char *file;
111   char **values;
112   int values_num;
113   time_t last_flush_time;
114 #define CI_FLAGS_IN_TREE  0x01
115 #define CI_FLAGS_IN_QUEUE 0x02
116   int flags;
117
118   cache_item_t *next;
119 };
120
121 struct callback_flush_data_s
122 {
123   time_t now;
124   char **keys;
125   size_t keys_num;
126 };
127 typedef struct callback_flush_data_s callback_flush_data_t;
128
129 enum queue_side_e
130 {
131   HEAD,
132   TAIL
133 };
134 typedef enum queue_side_e queue_side_t;
135
136 /*
137  * Variables
138  */
139 static listen_socket_t *listen_fds = NULL;
140 static size_t listen_fds_num = 0;
141
142 static int do_shutdown = 0;
143
144 static pthread_t queue_thread;
145
146 static pthread_t *connetion_threads = NULL;
147 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
148 static int connetion_threads_num = 0;
149
150 /* Cache stuff */
151 static GTree          *cache_tree = NULL;
152 static cache_item_t   *cache_queue_head = NULL;
153 static cache_item_t   *cache_queue_tail = NULL;
154 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
155 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
156
157 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
158
159 static int config_write_interval = 300;
160 static int config_flush_interval = 3600;
161 static char *config_pid_file = NULL;
162 static char *config_base_dir = NULL;
163
164 static char **config_listen_address_list = NULL;
165 static int config_listen_address_list_len = 0;
166
167 static uint64_t stats_queue_length = 0;
168 static uint64_t stats_updates_total = 0;
169 static uint64_t stats_values_total = 0;
170 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
171
172 /* 
173  * Functions
174  */
175 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
176 {
177   do_shutdown++;
178 } /* }}} void sig_int_handler */
179
180 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
181 {
182   do_shutdown++;
183 } /* }}} void sig_term_handler */
184
185 static int write_pidfile (void) /* {{{ */
186 {
187   pid_t pid;
188   char *file;
189   FILE *fh;
190
191   pid = getpid ();
192   
193   file = (config_pid_file != NULL)
194     ? config_pid_file
195     : LOCALSTATEDIR "/run/rrdcached.pid";
196
197   fh = fopen (file, "w");
198   if (fh == NULL)
199   {
200     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
201     return (-1);
202   }
203
204   fprintf (fh, "%i\n", (int) pid);
205   fclose (fh);
206
207   return (0);
208 } /* }}} int write_pidfile */
209
210 static int remove_pidfile (void) /* {{{ */
211 {
212   char *file;
213   int status;
214
215   file = (config_pid_file != NULL)
216     ? config_pid_file
217     : LOCALSTATEDIR "/run/rrdcached.pid";
218
219   status = unlink (file);
220   if (status == 0)
221     return (0);
222   return (errno);
223 } /* }}} int remove_pidfile */
224
225 /*
226  * enqueue_cache_item:
227  * `cache_lock' must be acquired before calling this function!
228  */
229 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
230     queue_side_t side)
231 {
232   int did_insert = 0;
233
234   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
235       ci->file);
236
237   if (ci == NULL)
238     return (-1);
239
240   if (ci->values_num == 0)
241     return (0);
242
243   if (side == HEAD)
244   {
245     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
246     {
247       assert (ci->next == NULL);
248       ci->next = cache_queue_head;
249       cache_queue_head = ci;
250
251       if (cache_queue_tail == NULL)
252         cache_queue_tail = cache_queue_head;
253
254       did_insert = 1;
255     }
256     else if (cache_queue_head == ci)
257     {
258       /* do nothing */
259     }
260     else /* enqueued, but not first entry */
261     {
262       cache_item_t *prev;
263
264       /* find previous entry */
265       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
266         if (prev->next == ci)
267           break;
268       assert (prev != NULL);
269
270       /* move to the front */
271       prev->next = ci->next;
272       ci->next = cache_queue_head;
273       cache_queue_head = ci;
274
275       /* check if we need to adapt the tail */
276       if (cache_queue_tail == ci)
277         cache_queue_tail = prev;
278     }
279   }
280   else /* (side == TAIL) */
281   {
282     /* We don't move values back in the list.. */
283     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
284       return (0);
285
286     assert (ci->next == NULL);
287
288     if (cache_queue_tail == NULL)
289       cache_queue_head = ci;
290     else
291       cache_queue_tail->next = ci;
292     cache_queue_tail = ci;
293
294     did_insert = 1;
295   }
296
297   ci->flags |= CI_FLAGS_IN_QUEUE;
298
299   if (did_insert)
300   {
301     pthread_mutex_lock (&stats_lock);
302     stats_queue_length++;
303     pthread_mutex_unlock (&stats_lock);
304   }
305
306   return (0);
307 } /* }}} int enqueue_cache_item */
308
309 /*
310  * tree_callback_flush:
311  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
312  * while this is in progress.
313  */
314 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
315     gpointer data)
316 {
317   cache_item_t *ci;
318   callback_flush_data_t *cfd;
319
320   ci = (cache_item_t *) value;
321   cfd = (callback_flush_data_t *) data;
322
323   if (((cfd->now - ci->last_flush_time) >= config_write_interval)
324       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
325       && (ci->values_num > 0))
326   {
327     enqueue_cache_item (ci, TAIL);
328   }
329   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
330       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
331       && (ci->values_num <= 0))
332   {
333     char **temp;
334
335     temp = (char **) realloc (cfd->keys,
336         sizeof (char *) * (cfd->keys_num + 1));
337     if (temp == NULL)
338     {
339       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
340       return (FALSE);
341     }
342     cfd->keys = temp;
343     /* Make really sure this points to the _same_ place */
344     assert ((char *) key == ci->file);
345     cfd->keys[cfd->keys_num] = (char *) key;
346     cfd->keys_num++;
347   }
348
349   return (FALSE);
350 } /* }}} gboolean tree_callback_flush */
351
352 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
353 {
354   struct timeval now;
355   struct timespec next_flush;
356
357   gettimeofday (&now, NULL);
358   next_flush.tv_sec = now.tv_sec + config_flush_interval;
359   next_flush.tv_nsec = 1000 * now.tv_usec;
360
361   pthread_mutex_lock (&cache_lock);
362   while ((do_shutdown == 0) || (cache_queue_head != NULL))
363   {
364     cache_item_t *ci;
365     char *file;
366     char **values;
367     int values_num;
368     int status;
369     int i;
370
371     /* First, check if it's time to do the cache flush. */
372     gettimeofday (&now, NULL);
373     if ((now.tv_sec > next_flush.tv_sec)
374         || ((now.tv_sec == next_flush.tv_sec)
375           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
376     {
377       callback_flush_data_t cfd;
378       size_t k;
379
380       memset (&cfd, 0, sizeof (cfd));
381       /* Pass the current time as user data so that we don't need to call
382        * `time' for each node. */
383       cfd.now = time (NULL);
384       cfd.keys = NULL;
385       cfd.keys_num = 0;
386
387       /* `tree_callback_flush' will return the keys of all values that haven't
388        * been touched in the last `config_flush_interval' seconds in `cfd'.
389        * The char*'s in this array point to the same memory as ci->file, so we
390        * don't need to free them separately. */
391       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
392
393       for (k = 0; k < cfd.keys_num; k++)
394       {
395         /* This must not fail. */
396         ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
397         assert (ci != NULL);
398
399         /* If we end up here with values available, something's seriously
400          * messed up. */
401         assert (ci->values_num == 0);
402
403         /* Remove the node from the tree */
404         g_tree_remove (cache_tree, cfd.keys[k]);
405         cfd.keys[k] = NULL;
406
407         /* Now free and clean up `ci'. */
408         free (ci->file);
409         ci->file = NULL;
410         free (ci);
411         ci = NULL;
412       } /* for (k = 0; k < cfd.keys_num; k++) */
413
414       if (cfd.keys != NULL)
415       {
416         free (cfd.keys);
417         cfd.keys = NULL;
418       }
419
420       /* Determine the time of the next cache flush. */
421       while (next_flush.tv_sec < now.tv_sec)
422         next_flush.tv_sec += config_flush_interval;
423     }
424
425     /* Now, check if there's something to store away. If not, wait until
426      * something comes in or it's time to do the cache flush. */
427     if (cache_queue_head == NULL)
428     {
429       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
430       if ((status != 0) && (status != ETIMEDOUT))
431       {
432         RRDD_LOG (LOG_ERR, "queue_thread_main: "
433             "pthread_cond_timedwait returned %i.", status);
434       }
435     }
436
437     /* Check if a value has arrived. This may be NULL if we timed out or there
438      * was an interrupt such as a signal. */
439     if (cache_queue_head == NULL)
440       continue;
441
442     ci = cache_queue_head;
443
444     /* copy the relevant parts */
445     file = strdup (ci->file);
446     if (file == NULL)
447     {
448       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
449       continue;
450     }
451
452     values = ci->values;
453     values_num = ci->values_num;
454
455     ci->values = NULL;
456     ci->values_num = 0;
457
458     ci->last_flush_time = time (NULL);
459     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
460
461     cache_queue_head = ci->next;
462     if (cache_queue_head == NULL)
463       cache_queue_tail = NULL;
464     ci->next = NULL;
465
466     pthread_mutex_lock (&stats_lock);
467     assert (stats_queue_length > 0);
468     stats_queue_length--;
469     pthread_mutex_unlock (&stats_lock);
470
471     pthread_mutex_unlock (&cache_lock);
472
473     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
474         file, values_num, (void *) values);
475
476     status = rrd_update_r (file, NULL, values_num, (void *) values);
477     if (status != 0)
478     {
479       RRDD_LOG (LOG_ERR, "queue_thread_main: "
480           "rrd_update_r failed with status %i.",
481           status);
482     }
483
484     free (file);
485     for (i = 0; i < values_num; i++)
486       free (values[i]);
487
488     pthread_mutex_lock (&stats_lock);
489     stats_updates_total++;
490     stats_values_total += values_num;
491     pthread_mutex_unlock (&stats_lock);
492
493     pthread_mutex_lock (&cache_lock);
494     pthread_cond_broadcast (&flush_cond);
495   } /* while (do_shutdown == 0) */
496   pthread_mutex_unlock (&cache_lock);
497
498   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
499
500   return (NULL);
501 } /* }}} void *queue_thread_main */
502
503 static int buffer_get_field (char **buffer_ret, /* {{{ */
504     size_t *buffer_size_ret, char **field_ret)
505 {
506   char *buffer;
507   size_t buffer_pos;
508   size_t buffer_size;
509   char *field;
510   size_t field_size;
511   int status;
512
513   buffer = *buffer_ret;
514   buffer_pos = 0;
515   buffer_size = *buffer_size_ret;
516   field = *buffer_ret;
517   field_size = 0;
518
519   /* This is ensured by `handle_request'. */
520   assert (buffer[buffer_size - 1] == ' ');
521
522   status = -1;
523   while (buffer_pos < buffer_size)
524   {
525     /* Check for end-of-field or end-of-buffer */
526     if (buffer[buffer_pos] == ' ')
527     {
528       field[field_size] = 0;
529       field_size++;
530       buffer_pos++;
531       status = 0;
532       break;
533     }
534     /* Handle escaped characters. */
535     else if (buffer[buffer_pos] == '\\')
536     {
537       if (buffer_pos >= (buffer_size - 1))
538         break;
539       buffer_pos++;
540       field[field_size] = buffer[buffer_pos];
541       field_size++;
542       buffer_pos++;
543     }
544     /* Normal operation */ 
545     else
546     {
547       field[field_size] = buffer[buffer_pos];
548       field_size++;
549       buffer_pos++;
550     }
551   } /* while (buffer_pos < buffer_size) */
552
553   if (status != 0)
554     return (status);
555
556   *buffer_ret = buffer + buffer_pos;
557   *buffer_size_ret = buffer_size - buffer_pos;
558   *field_ret = field;
559
560   return (0);
561 } /* }}} int buffer_get_field */
562
563 static int flush_file (const char *filename) /* {{{ */
564 {
565   cache_item_t *ci;
566
567   pthread_mutex_lock (&cache_lock);
568
569   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
570   if (ci == NULL)
571   {
572     pthread_mutex_unlock (&cache_lock);
573     return (ENOENT);
574   }
575
576   /* Enqueue at head */
577   enqueue_cache_item (ci, HEAD);
578   pthread_cond_signal (&cache_cond);
579
580   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
581   {
582     ci = NULL;
583
584     pthread_cond_wait (&flush_cond, &cache_lock);
585
586     ci = g_tree_lookup (cache_tree, filename);
587     if (ci == NULL)
588     {
589       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
590           "while waiting for flush.");
591       pthread_mutex_unlock (&cache_lock);
592       return (-1);
593     }
594   }
595
596   pthread_mutex_unlock (&cache_lock);
597   return (0);
598 } /* }}} int flush_file */
599
600 static int handle_request_stats (int fd, /* {{{ */
601     char *buffer __attribute__((unused)),
602     size_t buffer_size __attribute__((unused)))
603 {
604   int status;
605   char outbuf[4096];
606
607   uint64_t copy_queue_length;
608   uint64_t copy_updates_total;
609   uint64_t copy_values_total;
610
611   uint64_t tree_nodes;
612   uint64_t tree_depth;
613
614   pthread_mutex_lock (&stats_lock);
615   copy_queue_length  = stats_queue_length;
616   copy_updates_total = stats_updates_total;
617   copy_values_total  = stats_values_total;
618   pthread_mutex_unlock (&stats_lock);
619
620   pthread_mutex_lock (&cache_lock);
621   tree_nodes = (uint64_t) g_tree_nnodes (cache_tree);
622   tree_depth = (uint64_t) g_tree_height (cache_tree);
623   pthread_mutex_unlock (&cache_lock);
624
625 #define RRDD_STATS_SEND \
626   outbuf[sizeof (outbuf) - 1] = 0; \
627   status = write (fd, outbuf, strlen (outbuf)); \
628   if (status < 0) \
629   { \
630     status = errno; \
631     RRDD_LOG (LOG_INFO, "handle_request_stats: write(2) returned an error."); \
632     return (status); \
633   }
634
635   strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf));
636   RRDD_STATS_SEND;
637
638   snprintf (outbuf, sizeof (outbuf),
639       "QueueLength: %"PRIu64"\n", copy_queue_length);
640   RRDD_STATS_SEND;
641
642   snprintf (outbuf, sizeof (outbuf),
643       "UpdatesWritten: %"PRIu64"\n", copy_updates_total);
644   RRDD_STATS_SEND;
645
646   snprintf (outbuf, sizeof (outbuf),
647       "ValuesWritten: %"PRIu64"\n", copy_values_total);
648   RRDD_STATS_SEND;
649
650   snprintf (outbuf, sizeof (outbuf),
651       "TreeNodesNumber: %"PRIu64"\n", tree_nodes);
652   RRDD_STATS_SEND;
653
654   snprintf (outbuf, sizeof (outbuf),
655       "TreeDepth: %"PRIu64"\n", tree_depth);
656   RRDD_STATS_SEND;
657
658   return (0);
659 } /* }}} int handle_request_stats */
660
661 static int handle_request_flush (int fd, /* {{{ */
662     char *buffer, size_t buffer_size)
663 {
664   char *file;
665   int status;
666   char result[4096];
667
668   status = buffer_get_field (&buffer, &buffer_size, &file);
669   if (status != 0)
670   {
671     RRDD_LOG (LOG_INFO, "handle_request_flush: Cannot get file name.");
672     return (-1);
673   }
674
675   status = flush_file (file);
676   if (status == 0)
677     snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
678   else if (status == ENOENT)
679     snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
680   else if (status < 0)
681     strncpy (result, "-1 Internal error.\n", sizeof (result));
682   else
683     snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
684   result[sizeof (result) - 1] = 0;
685
686   status = write (fd, result, strlen (result));
687   if (status < 0)
688   {
689     status = errno;
690     RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error.");
691     return (status);
692   }
693
694   return (0);
695 } /* }}} int handle_request_flush */
696
697 static int handle_request_update (int fd, /* {{{ */
698     char *buffer, size_t buffer_size)
699 {
700   char *file;
701   int values_num = 0;
702   int status;
703
704   time_t now;
705
706   cache_item_t *ci;
707   char answer[4096];
708
709   now = time (NULL);
710
711   status = buffer_get_field (&buffer, &buffer_size, &file);
712   if (status != 0)
713   {
714     RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name.");
715     return (-1);
716   }
717
718   pthread_mutex_lock (&cache_lock);
719
720   ci = g_tree_lookup (cache_tree, file);
721   if (ci == NULL) /* {{{ */
722   {
723     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
724     if (ci == NULL)
725     {
726       pthread_mutex_unlock (&cache_lock);
727       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
728       return (-1);
729     }
730     memset (ci, 0, sizeof (cache_item_t));
731
732     ci->file = strdup (file);
733     if (ci->file == NULL)
734     {
735       pthread_mutex_unlock (&cache_lock);
736       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
737       free (ci);
738       return (-1);
739     }
740
741     ci->values = NULL;
742     ci->values_num = 0;
743     ci->last_flush_time = now;
744     ci->flags = CI_FLAGS_IN_TREE;
745
746     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
747
748     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
749         ci->file);
750   } /* }}} */
751   assert (ci != NULL);
752
753   while (buffer_size > 0)
754   {
755     char **temp;
756     char *value;
757
758     status = buffer_get_field (&buffer, &buffer_size, &value);
759     if (status != 0)
760     {
761       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
762       break;
763     }
764
765     temp = (char **) realloc (ci->values,
766         sizeof (char *) * (ci->values_num + 1));
767     if (temp == NULL)
768     {
769       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
770       continue;
771     }
772     ci->values = temp;
773
774     ci->values[ci->values_num] = strdup (value);
775     if (ci->values[ci->values_num] == NULL)
776     {
777       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
778       continue;
779     }
780     ci->values_num++;
781
782     values_num++;
783   }
784
785   if (((now - ci->last_flush_time) >= config_write_interval)
786       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
787       && (ci->values_num > 0))
788   {
789     enqueue_cache_item (ci, TAIL);
790     pthread_cond_signal (&cache_cond);
791   }
792
793   pthread_mutex_unlock (&cache_lock);
794
795   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
796   answer[sizeof (answer) - 1] = 0;
797
798   status = write (fd, answer, strlen (answer));
799   if (status < 0)
800   {
801     status = errno;
802     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
803     return (status);
804   }
805
806   return (0);
807 } /* }}} int handle_request_update */
808
809 static int handle_request (int fd) /* {{{ */
810 {
811   char buffer[4096];
812   size_t buffer_size;
813   char *buffer_ptr;
814   char *command;
815   int status;
816
817   status = read (fd, buffer, sizeof (buffer));
818   if (status == 0)
819   {
820     return (1);
821   }
822   else if (status < 0)
823   {
824     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
825     return (-1);
826   }
827   buffer_size = status;
828   assert (((size_t) buffer_size) <= sizeof (buffer));
829
830   if (buffer[buffer_size - 1] != '\n')
831   {
832     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
833     return (-1);
834   }
835
836   /* Accept Windows style line endings, too */
837   if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r'))
838   {
839     buffer_size--;
840     buffer[buffer_size - 1] = '\n';
841   }
842
843   /* Place the normal field separator at the end to simplify
844    * `buffer_get_field's work. */
845   buffer[buffer_size - 1] = ' ';
846
847   buffer_ptr = buffer;
848   command = NULL;
849   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
850   if (status != 0)
851   {
852     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
853     return (-1);
854   }
855
856   if (strcmp (command, "update") == 0)
857   {
858     return (handle_request_update (fd, buffer_ptr, buffer_size));
859   }
860   else if (strcmp (command, "flush") == 0)
861   {
862     return (handle_request_flush (fd, buffer_ptr, buffer_size));
863   }
864   else if (strcmp (command, "stats") == 0)
865   {
866     return (handle_request_stats (fd, buffer_ptr, buffer_size));
867   }
868   else
869   {
870     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
871     return (-1);
872   }
873 } /* }}} int handle_request */
874
875 static void *connection_thread_main (void *args /* {{{ */
876     __attribute__((unused)))
877 {
878   pthread_t self;
879   int i;
880   int fd;
881   
882   fd = *((int *) args);
883
884   pthread_mutex_lock (&connetion_threads_lock);
885   {
886     pthread_t *temp;
887
888     temp = (pthread_t *) realloc (connetion_threads,
889         sizeof (pthread_t) * (connetion_threads_num + 1));
890     if (temp == NULL)
891     {
892       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
893     }
894     else
895     {
896       connetion_threads = temp;
897       connetion_threads[connetion_threads_num] = pthread_self ();
898       connetion_threads_num++;
899     }
900   }
901   pthread_mutex_unlock (&connetion_threads_lock);
902
903   while (do_shutdown == 0)
904   {
905     struct pollfd pollfd;
906     int status;
907
908     pollfd.fd = fd;
909     pollfd.events = POLLIN | POLLPRI;
910     pollfd.revents = 0;
911
912     status = poll (&pollfd, 1, /* timeout = */ 500);
913     if (status == 0) /* timeout */
914       continue;
915     else if (status < 0) /* error */
916     {
917       status = errno;
918       if (status == EINTR)
919         continue;
920       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
921       continue;
922     }
923
924     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
925     {
926       close (fd);
927       break;
928     }
929     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
930     {
931       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
932           "poll(2) returned something unexpected: %#04hx",
933           pollfd.revents);
934       close (fd);
935       break;
936     }
937
938     status = handle_request (fd);
939     if (status != 0)
940     {
941       close (fd);
942       break;
943     }
944   }
945
946   self = pthread_self ();
947   /* Remove this thread from the connection threads list */
948   pthread_mutex_lock (&connetion_threads_lock);
949   /* Find out own index in the array */
950   for (i = 0; i < connetion_threads_num; i++)
951     if (pthread_equal (connetion_threads[i], self) != 0)
952       break;
953   assert (i < connetion_threads_num);
954
955   /* Move the trailing threads forward. */
956   if (i < (connetion_threads_num - 1))
957   {
958     memmove (connetion_threads + i,
959         connetion_threads + i + 1,
960         sizeof (pthread_t) * (connetion_threads_num - i - 1));
961   }
962
963   connetion_threads_num--;
964   pthread_mutex_unlock (&connetion_threads_lock);
965
966   free (args);
967   return (NULL);
968 } /* }}} void *connection_thread_main */
969
970 static int open_listen_socket_unix (const char *path) /* {{{ */
971 {
972   int fd;
973   struct sockaddr_un sa;
974   listen_socket_t *temp;
975   int status;
976
977   temp = (listen_socket_t *) realloc (listen_fds,
978       sizeof (listen_fds[0]) * (listen_fds_num + 1));
979   if (temp == NULL)
980   {
981     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
982     return (-1);
983   }
984   listen_fds = temp;
985   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
986
987   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
988   if (fd < 0)
989   {
990     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
991     return (-1);
992   }
993
994   memset (&sa, 0, sizeof (sa));
995   sa.sun_family = AF_UNIX;
996   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
997
998   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
999   if (status != 0)
1000   {
1001     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1002     close (fd);
1003     unlink (path);
1004     return (-1);
1005   }
1006
1007   status = listen (fd, /* backlog = */ 10);
1008   if (status != 0)
1009   {
1010     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1011     close (fd);
1012     unlink (path);
1013     return (-1);
1014   }
1015   
1016   listen_fds[listen_fds_num].fd = fd;
1017   snprintf (listen_fds[listen_fds_num].path,
1018       sizeof (listen_fds[listen_fds_num].path) - 1,
1019       "unix:%s", path);
1020   listen_fds_num++;
1021
1022   return (0);
1023 } /* }}} int open_listen_socket_unix */
1024
1025 static int open_listen_socket (const char *addr) /* {{{ */
1026 {
1027   struct addrinfo ai_hints;
1028   struct addrinfo *ai_res;
1029   struct addrinfo *ai_ptr;
1030   int status;
1031
1032   assert (addr != NULL);
1033
1034   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1035     return (open_listen_socket_unix (addr + strlen ("unix:")));
1036   else if (addr[0] == '/')
1037     return (open_listen_socket_unix (addr));
1038
1039   memset (&ai_hints, 0, sizeof (ai_hints));
1040   ai_hints.ai_flags = 0;
1041 #ifdef AI_ADDRCONFIG
1042   ai_hints.ai_flags |= AI_ADDRCONFIG;
1043 #endif
1044   ai_hints.ai_family = AF_UNSPEC;
1045   ai_hints.ai_socktype = SOCK_STREAM;
1046
1047   ai_res = NULL;
1048   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
1049   if (status != 0)
1050   {
1051     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1052         "%s", addr, gai_strerror (status));
1053     return (-1);
1054   }
1055
1056   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1057   {
1058     int fd;
1059     listen_socket_t *temp;
1060
1061     temp = (listen_socket_t *) realloc (listen_fds,
1062         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1063     if (temp == NULL)
1064     {
1065       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1066       continue;
1067     }
1068     listen_fds = temp;
1069     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1070
1071     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1072     if (fd < 0)
1073     {
1074       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1075       continue;
1076     }
1077
1078     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1079     if (status != 0)
1080     {
1081       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1082       close (fd);
1083       continue;
1084     }
1085
1086     status = listen (fd, /* backlog = */ 10);
1087     if (status != 0)
1088     {
1089       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1090       close (fd);
1091       return (-1);
1092     }
1093
1094     listen_fds[listen_fds_num].fd = fd;
1095     strncpy (listen_fds[listen_fds_num].path, addr,
1096         sizeof (listen_fds[listen_fds_num].path) - 1);
1097     listen_fds_num++;
1098   } /* for (ai_ptr) */
1099
1100   return (0);
1101 } /* }}} int open_listen_socket */
1102
1103 static int close_listen_sockets (void) /* {{{ */
1104 {
1105   size_t i;
1106
1107   for (i = 0; i < listen_fds_num; i++)
1108   {
1109     close (listen_fds[i].fd);
1110     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1111       unlink (listen_fds[i].path + strlen ("unix:"));
1112   }
1113
1114   free (listen_fds);
1115   listen_fds = NULL;
1116   listen_fds_num = 0;
1117
1118   return (0);
1119 } /* }}} int close_listen_sockets */
1120
1121 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1122 {
1123   struct pollfd *pollfds;
1124   int pollfds_num;
1125   int status;
1126   int i;
1127
1128   for (i = 0; i < config_listen_address_list_len; i++)
1129   {
1130     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
1131         "= %s", i, config_listen_address_list[i]);
1132     open_listen_socket (config_listen_address_list[i]);
1133   }
1134
1135   if (config_listen_address_list_len < 1)
1136     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1137
1138   if (listen_fds_num < 1)
1139   {
1140     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1141         "could be opened. Sorry.");
1142     return (NULL);
1143   }
1144
1145   pollfds_num = listen_fds_num;
1146   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1147   if (pollfds == NULL)
1148   {
1149     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1150     return (NULL);
1151   }
1152   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1153
1154   while (do_shutdown == 0)
1155   {
1156     assert (pollfds_num == ((int) listen_fds_num));
1157     for (i = 0; i < pollfds_num; i++)
1158     {
1159       pollfds[i].fd = listen_fds[i].fd;
1160       pollfds[i].events = POLLIN | POLLPRI;
1161       pollfds[i].revents = 0;
1162     }
1163
1164     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1165     if (status < 1)
1166     {
1167       status = errno;
1168       if (status != EINTR)
1169       {
1170         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1171       }
1172       continue;
1173     }
1174
1175     for (i = 0; i < pollfds_num; i++)
1176     {
1177       int *client_sd;
1178       struct sockaddr_storage client_sa;
1179       socklen_t client_sa_size;
1180       pthread_t tid;
1181
1182       if (pollfds[i].revents == 0)
1183         continue;
1184
1185       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1186       {
1187         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1188             "poll(2) returned something unexpected for listen FD #%i.",
1189             pollfds[i].fd);
1190         continue;
1191       }
1192
1193       client_sd = (int *) malloc (sizeof (int));
1194       if (client_sd == NULL)
1195       {
1196         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1197         continue;
1198       }
1199
1200       client_sa_size = sizeof (client_sa);
1201       *client_sd = accept (pollfds[i].fd,
1202           (struct sockaddr *) &client_sa, &client_sa_size);
1203       if (*client_sd < 0)
1204       {
1205         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1206         continue;
1207       }
1208
1209       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
1210           /* args = */ (void *) client_sd);
1211       if (status != 0)
1212       {
1213         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1214         close (*client_sd);
1215         free (client_sd);
1216         continue;
1217       }
1218     } /* for (pollfds_num) */
1219   } /* while (do_shutdown == 0) */
1220
1221   close_listen_sockets ();
1222
1223   pthread_mutex_lock (&connetion_threads_lock);
1224   while (connetion_threads_num > 0)
1225   {
1226     pthread_t wait_for;
1227
1228     wait_for = connetion_threads[0];
1229
1230     pthread_mutex_unlock (&connetion_threads_lock);
1231     pthread_join (wait_for, /* retval = */ NULL);
1232     pthread_mutex_lock (&connetion_threads_lock);
1233   }
1234   pthread_mutex_unlock (&connetion_threads_lock);
1235
1236   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
1237
1238   return (NULL);
1239 } /* }}} void *listen_thread_main */
1240
1241 static int daemonize (void) /* {{{ */
1242 {
1243   pid_t child;
1244   int status;
1245   char *base_dir;
1246
1247   /* These structures are static, because `sigaction' behaves weird if the are
1248    * overwritten.. */
1249   static struct sigaction sa_int;
1250   static struct sigaction sa_term;
1251   static struct sigaction sa_pipe;
1252
1253   child = fork ();
1254   if (child < 0)
1255   {
1256     fprintf (stderr, "daemonize: fork(2) failed.\n");
1257     return (-1);
1258   }
1259   else if (child > 0)
1260   {
1261     return (1);
1262   }
1263
1264   /* Change into the /tmp directory. */
1265   base_dir = (config_base_dir != NULL)
1266     ? config_base_dir
1267     : "/tmp";
1268   status = chdir (base_dir);
1269   if (status != 0)
1270   {
1271     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1272     return (-1);
1273   }
1274
1275   /* Become session leader */
1276   setsid ();
1277
1278   /* Open the first three file descriptors to /dev/null */
1279   close (2);
1280   close (1);
1281   close (0);
1282
1283   open ("/dev/null", O_RDWR);
1284   dup (0);
1285   dup (0);
1286
1287   /* Install signal handlers */
1288   memset (&sa_int, 0, sizeof (sa_int));
1289   sa_int.sa_handler = sig_int_handler;
1290   sigaction (SIGINT, &sa_int, NULL);
1291
1292   memset (&sa_term, 0, sizeof (sa_term));
1293   sa_term.sa_handler = sig_term_handler;
1294   sigaction (SIGINT, &sa_term, NULL);
1295
1296   memset (&sa_pipe, 0, sizeof (sa_pipe));
1297   sa_pipe.sa_handler = SIG_IGN;
1298   sigaction (SIGPIPE, &sa_pipe, NULL);
1299
1300   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1301
1302   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1303   if (cache_tree == NULL)
1304   {
1305     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1306     return (-1);
1307   }
1308
1309   memset (&queue_thread, 0, sizeof (queue_thread));
1310   status = pthread_create (&queue_thread, /* attr = */ NULL,
1311       queue_thread_main, /* args = */ NULL);
1312   if (status != 0)
1313   {
1314     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
1315     return (-1);
1316   }
1317
1318   write_pidfile ();
1319
1320   return (0);
1321 } /* }}} int daemonize */
1322
1323 static int cleanup (void) /* {{{ */
1324 {
1325   RRDD_LOG (LOG_DEBUG, "cleanup ()");
1326
1327   do_shutdown++;
1328
1329   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
1330   pthread_cond_signal (&cache_cond);
1331   pthread_join (queue_thread, /* return = */ NULL);
1332   RRDD_LOG (LOG_DEBUG, "cleanup: done");
1333
1334   remove_pidfile ();
1335
1336   closelog ();
1337
1338   return (0);
1339 } /* }}} int cleanup */
1340
1341 static int read_options (int argc, char **argv) /* {{{ */
1342 {
1343   int option;
1344   int status = 0;
1345
1346   while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1)
1347   {
1348     switch (option)
1349     {
1350       case 'l':
1351       {
1352         char **temp;
1353
1354         temp = (char **) realloc (config_listen_address_list,
1355             sizeof (char *) * (config_listen_address_list_len + 1));
1356         if (temp == NULL)
1357         {
1358           fprintf (stderr, "read_options: realloc failed.\n");
1359           return (2);
1360         }
1361         config_listen_address_list = temp;
1362
1363         temp[config_listen_address_list_len] = strdup (optarg);
1364         if (temp[config_listen_address_list_len] == NULL)
1365         {
1366           fprintf (stderr, "read_options: strdup failed.\n");
1367           return (2);
1368         }
1369         config_listen_address_list_len++;
1370       }
1371       break;
1372
1373       case 'f':
1374       {
1375         int temp;
1376
1377         temp = atoi (optarg);
1378         if (temp > 0)
1379           config_flush_interval = temp;
1380         else
1381         {
1382           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1383           status = 3;
1384         }
1385       }
1386       break;
1387
1388       case 'w':
1389       {
1390         int temp;
1391
1392         temp = atoi (optarg);
1393         if (temp > 0)
1394           config_write_interval = temp;
1395         else
1396         {
1397           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1398           status = 2;
1399         }
1400       }
1401       break;
1402
1403       case 'b':
1404       {
1405         size_t len;
1406
1407         if (config_base_dir != NULL)
1408           free (config_base_dir);
1409         config_base_dir = strdup (optarg);
1410         if (config_base_dir == NULL)
1411         {
1412           fprintf (stderr, "read_options: strdup failed.\n");
1413           return (3);
1414         }
1415
1416         len = strlen (config_base_dir);
1417         while ((len > 0) && (config_base_dir[len - 1] == '/'))
1418         {
1419           config_base_dir[len - 1] = 0;
1420           len--;
1421         }
1422
1423         if (len < 1)
1424         {
1425           fprintf (stderr, "Invalid base directory: %s\n", optarg);
1426           return (4);
1427         }
1428       }
1429       break;
1430
1431       case 'p':
1432       {
1433         if (config_pid_file != NULL)
1434           free (config_pid_file);
1435         config_pid_file = strdup (optarg);
1436         if (config_pid_file == NULL)
1437         {
1438           fprintf (stderr, "read_options: strdup failed.\n");
1439           return (3);
1440         }
1441       }
1442       break;
1443
1444       case 'h':
1445       case '?':
1446         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
1447             "\n"
1448             "Usage: rrdcached [options]\n"
1449             "\n"
1450             "Valid options are:\n"
1451             "  -l <address>  Socket address to listen to.\n"
1452             "  -w <seconds>  Interval in which to write data.\n"
1453             "  -f <seconds>  Interval in which to flush dead data.\n"
1454             "  -p <file>     Location of the PID-file.\n"
1455             "  -b <dir>      Base directory to change to.\n"
1456             "\n"
1457             "For more information and a detailed description of all options "
1458             "please refer\n"
1459             "to the rrdcached(1) manual page.\n",
1460             VERSION);
1461         status = -1;
1462         break;
1463     } /* switch (option) */
1464   } /* while (getopt) */
1465
1466   return (status);
1467 } /* }}} int read_options */
1468
1469 int main (int argc, char **argv)
1470 {
1471   int status;
1472
1473   status = read_options (argc, argv);
1474   if (status != 0)
1475   {
1476     if (status < 0)
1477       status = 0;
1478     return (status);
1479   }
1480
1481   status = daemonize ();
1482   if (status == 1)
1483   {
1484     struct sigaction sigchld;
1485
1486     memset (&sigchld, 0, sizeof (sigchld));
1487     sigchld.sa_handler = SIG_IGN;
1488     sigaction (SIGCHLD, &sigchld, NULL);
1489
1490     return (0);
1491   }
1492   else if (status != 0)
1493   {
1494     fprintf (stderr, "daemonize failed, exiting.\n");
1495     return (1);
1496   }
1497
1498   listen_thread_main (NULL);
1499
1500   cleanup ();
1501
1502   return (0);
1503 } /* int main */
1504
1505 /*
1506  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1507  */