src/rrd_daemon.c: Stat files before creating a tree-node for them.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69 #include <strings.h>
70 #include <stdint.h>
71 #include <inttypes.h>
72
73 #include <sys/types.h>
74 #include <sys/stat.h>
75 #include <fcntl.h>
76 #include <signal.h>
77 #include <sys/socket.h>
78 #include <sys/un.h>
79 #include <netdb.h>
80 #include <poll.h>
81 #include <syslog.h>
82 #include <pthread.h>
83 #include <errno.h>
84 #include <assert.h>
85 #include <sys/time.h>
86 #include <time.h>
87
88 #include <glib-2.0/glib.h>
89 /* }}} */
90
91 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
92
93 #ifndef __GNUC__
94 # define __attribute__(x) /**/
95 #endif
96
97 /*
98  * Types
99  */
100 struct listen_socket_s
101 {
102   int fd;
103   char path[PATH_MAX + 1];
104 };
105 typedef struct listen_socket_s listen_socket_t;
106
107 struct cache_item_s;
108 typedef struct cache_item_s cache_item_t;
109 struct cache_item_s
110 {
111   char *file;
112   char **values;
113   int values_num;
114   time_t last_flush_time;
115 #define CI_FLAGS_IN_TREE  0x01
116 #define CI_FLAGS_IN_QUEUE 0x02
117   int flags;
118
119   cache_item_t *next;
120 };
121
122 struct callback_flush_data_s
123 {
124   time_t now;
125   char **keys;
126   size_t keys_num;
127 };
128 typedef struct callback_flush_data_s callback_flush_data_t;
129
130 enum queue_side_e
131 {
132   HEAD,
133   TAIL
134 };
135 typedef enum queue_side_e queue_side_t;
136
137 /*
138  * Variables
139  */
140 static listen_socket_t *listen_fds = NULL;
141 static size_t listen_fds_num = 0;
142
143 static int do_shutdown = 0;
144
145 static pthread_t queue_thread;
146
147 static pthread_t *connetion_threads = NULL;
148 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
149 static int connetion_threads_num = 0;
150
151 /* Cache stuff */
152 static GTree          *cache_tree = NULL;
153 static cache_item_t   *cache_queue_head = NULL;
154 static cache_item_t   *cache_queue_tail = NULL;
155 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
156 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
157
158 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
159
160 static int config_write_interval = 300;
161 static int config_flush_interval = 3600;
162 static char *config_pid_file = NULL;
163 static char *config_base_dir = NULL;
164
165 static char **config_listen_address_list = NULL;
166 static int config_listen_address_list_len = 0;
167
168 static uint64_t stats_queue_length = 0;
169 static uint64_t stats_updates_written = 0;
170 static uint64_t stats_data_sets_written = 0;
171 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
172
173 /* 
174  * Functions
175  */
176 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
177 {
178   do_shutdown++;
179 } /* }}} void sig_int_handler */
180
181 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
182 {
183   do_shutdown++;
184 } /* }}} void sig_term_handler */
185
186 static int write_pidfile (void) /* {{{ */
187 {
188   pid_t pid;
189   char *file;
190   FILE *fh;
191
192   pid = getpid ();
193   
194   file = (config_pid_file != NULL)
195     ? config_pid_file
196     : LOCALSTATEDIR "/run/rrdcached.pid";
197
198   fh = fopen (file, "w");
199   if (fh == NULL)
200   {
201     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
202     return (-1);
203   }
204
205   fprintf (fh, "%i\n", (int) pid);
206   fclose (fh);
207
208   return (0);
209 } /* }}} int write_pidfile */
210
211 static int remove_pidfile (void) /* {{{ */
212 {
213   char *file;
214   int status;
215
216   file = (config_pid_file != NULL)
217     ? config_pid_file
218     : LOCALSTATEDIR "/run/rrdcached.pid";
219
220   status = unlink (file);
221   if (status == 0)
222     return (0);
223   return (errno);
224 } /* }}} int remove_pidfile */
225
226 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
227 {
228   char    *buffer;
229   size_t   buffer_used;
230   size_t   buffer_free;
231   ssize_t  status;
232
233   buffer       = (char *) buffer_void;
234   buffer_used  = 0;
235   buffer_free  = buffer_size;
236
237   while (buffer_free > 0)
238   {
239     status = read (fd, buffer + buffer_used, buffer_free);
240     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
241       continue;
242
243     if (status < 0)
244       return (-1);
245
246     if (status == 0)
247       return (0);
248
249     assert ((0 > status) || (buffer_free >= (size_t) status));
250
251     buffer_free = buffer_free - status;
252     buffer_used = buffer_used + status;
253
254     if (buffer[buffer_used - 1] == '\n')
255       break;
256   }
257
258   assert (buffer_used > 0);
259
260   if (buffer[buffer_used - 1] != '\n')
261   {
262     errno = ENOBUFS;
263     return (-1);
264   }
265
266   buffer[buffer_used - 1] = 0;
267
268   /* Fix network line endings. */
269   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
270   {
271     buffer_used--;
272     buffer[buffer_used - 1] = 0;
273   }
274
275   return (buffer_used);
276 } /* }}} ssize_t sread */
277
278 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
279 {
280   const char *ptr;
281   size_t      nleft;
282   ssize_t     status;
283
284   ptr   = (const char *) buf;
285   nleft = count;
286
287   while (nleft > 0)
288   {
289     status = write (fd, (const void *) ptr, nleft);
290
291     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
292       continue;
293
294     if (status < 0)
295       return (status);
296
297     nleft = nleft - status;
298     ptr   = ptr   + status;
299   }
300
301   return (0);
302 } /* }}} ssize_t swrite */
303
304 /*
305  * enqueue_cache_item:
306  * `cache_lock' must be acquired before calling this function!
307  */
308 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
309     queue_side_t side)
310 {
311   int did_insert = 0;
312
313   if (ci == NULL)
314     return (-1);
315
316   if (ci->values_num == 0)
317     return (0);
318
319   if (side == HEAD)
320   {
321     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
322     {
323       assert (ci->next == NULL);
324       ci->next = cache_queue_head;
325       cache_queue_head = ci;
326
327       if (cache_queue_tail == NULL)
328         cache_queue_tail = cache_queue_head;
329
330       did_insert = 1;
331     }
332     else if (cache_queue_head == ci)
333     {
334       /* do nothing */
335     }
336     else /* enqueued, but not first entry */
337     {
338       cache_item_t *prev;
339
340       /* find previous entry */
341       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
342         if (prev->next == ci)
343           break;
344       assert (prev != NULL);
345
346       /* move to the front */
347       prev->next = ci->next;
348       ci->next = cache_queue_head;
349       cache_queue_head = ci;
350
351       /* check if we need to adapt the tail */
352       if (cache_queue_tail == ci)
353         cache_queue_tail = prev;
354     }
355   }
356   else /* (side == TAIL) */
357   {
358     /* We don't move values back in the list.. */
359     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
360       return (0);
361
362     assert (ci->next == NULL);
363
364     if (cache_queue_tail == NULL)
365       cache_queue_head = ci;
366     else
367       cache_queue_tail->next = ci;
368     cache_queue_tail = ci;
369
370     did_insert = 1;
371   }
372
373   ci->flags |= CI_FLAGS_IN_QUEUE;
374
375   if (did_insert)
376   {
377     pthread_mutex_lock (&stats_lock);
378     stats_queue_length++;
379     pthread_mutex_unlock (&stats_lock);
380   }
381
382   return (0);
383 } /* }}} int enqueue_cache_item */
384
385 /*
386  * tree_callback_flush:
387  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
388  * while this is in progress.
389  */
390 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
391     gpointer data)
392 {
393   cache_item_t *ci;
394   callback_flush_data_t *cfd;
395
396   ci = (cache_item_t *) value;
397   cfd = (callback_flush_data_t *) data;
398
399   if (((cfd->now - ci->last_flush_time) >= config_write_interval)
400       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
401       && (ci->values_num > 0))
402   {
403     enqueue_cache_item (ci, TAIL);
404   }
405   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
406       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
407       && (ci->values_num <= 0))
408   {
409     char **temp;
410
411     temp = (char **) realloc (cfd->keys,
412         sizeof (char *) * (cfd->keys_num + 1));
413     if (temp == NULL)
414     {
415       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
416       return (FALSE);
417     }
418     cfd->keys = temp;
419     /* Make really sure this points to the _same_ place */
420     assert ((char *) key == ci->file);
421     cfd->keys[cfd->keys_num] = (char *) key;
422     cfd->keys_num++;
423   }
424
425   return (FALSE);
426 } /* }}} gboolean tree_callback_flush */
427
428 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
429 {
430   struct timeval now;
431   struct timespec next_flush;
432
433   gettimeofday (&now, NULL);
434   next_flush.tv_sec = now.tv_sec + config_flush_interval;
435   next_flush.tv_nsec = 1000 * now.tv_usec;
436
437   pthread_mutex_lock (&cache_lock);
438   while ((do_shutdown == 0) || (cache_queue_head != NULL))
439   {
440     cache_item_t *ci;
441     char *file;
442     char **values;
443     int values_num;
444     int status;
445     int i;
446
447     /* First, check if it's time to do the cache flush. */
448     gettimeofday (&now, NULL);
449     if ((now.tv_sec > next_flush.tv_sec)
450         || ((now.tv_sec == next_flush.tv_sec)
451           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
452     {
453       callback_flush_data_t cfd;
454       size_t k;
455
456       memset (&cfd, 0, sizeof (cfd));
457       /* Pass the current time as user data so that we don't need to call
458        * `time' for each node. */
459       cfd.now = time (NULL);
460       cfd.keys = NULL;
461       cfd.keys_num = 0;
462
463       /* `tree_callback_flush' will return the keys of all values that haven't
464        * been touched in the last `config_flush_interval' seconds in `cfd'.
465        * The char*'s in this array point to the same memory as ci->file, so we
466        * don't need to free them separately. */
467       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
468
469       for (k = 0; k < cfd.keys_num; k++)
470       {
471         /* This must not fail. */
472         ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
473         assert (ci != NULL);
474
475         /* If we end up here with values available, something's seriously
476          * messed up. */
477         assert (ci->values_num == 0);
478
479         /* Remove the node from the tree */
480         g_tree_remove (cache_tree, cfd.keys[k]);
481         cfd.keys[k] = NULL;
482
483         /* Now free and clean up `ci'. */
484         free (ci->file);
485         ci->file = NULL;
486         free (ci);
487         ci = NULL;
488       } /* for (k = 0; k < cfd.keys_num; k++) */
489
490       if (cfd.keys != NULL)
491       {
492         free (cfd.keys);
493         cfd.keys = NULL;
494       }
495
496       /* Determine the time of the next cache flush. */
497       while (next_flush.tv_sec < now.tv_sec)
498         next_flush.tv_sec += config_flush_interval;
499     }
500
501     /* Now, check if there's something to store away. If not, wait until
502      * something comes in or it's time to do the cache flush. */
503     if (cache_queue_head == NULL)
504     {
505       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
506       if ((status != 0) && (status != ETIMEDOUT))
507       {
508         RRDD_LOG (LOG_ERR, "queue_thread_main: "
509             "pthread_cond_timedwait returned %i.", status);
510       }
511     }
512
513     /* Check if a value has arrived. This may be NULL if we timed out or there
514      * was an interrupt such as a signal. */
515     if (cache_queue_head == NULL)
516       continue;
517
518     ci = cache_queue_head;
519
520     /* copy the relevant parts */
521     file = strdup (ci->file);
522     if (file == NULL)
523     {
524       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
525       continue;
526     }
527
528     values = ci->values;
529     values_num = ci->values_num;
530
531     ci->values = NULL;
532     ci->values_num = 0;
533
534     ci->last_flush_time = time (NULL);
535     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
536
537     cache_queue_head = ci->next;
538     if (cache_queue_head == NULL)
539       cache_queue_tail = NULL;
540     ci->next = NULL;
541
542     pthread_mutex_lock (&stats_lock);
543     assert (stats_queue_length > 0);
544     stats_queue_length--;
545     pthread_mutex_unlock (&stats_lock);
546
547     pthread_mutex_unlock (&cache_lock);
548
549     status = rrd_update_r (file, NULL, values_num, (void *) values);
550     if (status != 0)
551     {
552       RRDD_LOG (LOG_ERR, "queue_thread_main: "
553           "rrd_update_r failed with status %i.",
554           status);
555     }
556
557     free (file);
558     for (i = 0; i < values_num; i++)
559       free (values[i]);
560
561     if (status == 0)
562     {
563       pthread_mutex_lock (&stats_lock);
564       stats_updates_written++;
565       stats_data_sets_written += values_num;
566       pthread_mutex_unlock (&stats_lock);
567     }
568
569     pthread_mutex_lock (&cache_lock);
570     pthread_cond_broadcast (&flush_cond);
571   } /* while (do_shutdown == 0) */
572   pthread_mutex_unlock (&cache_lock);
573
574   return (NULL);
575 } /* }}} void *queue_thread_main */
576
577 static int buffer_get_field (char **buffer_ret, /* {{{ */
578     size_t *buffer_size_ret, char **field_ret)
579 {
580   char *buffer;
581   size_t buffer_pos;
582   size_t buffer_size;
583   char *field;
584   size_t field_size;
585   int status;
586
587   buffer = *buffer_ret;
588   buffer_pos = 0;
589   buffer_size = *buffer_size_ret;
590   field = *buffer_ret;
591   field_size = 0;
592
593   if (buffer_size <= 0)
594     return (-1);
595
596   /* This is ensured by `handle_request'. */
597   assert (buffer[buffer_size - 1] == ' ');
598
599   status = -1;
600   while (buffer_pos < buffer_size)
601   {
602     /* Check for end-of-field or end-of-buffer */
603     if (buffer[buffer_pos] == ' ')
604     {
605       field[field_size] = 0;
606       field_size++;
607       buffer_pos++;
608       status = 0;
609       break;
610     }
611     /* Handle escaped characters. */
612     else if (buffer[buffer_pos] == '\\')
613     {
614       if (buffer_pos >= (buffer_size - 1))
615         break;
616       buffer_pos++;
617       field[field_size] = buffer[buffer_pos];
618       field_size++;
619       buffer_pos++;
620     }
621     /* Normal operation */ 
622     else
623     {
624       field[field_size] = buffer[buffer_pos];
625       field_size++;
626       buffer_pos++;
627     }
628   } /* while (buffer_pos < buffer_size) */
629
630   if (status != 0)
631     return (status);
632
633   *buffer_ret = buffer + buffer_pos;
634   *buffer_size_ret = buffer_size - buffer_pos;
635   *field_ret = field;
636
637   return (0);
638 } /* }}} int buffer_get_field */
639
640 static int flush_file (const char *filename) /* {{{ */
641 {
642   cache_item_t *ci;
643
644   pthread_mutex_lock (&cache_lock);
645
646   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
647   if (ci == NULL)
648   {
649     pthread_mutex_unlock (&cache_lock);
650     return (ENOENT);
651   }
652
653   /* Enqueue at head */
654   enqueue_cache_item (ci, HEAD);
655   pthread_cond_signal (&cache_cond);
656
657   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
658   {
659     ci = NULL;
660
661     pthread_cond_wait (&flush_cond, &cache_lock);
662
663     ci = g_tree_lookup (cache_tree, filename);
664     if (ci == NULL)
665     {
666       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
667           "while waiting for flush.");
668       pthread_mutex_unlock (&cache_lock);
669       return (-1);
670     }
671   }
672
673   pthread_mutex_unlock (&cache_lock);
674   return (0);
675 } /* }}} int flush_file */
676
677 static int handle_request_help (int fd, /* {{{ */
678     char *buffer, size_t buffer_size)
679 {
680   int status;
681   char **help_text;
682   size_t help_text_len;
683   char *command;
684   size_t i;
685
686   char *help_help[] =
687   {
688     "4 Command overview\n",
689     "FLUSH <filename>\n",
690     "HELP [<command>]\n",
691     "UPDATE <filename> <values> [<values> ...]\n",
692     "STATS\n"
693   };
694   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
695
696   char *help_flush[] =
697   {
698     "4 Help for FLUSH\n",
699     "Usage: FLUSH <filename>\n",
700     "\n",
701     "Adds the given filename to the head of the update queue and returns\n",
702     "after is has been dequeued.\n"
703   };
704   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
705
706   char *help_update[] =
707   {
708     "9 Help for UPDATE\n",
709     "Usage: UPDATE <filename> <values> [<values> ...]\n"
710     "\n",
711     "Adds the given file to the internal cache if it is not yet known and\n",
712     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
713     "for details.\n",
714     "\n",
715     "Each <values> has the following form:\n",
716     "  <values> = <time>:<value>[:<value>[...]]\n",
717     "See the rrdupdate(1) manpage for details.\n"
718   };
719   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
720
721   char *help_stats[] =
722   {
723     "4 Help for STATS\n",
724     "Usage: STATS\n",
725     "\n",
726     "Returns some performance counters, see the rrdcached(1) manpage for\n",
727     "a description of the values.\n"
728   };
729   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
730
731   status = buffer_get_field (&buffer, &buffer_size, &command);
732   if (status != 0)
733   {
734     help_text = help_help;
735     help_text_len = help_help_len;
736   }
737   else
738   {
739     if (strcasecmp (command, "update") == 0)
740     {
741       help_text = help_update;
742       help_text_len = help_update_len;
743     }
744     else if (strcasecmp (command, "flush") == 0)
745     {
746       help_text = help_flush;
747       help_text_len = help_flush_len;
748     }
749     else if (strcasecmp (command, "stats") == 0)
750     {
751       help_text = help_stats;
752       help_text_len = help_stats_len;
753     }
754     else
755     {
756       help_text = help_help;
757       help_text_len = help_help_len;
758     }
759   }
760
761   for (i = 0; i < help_text_len; i++)
762   {
763     status = swrite (fd, help_text[i], strlen (help_text[i]));
764     if (status < 0)
765     {
766       status = errno;
767       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
768       return (status);
769     }
770   }
771
772   return (0);
773 } /* }}} int handle_request_help */
774
775 static int handle_request_stats (int fd, /* {{{ */
776     char *buffer __attribute__((unused)),
777     size_t buffer_size __attribute__((unused)))
778 {
779   int status;
780   char outbuf[4096];
781
782   uint64_t copy_queue_length;
783   uint64_t copy_updates_written;
784   uint64_t copy_data_sets_written;
785
786   uint64_t tree_nodes_number;
787   uint64_t tree_depth;
788
789   pthread_mutex_lock (&stats_lock);
790   copy_queue_length       = stats_queue_length;
791   copy_updates_written    = stats_updates_written;
792   copy_data_sets_written  = stats_data_sets_written;
793   pthread_mutex_unlock (&stats_lock);
794
795   pthread_mutex_lock (&cache_lock);
796   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
797   tree_depth        = (uint64_t) g_tree_height (cache_tree);
798   pthread_mutex_unlock (&cache_lock);
799
800 #define RRDD_STATS_SEND \
801   outbuf[sizeof (outbuf) - 1] = 0; \
802   status = swrite (fd, outbuf, strlen (outbuf)); \
803   if (status < 0) \
804   { \
805     status = errno; \
806     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
807     return (status); \
808   }
809
810   strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf));
811   RRDD_STATS_SEND;
812
813   snprintf (outbuf, sizeof (outbuf),
814       "QueueLength: %"PRIu64"\n", copy_queue_length);
815   RRDD_STATS_SEND;
816
817   snprintf (outbuf, sizeof (outbuf),
818       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
819   RRDD_STATS_SEND;
820
821   snprintf (outbuf, sizeof (outbuf),
822       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
823   RRDD_STATS_SEND;
824
825   snprintf (outbuf, sizeof (outbuf),
826       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
827   RRDD_STATS_SEND;
828
829   snprintf (outbuf, sizeof (outbuf),
830       "TreeDepth: %"PRIu64"\n", tree_depth);
831   RRDD_STATS_SEND;
832
833   return (0);
834 #undef RRDD_STATS_SEND
835 } /* }}} int handle_request_stats */
836
837 static int handle_request_flush (int fd, /* {{{ */
838     char *buffer, size_t buffer_size)
839 {
840   char *file;
841   int status;
842   char result[4096];
843
844   status = buffer_get_field (&buffer, &buffer_size, &file);
845   if (status != 0)
846   {
847     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
848   }
849   else
850   {
851     status = flush_file (file);
852     if (status == 0)
853       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
854     else if (status == ENOENT)
855       snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
856     else if (status < 0)
857       strncpy (result, "-1 Internal error.\n", sizeof (result));
858     else
859       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
860   }
861   result[sizeof (result) - 1] = 0;
862
863   status = swrite (fd, result, strlen (result));
864   if (status < 0)
865   {
866     status = errno;
867     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
868     return (status);
869   }
870
871   return (0);
872 } /* }}} int handle_request_flush */
873
874 static int handle_request_update (int fd, /* {{{ */
875     char *buffer, size_t buffer_size)
876 {
877   char *file;
878   int values_num = 0;
879   int status;
880
881   time_t now;
882
883   cache_item_t *ci;
884   char answer[4096];
885
886 #define RRDD_UPDATE_SEND \
887   answer[sizeof (answer) - 1] = 0; \
888   status = swrite (fd, answer, strlen (answer)); \
889   if (status < 0) \
890   { \
891     status = errno; \
892     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
893     return (status); \
894   }
895
896   now = time (NULL);
897
898   status = buffer_get_field (&buffer, &buffer_size, &file);
899   if (status != 0)
900   {
901     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
902         sizeof (answer));
903     RRDD_UPDATE_SEND;
904     return (0);
905   }
906
907   pthread_mutex_lock (&cache_lock);
908
909   ci = g_tree_lookup (cache_tree, file);
910   if (ci == NULL) /* {{{ */
911   {
912     struct stat statbuf;
913
914     memset (&statbuf, 0, sizeof (statbuf));
915     status = stat (file, &statbuf);
916     if (status != 0)
917     {
918       pthread_mutex_unlock (&cache_lock);
919       RRDD_LOG (LOG_ERR, "handle_request_update: stat (%s) failed.", file);
920
921       status = errno;
922       if (status == ENOENT)
923         snprintf (answer, sizeof (answer), "-1 No such file: %s", file);
924       else
925         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
926             status);
927       RRDD_UPDATE_SEND;
928       return (0);
929     }
930     if (!S_ISREG (statbuf.st_mode))
931     {
932       pthread_mutex_unlock (&cache_lock);
933
934       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s", file);
935       RRDD_UPDATE_SEND;
936       return (0);
937     }
938
939     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
940     if (ci == NULL)
941     {
942       pthread_mutex_unlock (&cache_lock);
943       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
944
945       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
946       RRDD_UPDATE_SEND;
947       return (0);
948     }
949     memset (ci, 0, sizeof (cache_item_t));
950
951     ci->file = strdup (file);
952     if (ci->file == NULL)
953     {
954       pthread_mutex_unlock (&cache_lock);
955       free (ci);
956       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
957
958       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
959       RRDD_UPDATE_SEND;
960       return (0);
961     }
962
963     ci->values = NULL;
964     ci->values_num = 0;
965     ci->last_flush_time = now;
966     ci->flags = CI_FLAGS_IN_TREE;
967
968     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
969   } /* }}} */
970   assert (ci != NULL);
971
972   while (buffer_size > 0)
973   {
974     char **temp;
975     char *value;
976
977     status = buffer_get_field (&buffer, &buffer_size, &value);
978     if (status != 0)
979     {
980       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
981       break;
982     }
983
984     temp = (char **) realloc (ci->values,
985         sizeof (char *) * (ci->values_num + 1));
986     if (temp == NULL)
987     {
988       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
989       continue;
990     }
991     ci->values = temp;
992
993     ci->values[ci->values_num] = strdup (value);
994     if (ci->values[ci->values_num] == NULL)
995     {
996       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
997       continue;
998     }
999     ci->values_num++;
1000
1001     values_num++;
1002   }
1003
1004   if (((now - ci->last_flush_time) >= config_write_interval)
1005       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1006       && (ci->values_num > 0))
1007   {
1008     enqueue_cache_item (ci, TAIL);
1009     pthread_cond_signal (&cache_cond);
1010   }
1011
1012   pthread_mutex_unlock (&cache_lock);
1013
1014   if (values_num < 1)
1015   {
1016     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1017   }
1018   else
1019   {
1020     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1021         (values_num == 1) ? "" : "s");
1022   }
1023   RRDD_UPDATE_SEND;
1024   return (0);
1025 #undef RRDD_UPDATE_SEND
1026 } /* }}} int handle_request_update */
1027
1028 static int handle_request (int fd) /* {{{ */
1029 {
1030   char buffer[4096];
1031   size_t buffer_size;
1032   char *buffer_ptr;
1033   char *command;
1034   int status;
1035
1036   status = (int) sread (fd, buffer, sizeof (buffer));
1037   if (status == 0)
1038   {
1039     return (1);
1040   }
1041   else if (status < 0)
1042   {
1043     RRDD_LOG (LOG_ERR, "handle_request: sread failed.");
1044     return (-1);
1045   }
1046   buffer_size = (size_t) status;
1047   assert (buffer_size <= sizeof (buffer));
1048   assert (buffer[buffer_size - 1] == 0);
1049
1050   /* Place the normal field separator at the end to simplify
1051    * `buffer_get_field's work. */
1052   buffer[buffer_size - 1] = ' ';
1053
1054   buffer_ptr = buffer;
1055   command = NULL;
1056   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1057   if (status != 0)
1058   {
1059     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1060     return (-1);
1061   }
1062
1063   if (strcasecmp (command, "update") == 0)
1064   {
1065     return (handle_request_update (fd, buffer_ptr, buffer_size));
1066   }
1067   else if (strcasecmp (command, "flush") == 0)
1068   {
1069     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1070   }
1071   else if (strcasecmp (command, "stats") == 0)
1072   {
1073     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1074   }
1075   else if (strcasecmp (command, "help") == 0)
1076   {
1077     return (handle_request_help (fd, buffer_ptr, buffer_size));
1078   }
1079   else
1080   {
1081     char result[4096];
1082
1083     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1084     result[sizeof (result) - 1] = 0;
1085
1086     status = swrite (fd, result, strlen (result));
1087     if (status < 0)
1088     {
1089       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1090       return (-1);
1091     }
1092   }
1093
1094   return (0);
1095 } /* }}} int handle_request */
1096
1097 static void *connection_thread_main (void *args /* {{{ */
1098     __attribute__((unused)))
1099 {
1100   pthread_t self;
1101   int i;
1102   int fd;
1103   
1104   fd = *((int *) args);
1105
1106   pthread_mutex_lock (&connetion_threads_lock);
1107   {
1108     pthread_t *temp;
1109
1110     temp = (pthread_t *) realloc (connetion_threads,
1111         sizeof (pthread_t) * (connetion_threads_num + 1));
1112     if (temp == NULL)
1113     {
1114       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1115     }
1116     else
1117     {
1118       connetion_threads = temp;
1119       connetion_threads[connetion_threads_num] = pthread_self ();
1120       connetion_threads_num++;
1121     }
1122   }
1123   pthread_mutex_unlock (&connetion_threads_lock);
1124
1125   while (do_shutdown == 0)
1126   {
1127     struct pollfd pollfd;
1128     int status;
1129
1130     pollfd.fd = fd;
1131     pollfd.events = POLLIN | POLLPRI;
1132     pollfd.revents = 0;
1133
1134     status = poll (&pollfd, 1, /* timeout = */ 500);
1135     if (status == 0) /* timeout */
1136       continue;
1137     else if (status < 0) /* error */
1138     {
1139       status = errno;
1140       if (status == EINTR)
1141         continue;
1142       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1143       continue;
1144     }
1145
1146     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1147     {
1148       close (fd);
1149       break;
1150     }
1151     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1152     {
1153       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1154           "poll(2) returned something unexpected: %#04hx",
1155           pollfd.revents);
1156       close (fd);
1157       break;
1158     }
1159
1160     status = handle_request (fd);
1161     if (status != 0)
1162     {
1163       close (fd);
1164       break;
1165     }
1166   }
1167
1168   self = pthread_self ();
1169   /* Remove this thread from the connection threads list */
1170   pthread_mutex_lock (&connetion_threads_lock);
1171   /* Find out own index in the array */
1172   for (i = 0; i < connetion_threads_num; i++)
1173     if (pthread_equal (connetion_threads[i], self) != 0)
1174       break;
1175   assert (i < connetion_threads_num);
1176
1177   /* Move the trailing threads forward. */
1178   if (i < (connetion_threads_num - 1))
1179   {
1180     memmove (connetion_threads + i,
1181         connetion_threads + i + 1,
1182         sizeof (pthread_t) * (connetion_threads_num - i - 1));
1183   }
1184
1185   connetion_threads_num--;
1186   pthread_mutex_unlock (&connetion_threads_lock);
1187
1188   free (args);
1189   return (NULL);
1190 } /* }}} void *connection_thread_main */
1191
1192 static int open_listen_socket_unix (const char *path) /* {{{ */
1193 {
1194   int fd;
1195   struct sockaddr_un sa;
1196   listen_socket_t *temp;
1197   int status;
1198
1199   temp = (listen_socket_t *) realloc (listen_fds,
1200       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1201   if (temp == NULL)
1202   {
1203     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1204     return (-1);
1205   }
1206   listen_fds = temp;
1207   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1208
1209   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1210   if (fd < 0)
1211   {
1212     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1213     return (-1);
1214   }
1215
1216   memset (&sa, 0, sizeof (sa));
1217   sa.sun_family = AF_UNIX;
1218   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1219
1220   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1221   if (status != 0)
1222   {
1223     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1224     close (fd);
1225     unlink (path);
1226     return (-1);
1227   }
1228
1229   status = listen (fd, /* backlog = */ 10);
1230   if (status != 0)
1231   {
1232     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1233     close (fd);
1234     unlink (path);
1235     return (-1);
1236   }
1237   
1238   listen_fds[listen_fds_num].fd = fd;
1239   snprintf (listen_fds[listen_fds_num].path,
1240       sizeof (listen_fds[listen_fds_num].path) - 1,
1241       "unix:%s", path);
1242   listen_fds_num++;
1243
1244   return (0);
1245 } /* }}} int open_listen_socket_unix */
1246
1247 static int open_listen_socket (const char *addr) /* {{{ */
1248 {
1249   struct addrinfo ai_hints;
1250   struct addrinfo *ai_res;
1251   struct addrinfo *ai_ptr;
1252   int status;
1253
1254   assert (addr != NULL);
1255
1256   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1257     return (open_listen_socket_unix (addr + strlen ("unix:")));
1258   else if (addr[0] == '/')
1259     return (open_listen_socket_unix (addr));
1260
1261   memset (&ai_hints, 0, sizeof (ai_hints));
1262   ai_hints.ai_flags = 0;
1263 #ifdef AI_ADDRCONFIG
1264   ai_hints.ai_flags |= AI_ADDRCONFIG;
1265 #endif
1266   ai_hints.ai_family = AF_UNSPEC;
1267   ai_hints.ai_socktype = SOCK_STREAM;
1268
1269   ai_res = NULL;
1270   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
1271   if (status != 0)
1272   {
1273     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1274         "%s", addr, gai_strerror (status));
1275     return (-1);
1276   }
1277
1278   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1279   {
1280     int fd;
1281     listen_socket_t *temp;
1282
1283     temp = (listen_socket_t *) realloc (listen_fds,
1284         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1285     if (temp == NULL)
1286     {
1287       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1288       continue;
1289     }
1290     listen_fds = temp;
1291     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1292
1293     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1294     if (fd < 0)
1295     {
1296       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1297       continue;
1298     }
1299
1300     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1301     if (status != 0)
1302     {
1303       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1304       close (fd);
1305       continue;
1306     }
1307
1308     status = listen (fd, /* backlog = */ 10);
1309     if (status != 0)
1310     {
1311       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1312       close (fd);
1313       return (-1);
1314     }
1315
1316     listen_fds[listen_fds_num].fd = fd;
1317     strncpy (listen_fds[listen_fds_num].path, addr,
1318         sizeof (listen_fds[listen_fds_num].path) - 1);
1319     listen_fds_num++;
1320   } /* for (ai_ptr) */
1321
1322   return (0);
1323 } /* }}} int open_listen_socket */
1324
1325 static int close_listen_sockets (void) /* {{{ */
1326 {
1327   size_t i;
1328
1329   for (i = 0; i < listen_fds_num; i++)
1330   {
1331     close (listen_fds[i].fd);
1332     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1333       unlink (listen_fds[i].path + strlen ("unix:"));
1334   }
1335
1336   free (listen_fds);
1337   listen_fds = NULL;
1338   listen_fds_num = 0;
1339
1340   return (0);
1341 } /* }}} int close_listen_sockets */
1342
1343 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1344 {
1345   struct pollfd *pollfds;
1346   int pollfds_num;
1347   int status;
1348   int i;
1349
1350   for (i = 0; i < config_listen_address_list_len; i++)
1351     open_listen_socket (config_listen_address_list[i]);
1352
1353   if (config_listen_address_list_len < 1)
1354     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1355
1356   if (listen_fds_num < 1)
1357   {
1358     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1359         "could be opened. Sorry.");
1360     return (NULL);
1361   }
1362
1363   pollfds_num = listen_fds_num;
1364   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1365   if (pollfds == NULL)
1366   {
1367     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1368     return (NULL);
1369   }
1370   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1371
1372   while (do_shutdown == 0)
1373   {
1374     assert (pollfds_num == ((int) listen_fds_num));
1375     for (i = 0; i < pollfds_num; i++)
1376     {
1377       pollfds[i].fd = listen_fds[i].fd;
1378       pollfds[i].events = POLLIN | POLLPRI;
1379       pollfds[i].revents = 0;
1380     }
1381
1382     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1383     if (status < 1)
1384     {
1385       status = errno;
1386       if (status != EINTR)
1387       {
1388         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1389       }
1390       continue;
1391     }
1392
1393     for (i = 0; i < pollfds_num; i++)
1394     {
1395       int *client_sd;
1396       struct sockaddr_storage client_sa;
1397       socklen_t client_sa_size;
1398       pthread_t tid;
1399       pthread_attr_t attr;
1400
1401       if (pollfds[i].revents == 0)
1402         continue;
1403
1404       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1405       {
1406         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1407             "poll(2) returned something unexpected for listen FD #%i.",
1408             pollfds[i].fd);
1409         continue;
1410       }
1411
1412       client_sd = (int *) malloc (sizeof (int));
1413       if (client_sd == NULL)
1414       {
1415         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1416         continue;
1417       }
1418
1419       client_sa_size = sizeof (client_sa);
1420       *client_sd = accept (pollfds[i].fd,
1421           (struct sockaddr *) &client_sa, &client_sa_size);
1422       if (*client_sd < 0)
1423       {
1424         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1425         continue;
1426       }
1427
1428       pthread_attr_init (&attr);
1429       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1430
1431       status = pthread_create (&tid, &attr, connection_thread_main,
1432           /* args = */ (void *) client_sd);
1433       if (status != 0)
1434       {
1435         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1436         close (*client_sd);
1437         free (client_sd);
1438         continue;
1439       }
1440     } /* for (pollfds_num) */
1441   } /* while (do_shutdown == 0) */
1442
1443   close_listen_sockets ();
1444
1445   pthread_mutex_lock (&connetion_threads_lock);
1446   while (connetion_threads_num > 0)
1447   {
1448     pthread_t wait_for;
1449
1450     wait_for = connetion_threads[0];
1451
1452     pthread_mutex_unlock (&connetion_threads_lock);
1453     pthread_join (wait_for, /* retval = */ NULL);
1454     pthread_mutex_lock (&connetion_threads_lock);
1455   }
1456   pthread_mutex_unlock (&connetion_threads_lock);
1457
1458   return (NULL);
1459 } /* }}} void *listen_thread_main */
1460
1461 static int daemonize (void) /* {{{ */
1462 {
1463   pid_t child;
1464   int status;
1465   char *base_dir;
1466
1467   /* These structures are static, because `sigaction' behaves weird if the are
1468    * overwritten.. */
1469   static struct sigaction sa_int;
1470   static struct sigaction sa_term;
1471   static struct sigaction sa_pipe;
1472
1473   child = fork ();
1474   if (child < 0)
1475   {
1476     fprintf (stderr, "daemonize: fork(2) failed.\n");
1477     return (-1);
1478   }
1479   else if (child > 0)
1480   {
1481     return (1);
1482   }
1483
1484   /* Change into the /tmp directory. */
1485   base_dir = (config_base_dir != NULL)
1486     ? config_base_dir
1487     : "/tmp";
1488   status = chdir (base_dir);
1489   if (status != 0)
1490   {
1491     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1492     return (-1);
1493   }
1494
1495   /* Become session leader */
1496   setsid ();
1497
1498   /* Open the first three file descriptors to /dev/null */
1499   close (2);
1500   close (1);
1501   close (0);
1502
1503   open ("/dev/null", O_RDWR);
1504   dup (0);
1505   dup (0);
1506
1507   /* Install signal handlers */
1508   memset (&sa_int, 0, sizeof (sa_int));
1509   sa_int.sa_handler = sig_int_handler;
1510   sigaction (SIGINT, &sa_int, NULL);
1511
1512   memset (&sa_term, 0, sizeof (sa_term));
1513   sa_term.sa_handler = sig_term_handler;
1514   sigaction (SIGTERM, &sa_term, NULL);
1515
1516   memset (&sa_pipe, 0, sizeof (sa_pipe));
1517   sa_pipe.sa_handler = SIG_IGN;
1518   sigaction (SIGPIPE, &sa_pipe, NULL);
1519
1520   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1521
1522   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1523   if (cache_tree == NULL)
1524   {
1525     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1526     return (-1);
1527   }
1528
1529   memset (&queue_thread, 0, sizeof (queue_thread));
1530   status = pthread_create (&queue_thread, /* attr = */ NULL,
1531       queue_thread_main, /* args = */ NULL);
1532   if (status != 0)
1533   {
1534     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
1535     return (-1);
1536   }
1537
1538   write_pidfile ();
1539
1540   return (0);
1541 } /* }}} int daemonize */
1542
1543 static int cleanup (void) /* {{{ */
1544 {
1545   do_shutdown++;
1546
1547   pthread_cond_signal (&cache_cond);
1548   pthread_join (queue_thread, /* return = */ NULL);
1549
1550   remove_pidfile ();
1551
1552   closelog ();
1553
1554   return (0);
1555 } /* }}} int cleanup */
1556
1557 static int read_options (int argc, char **argv) /* {{{ */
1558 {
1559   int option;
1560   int status = 0;
1561
1562   while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1)
1563   {
1564     switch (option)
1565     {
1566       case 'l':
1567       {
1568         char **temp;
1569
1570         temp = (char **) realloc (config_listen_address_list,
1571             sizeof (char *) * (config_listen_address_list_len + 1));
1572         if (temp == NULL)
1573         {
1574           fprintf (stderr, "read_options: realloc failed.\n");
1575           return (2);
1576         }
1577         config_listen_address_list = temp;
1578
1579         temp[config_listen_address_list_len] = strdup (optarg);
1580         if (temp[config_listen_address_list_len] == NULL)
1581         {
1582           fprintf (stderr, "read_options: strdup failed.\n");
1583           return (2);
1584         }
1585         config_listen_address_list_len++;
1586       }
1587       break;
1588
1589       case 'f':
1590       {
1591         int temp;
1592
1593         temp = atoi (optarg);
1594         if (temp > 0)
1595           config_flush_interval = temp;
1596         else
1597         {
1598           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1599           status = 3;
1600         }
1601       }
1602       break;
1603
1604       case 'w':
1605       {
1606         int temp;
1607
1608         temp = atoi (optarg);
1609         if (temp > 0)
1610           config_write_interval = temp;
1611         else
1612         {
1613           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1614           status = 2;
1615         }
1616       }
1617       break;
1618
1619       case 'b':
1620       {
1621         size_t len;
1622
1623         if (config_base_dir != NULL)
1624           free (config_base_dir);
1625         config_base_dir = strdup (optarg);
1626         if (config_base_dir == NULL)
1627         {
1628           fprintf (stderr, "read_options: strdup failed.\n");
1629           return (3);
1630         }
1631
1632         len = strlen (config_base_dir);
1633         while ((len > 0) && (config_base_dir[len - 1] == '/'))
1634         {
1635           config_base_dir[len - 1] = 0;
1636           len--;
1637         }
1638
1639         if (len < 1)
1640         {
1641           fprintf (stderr, "Invalid base directory: %s\n", optarg);
1642           return (4);
1643         }
1644       }
1645       break;
1646
1647       case 'p':
1648       {
1649         if (config_pid_file != NULL)
1650           free (config_pid_file);
1651         config_pid_file = strdup (optarg);
1652         if (config_pid_file == NULL)
1653         {
1654           fprintf (stderr, "read_options: strdup failed.\n");
1655           return (3);
1656         }
1657       }
1658       break;
1659
1660       case 'h':
1661       case '?':
1662         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
1663             "\n"
1664             "Usage: rrdcached [options]\n"
1665             "\n"
1666             "Valid options are:\n"
1667             "  -l <address>  Socket address to listen to.\n"
1668             "  -w <seconds>  Interval in which to write data.\n"
1669             "  -f <seconds>  Interval in which to flush dead data.\n"
1670             "  -p <file>     Location of the PID-file.\n"
1671             "  -b <dir>      Base directory to change to.\n"
1672             "\n"
1673             "For more information and a detailed description of all options "
1674             "please refer\n"
1675             "to the rrdcached(1) manual page.\n",
1676             VERSION);
1677         status = -1;
1678         break;
1679     } /* switch (option) */
1680   } /* while (getopt) */
1681
1682   return (status);
1683 } /* }}} int read_options */
1684
1685 int main (int argc, char **argv)
1686 {
1687   int status;
1688
1689   status = read_options (argc, argv);
1690   if (status != 0)
1691   {
1692     if (status < 0)
1693       status = 0;
1694     return (status);
1695   }
1696
1697   status = daemonize ();
1698   if (status == 1)
1699   {
1700     struct sigaction sigchld;
1701
1702     memset (&sigchld, 0, sizeof (sigchld));
1703     sigchld.sa_handler = SIG_IGN;
1704     sigaction (SIGCHLD, &sigchld, NULL);
1705
1706     return (0);
1707   }
1708   else if (status != 0)
1709   {
1710     fprintf (stderr, "daemonize failed, exiting.\n");
1711     return (1);
1712   }
1713
1714   listen_thread_main (NULL);
1715
1716   cleanup ();
1717
1718   return (0);
1719 } /* int main */
1720
1721 /*
1722  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1723  */