This fixes a couple problems when exiting due to signal:
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 struct listen_socket_s
105 {
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
110
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122
123   cache_item_t *next;
124 };
125
126 struct callback_flush_data_s
127 {
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
134
135 enum queue_side_e
136 {
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
141
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
144
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
149
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
152
153 static int do_shutdown = 0;
154
155 static pthread_t queue_thread;
156
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
160
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
167
168 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
169
170 static int config_write_interval = 300;
171 static int config_write_jitter   = 0;
172 static int config_flush_interval = 3600;
173 static char *config_pid_file = NULL;
174 static char *config_base_dir = NULL;
175
176 static char **config_listen_address_list = NULL;
177 static int config_listen_address_list_len = 0;
178
179 static uint64_t stats_queue_length = 0;
180 static uint64_t stats_updates_received = 0;
181 static uint64_t stats_flush_received = 0;
182 static uint64_t stats_updates_written = 0;
183 static uint64_t stats_data_sets_written = 0;
184 static uint64_t stats_journal_bytes = 0;
185 static uint64_t stats_journal_rotate = 0;
186 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
187
188 /* Journaled updates */
189 static char *journal_cur = NULL;
190 static char *journal_old = NULL;
191 static FILE *journal_fh = NULL;
192 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
193 static int journal_write(char *cmd, char *args);
194 static void journal_done(void);
195 static void journal_rotate(void);
196
197 /* 
198  * Functions
199  */
200 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
201 {
202   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
203   do_shutdown++;
204   pthread_cond_broadcast(&cache_cond);
205 } /* }}} void sig_int_handler */
206
207 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
208 {
209   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
210   do_shutdown++;
211   pthread_cond_broadcast(&cache_cond);
212 } /* }}} void sig_term_handler */
213
214 static int write_pidfile (void) /* {{{ */
215 {
216   pid_t pid;
217   char *file;
218   int fd;
219   FILE *fh;
220
221   pid = getpid ();
222   
223   file = (config_pid_file != NULL)
224     ? config_pid_file
225     : LOCALSTATEDIR "/run/rrdcached.pid";
226
227   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
228   if (fd < 0)
229   {
230     RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
231              file, rrd_strerror(errno));
232     return (-1);
233   }
234
235   fh = fdopen (fd, "w");
236   if (fh == NULL)
237   {
238     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
239     close(fd);
240     return (-1);
241   }
242
243   fprintf (fh, "%i\n", (int) pid);
244   fclose (fh);
245
246   return (0);
247 } /* }}} int write_pidfile */
248
249 static int remove_pidfile (void) /* {{{ */
250 {
251   char *file;
252   int status;
253
254   file = (config_pid_file != NULL)
255     ? config_pid_file
256     : LOCALSTATEDIR "/run/rrdcached.pid";
257
258   status = unlink (file);
259   if (status == 0)
260     return (0);
261   return (errno);
262 } /* }}} int remove_pidfile */
263
264 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
265 {
266   char    *buffer;
267   size_t   buffer_used;
268   size_t   buffer_free;
269   ssize_t  status;
270
271   buffer       = (char *) buffer_void;
272   buffer_used  = 0;
273   buffer_free  = buffer_size;
274
275   while (buffer_free > 0)
276   {
277     status = read (fd, buffer + buffer_used, buffer_free);
278     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
279       continue;
280
281     if (status < 0)
282       return (-1);
283
284     if (status == 0)
285       return (0);
286
287     assert ((0 > status) || (buffer_free >= (size_t) status));
288
289     buffer_free = buffer_free - status;
290     buffer_used = buffer_used + status;
291
292     if (buffer[buffer_used - 1] == '\n')
293       break;
294   }
295
296   assert (buffer_used > 0);
297
298   if (buffer[buffer_used - 1] != '\n')
299   {
300     errno = ENOBUFS;
301     return (-1);
302   }
303
304   buffer[buffer_used - 1] = 0;
305
306   /* Fix network line endings. */
307   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
308   {
309     buffer_used--;
310     buffer[buffer_used - 1] = 0;
311   }
312
313   return (buffer_used);
314 } /* }}} ssize_t sread */
315
316 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
317 {
318   const char *ptr;
319   size_t      nleft;
320   ssize_t     status;
321
322   /* special case for journal replay */
323   if (fd < 0) return 0;
324
325   ptr   = (const char *) buf;
326   nleft = count;
327
328   while (nleft > 0)
329   {
330     status = write (fd, (const void *) ptr, nleft);
331
332     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
333       continue;
334
335     if (status < 0)
336       return (status);
337
338     nleft -= status;
339     ptr   += status;
340   }
341
342   return (0);
343 } /* }}} ssize_t swrite */
344
345 static void _wipe_ci_values(cache_item_t *ci, time_t when)
346 {
347   ci->values = NULL;
348   ci->values_num = 0;
349
350   ci->last_flush_time = when;
351   if (config_write_jitter > 0)
352     ci->last_flush_time += (random() % config_write_jitter);
353
354   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
355 }
356
357 /*
358  * enqueue_cache_item:
359  * `cache_lock' must be acquired before calling this function!
360  */
361 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
362     queue_side_t side)
363 {
364   int did_insert = 0;
365
366   if (ci == NULL)
367     return (-1);
368
369   if (ci->values_num == 0)
370     return (0);
371
372   if (side == HEAD)
373   {
374     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
375     {
376       assert (ci->next == NULL);
377       ci->next = cache_queue_head;
378       cache_queue_head = ci;
379
380       if (cache_queue_tail == NULL)
381         cache_queue_tail = cache_queue_head;
382
383       did_insert = 1;
384     }
385     else if (cache_queue_head == ci)
386     {
387       /* do nothing */
388     }
389     else /* enqueued, but not first entry */
390     {
391       cache_item_t *prev;
392
393       /* find previous entry */
394       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
395         if (prev->next == ci)
396           break;
397       assert (prev != NULL);
398
399       /* move to the front */
400       prev->next = ci->next;
401       ci->next = cache_queue_head;
402       cache_queue_head = ci;
403
404       /* check if we need to adapt the tail */
405       if (cache_queue_tail == ci)
406         cache_queue_tail = prev;
407     }
408   }
409   else /* (side == TAIL) */
410   {
411     /* We don't move values back in the list.. */
412     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
413       return (0);
414
415     assert (ci->next == NULL);
416
417     if (cache_queue_tail == NULL)
418       cache_queue_head = ci;
419     else
420       cache_queue_tail->next = ci;
421     cache_queue_tail = ci;
422
423     did_insert = 1;
424   }
425
426   ci->flags |= CI_FLAGS_IN_QUEUE;
427
428   if (did_insert)
429   {
430     pthread_mutex_lock (&stats_lock);
431     stats_queue_length++;
432     pthread_mutex_unlock (&stats_lock);
433   }
434
435   return (0);
436 } /* }}} int enqueue_cache_item */
437
438 /*
439  * tree_callback_flush:
440  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
441  * while this is in progress.
442  */
443 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
444     gpointer data)
445 {
446   cache_item_t *ci;
447   callback_flush_data_t *cfd;
448
449   ci = (cache_item_t *) value;
450   cfd = (callback_flush_data_t *) data;
451
452   if ((ci->last_flush_time <= cfd->abs_timeout)
453       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
454       && (ci->values_num > 0))
455   {
456     enqueue_cache_item (ci, TAIL);
457   }
458   else if ((do_shutdown != 0)
459       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
460       && (ci->values_num > 0))
461   {
462     enqueue_cache_item (ci, TAIL);
463   }
464   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
465       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
466       && (ci->values_num <= 0))
467   {
468     char **temp;
469
470     temp = (char **) realloc (cfd->keys,
471         sizeof (char *) * (cfd->keys_num + 1));
472     if (temp == NULL)
473     {
474       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
475       return (FALSE);
476     }
477     cfd->keys = temp;
478     /* Make really sure this points to the _same_ place */
479     assert ((char *) key == ci->file);
480     cfd->keys[cfd->keys_num] = (char *) key;
481     cfd->keys_num++;
482   }
483
484   return (FALSE);
485 } /* }}} gboolean tree_callback_flush */
486
487 static int flush_old_values (int max_age)
488 {
489   callback_flush_data_t cfd;
490   size_t k;
491
492   memset (&cfd, 0, sizeof (cfd));
493   /* Pass the current time as user data so that we don't need to call
494    * `time' for each node. */
495   cfd.now = time (NULL);
496   cfd.keys = NULL;
497   cfd.keys_num = 0;
498
499   if (max_age > 0)
500     cfd.abs_timeout = cfd.now - max_age;
501   else
502     cfd.abs_timeout = cfd.now + 1;
503
504   /* `tree_callback_flush' will return the keys of all values that haven't
505    * been touched in the last `config_flush_interval' seconds in `cfd'.
506    * The char*'s in this array point to the same memory as ci->file, so we
507    * don't need to free them separately. */
508   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
509
510   for (k = 0; k < cfd.keys_num; k++)
511   {
512     cache_item_t *ci;
513
514     /* This must not fail. */
515     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
516     assert (ci != NULL);
517
518     /* If we end up here with values available, something's seriously
519      * messed up. */
520     assert (ci->values_num == 0);
521
522     /* Remove the node from the tree */
523     g_tree_remove (cache_tree, cfd.keys[k]);
524     cfd.keys[k] = NULL;
525
526     /* Now free and clean up `ci'. */
527     free (ci->file);
528     ci->file = NULL;
529     free (ci);
530     ci = NULL;
531   } /* for (k = 0; k < cfd.keys_num; k++) */
532
533   if (cfd.keys != NULL)
534   {
535     free (cfd.keys);
536     cfd.keys = NULL;
537   }
538
539   return (0);
540 } /* int flush_old_values */
541
542 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
543 {
544   struct timeval now;
545   struct timespec next_flush;
546
547   gettimeofday (&now, NULL);
548   next_flush.tv_sec = now.tv_sec + config_flush_interval;
549   next_flush.tv_nsec = 1000 * now.tv_usec;
550
551   pthread_mutex_lock (&cache_lock);
552   while ((do_shutdown == 0) || (cache_queue_head != NULL))
553   {
554     cache_item_t *ci;
555     char *file;
556     char **values;
557     int values_num;
558     int status;
559     int i;
560
561     /* First, check if it's time to do the cache flush. */
562     gettimeofday (&now, NULL);
563     if ((now.tv_sec > next_flush.tv_sec)
564         || ((now.tv_sec == next_flush.tv_sec)
565           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
566     {
567       /* Flush all values that haven't been written in the last
568        * `config_write_interval' seconds. */
569       flush_old_values (config_write_interval);
570
571       /* Determine the time of the next cache flush. */
572       while (next_flush.tv_sec <= now.tv_sec)
573         next_flush.tv_sec += config_flush_interval;
574
575       /* unlock the cache while we rotate so we don't block incoming
576        * updates if the fsync() blocks on disk I/O */
577       pthread_mutex_unlock(&cache_lock);
578       journal_rotate();
579       pthread_mutex_lock(&cache_lock);
580     }
581
582     /* Now, check if there's something to store away. If not, wait until
583      * something comes in or it's time to do the cache flush. */
584     if (cache_queue_head == NULL)
585     {
586       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
587       if ((status != 0) && (status != ETIMEDOUT))
588       {
589         RRDD_LOG (LOG_ERR, "queue_thread_main: "
590             "pthread_cond_timedwait returned %i.", status);
591       }
592     }
593
594     /* We're about to shut down, so lets flush the entire tree. */
595     if ((do_shutdown != 0) && (cache_queue_head == NULL))
596       flush_old_values (/* max age = */ -1);
597
598     /* Check if a value has arrived. This may be NULL if we timed out or there
599      * was an interrupt such as a signal. */
600     if (cache_queue_head == NULL)
601       continue;
602
603     ci = cache_queue_head;
604
605     /* copy the relevant parts */
606     file = strdup (ci->file);
607     if (file == NULL)
608     {
609       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
610       continue;
611     }
612
613     assert(ci->values != NULL);
614     assert(ci->values_num > 0);
615
616     values = ci->values;
617     values_num = ci->values_num;
618
619     _wipe_ci_values(ci, time(NULL));
620
621     cache_queue_head = ci->next;
622     if (cache_queue_head == NULL)
623       cache_queue_tail = NULL;
624     ci->next = NULL;
625
626     pthread_mutex_lock (&stats_lock);
627     assert (stats_queue_length > 0);
628     stats_queue_length--;
629     pthread_mutex_unlock (&stats_lock);
630
631     pthread_mutex_unlock (&cache_lock);
632
633     rrd_clear_error ();
634     status = rrd_update_r (file, NULL, values_num, (void *) values);
635     if (status != 0)
636     {
637       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
638           "rrd_update_r (%s) failed with status %i. (%s)",
639           file, status, rrd_get_error());
640     }
641
642     journal_write("wrote", file);
643
644     for (i = 0; i < values_num; i++)
645       free (values[i]);
646
647     free(values);
648     free(file);
649
650     if (status == 0)
651     {
652       pthread_mutex_lock (&stats_lock);
653       stats_updates_written++;
654       stats_data_sets_written += values_num;
655       pthread_mutex_unlock (&stats_lock);
656     }
657
658     pthread_mutex_lock (&cache_lock);
659     pthread_cond_broadcast (&flush_cond);
660
661     /* We're about to shut down, so lets flush the entire tree. */
662     if ((do_shutdown != 0) && (cache_queue_head == NULL))
663       flush_old_values (/* max age = */ -1);
664   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
665   pthread_mutex_unlock (&cache_lock);
666
667   assert(cache_queue_head == NULL);
668   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
669   journal_done();
670
671   return (NULL);
672 } /* }}} void *queue_thread_main */
673
674 static int buffer_get_field (char **buffer_ret, /* {{{ */
675     size_t *buffer_size_ret, char **field_ret)
676 {
677   char *buffer;
678   size_t buffer_pos;
679   size_t buffer_size;
680   char *field;
681   size_t field_size;
682   int status;
683
684   buffer = *buffer_ret;
685   buffer_pos = 0;
686   buffer_size = *buffer_size_ret;
687   field = *buffer_ret;
688   field_size = 0;
689
690   if (buffer_size <= 0)
691     return (-1);
692
693   /* This is ensured by `handle_request'. */
694   assert (buffer[buffer_size - 1] == '\0');
695
696   status = -1;
697   while (buffer_pos < buffer_size)
698   {
699     /* Check for end-of-field or end-of-buffer */
700     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
701     {
702       field[field_size] = 0;
703       field_size++;
704       buffer_pos++;
705       status = 0;
706       break;
707     }
708     /* Handle escaped characters. */
709     else if (buffer[buffer_pos] == '\\')
710     {
711       if (buffer_pos >= (buffer_size - 1))
712         break;
713       buffer_pos++;
714       field[field_size] = buffer[buffer_pos];
715       field_size++;
716       buffer_pos++;
717     }
718     /* Normal operation */ 
719     else
720     {
721       field[field_size] = buffer[buffer_pos];
722       field_size++;
723       buffer_pos++;
724     }
725   } /* while (buffer_pos < buffer_size) */
726
727   if (status != 0)
728     return (status);
729
730   *buffer_ret = buffer + buffer_pos;
731   *buffer_size_ret = buffer_size - buffer_pos;
732   *field_ret = field;
733
734   return (0);
735 } /* }}} int buffer_get_field */
736
737 static int flush_file (const char *filename) /* {{{ */
738 {
739   cache_item_t *ci;
740
741   pthread_mutex_lock (&cache_lock);
742
743   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
744   if (ci == NULL)
745   {
746     pthread_mutex_unlock (&cache_lock);
747     return (ENOENT);
748   }
749
750   /* Enqueue at head */
751   enqueue_cache_item (ci, HEAD);
752   pthread_cond_signal (&cache_cond);
753
754   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
755   {
756     ci = NULL;
757
758     pthread_cond_wait (&flush_cond, &cache_lock);
759
760     ci = g_tree_lookup (cache_tree, filename);
761     if (ci == NULL)
762     {
763       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
764           "while waiting for flush.");
765       pthread_mutex_unlock (&cache_lock);
766       return (-1);
767     }
768   }
769
770   pthread_mutex_unlock (&cache_lock);
771   return (0);
772 } /* }}} int flush_file */
773
774 static int handle_request_help (int fd, /* {{{ */
775     char *buffer, size_t buffer_size)
776 {
777   int status;
778   char **help_text;
779   size_t help_text_len;
780   char *command;
781   size_t i;
782
783   char *help_help[] =
784   {
785     "4 Command overview\n",
786     "FLUSH <filename>\n",
787     "HELP [<command>]\n",
788     "UPDATE <filename> <values> [<values> ...]\n",
789     "STATS\n"
790   };
791   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
792
793   char *help_flush[] =
794   {
795     "4 Help for FLUSH\n",
796     "Usage: FLUSH <filename>\n",
797     "\n",
798     "Adds the given filename to the head of the update queue and returns\n",
799     "after is has been dequeued.\n"
800   };
801   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
802
803   char *help_update[] =
804   {
805     "9 Help for UPDATE\n",
806     "Usage: UPDATE <filename> <values> [<values> ...]\n"
807     "\n",
808     "Adds the given file to the internal cache if it is not yet known and\n",
809     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
810     "for details.\n",
811     "\n",
812     "Each <values> has the following form:\n",
813     "  <values> = <time>:<value>[:<value>[...]]\n",
814     "See the rrdupdate(1) manpage for details.\n"
815   };
816   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
817
818   char *help_stats[] =
819   {
820     "4 Help for STATS\n",
821     "Usage: STATS\n",
822     "\n",
823     "Returns some performance counters, see the rrdcached(1) manpage for\n",
824     "a description of the values.\n"
825   };
826   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
827
828   status = buffer_get_field (&buffer, &buffer_size, &command);
829   if (status != 0)
830   {
831     help_text = help_help;
832     help_text_len = help_help_len;
833   }
834   else
835   {
836     if (strcasecmp (command, "update") == 0)
837     {
838       help_text = help_update;
839       help_text_len = help_update_len;
840     }
841     else if (strcasecmp (command, "flush") == 0)
842     {
843       help_text = help_flush;
844       help_text_len = help_flush_len;
845     }
846     else if (strcasecmp (command, "stats") == 0)
847     {
848       help_text = help_stats;
849       help_text_len = help_stats_len;
850     }
851     else
852     {
853       help_text = help_help;
854       help_text_len = help_help_len;
855     }
856   }
857
858   for (i = 0; i < help_text_len; i++)
859   {
860     status = swrite (fd, help_text[i], strlen (help_text[i]));
861     if (status < 0)
862     {
863       status = errno;
864       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
865       return (status);
866     }
867   }
868
869   return (0);
870 } /* }}} int handle_request_help */
871
872 static int handle_request_stats (int fd, /* {{{ */
873     char *buffer __attribute__((unused)),
874     size_t buffer_size __attribute__((unused)))
875 {
876   int status;
877   char outbuf[CMD_MAX];
878
879   uint64_t copy_queue_length;
880   uint64_t copy_updates_received;
881   uint64_t copy_flush_received;
882   uint64_t copy_updates_written;
883   uint64_t copy_data_sets_written;
884   uint64_t copy_journal_bytes;
885   uint64_t copy_journal_rotate;
886
887   uint64_t tree_nodes_number;
888   uint64_t tree_depth;
889
890   pthread_mutex_lock (&stats_lock);
891   copy_queue_length       = stats_queue_length;
892   copy_updates_received   = stats_updates_received;
893   copy_flush_received     = stats_flush_received;
894   copy_updates_written    = stats_updates_written;
895   copy_data_sets_written  = stats_data_sets_written;
896   copy_journal_bytes      = stats_journal_bytes;
897   copy_journal_rotate     = stats_journal_rotate;
898   pthread_mutex_unlock (&stats_lock);
899
900   pthread_mutex_lock (&cache_lock);
901   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
902   tree_depth        = (uint64_t) g_tree_height (cache_tree);
903   pthread_mutex_unlock (&cache_lock);
904
905 #define RRDD_STATS_SEND \
906   outbuf[sizeof (outbuf) - 1] = 0; \
907   status = swrite (fd, outbuf, strlen (outbuf)); \
908   if (status < 0) \
909   { \
910     status = errno; \
911     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
912     return (status); \
913   }
914
915   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
916   RRDD_STATS_SEND;
917
918   snprintf (outbuf, sizeof (outbuf),
919       "QueueLength: %"PRIu64"\n", copy_queue_length);
920   RRDD_STATS_SEND;
921
922   snprintf (outbuf, sizeof (outbuf),
923       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
924   RRDD_STATS_SEND;
925
926   snprintf (outbuf, sizeof (outbuf),
927       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
928   RRDD_STATS_SEND;
929
930   snprintf (outbuf, sizeof (outbuf),
931       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
932   RRDD_STATS_SEND;
933
934   snprintf (outbuf, sizeof (outbuf),
935       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
936   RRDD_STATS_SEND;
937
938   snprintf (outbuf, sizeof (outbuf),
939       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
940   RRDD_STATS_SEND;
941
942   snprintf (outbuf, sizeof (outbuf),
943       "TreeDepth: %"PRIu64"\n", tree_depth);
944   RRDD_STATS_SEND;
945
946   snprintf (outbuf, sizeof(outbuf),
947       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
948   RRDD_STATS_SEND;
949
950   snprintf (outbuf, sizeof(outbuf),
951       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
952   RRDD_STATS_SEND;
953
954   return (0);
955 #undef RRDD_STATS_SEND
956 } /* }}} int handle_request_stats */
957
958 static int handle_request_flush (int fd, /* {{{ */
959     char *buffer, size_t buffer_size)
960 {
961   char *file;
962   int status;
963   char result[CMD_MAX];
964
965   status = buffer_get_field (&buffer, &buffer_size, &file);
966   if (status != 0)
967   {
968     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
969   }
970   else
971   {
972     pthread_mutex_lock(&stats_lock);
973     stats_flush_received++;
974     pthread_mutex_unlock(&stats_lock);
975
976     status = flush_file (file);
977     if (status == 0)
978       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
979     else if (status == ENOENT)
980     {
981       /* no file in our tree; see whether it exists at all */
982       struct stat statbuf;
983
984       memset(&statbuf, 0, sizeof(statbuf));
985       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
986         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
987       else
988         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
989     }
990     else if (status < 0)
991       strncpy (result, "-1 Internal error.\n", sizeof (result));
992     else
993       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
994   }
995   result[sizeof (result) - 1] = 0;
996
997   status = swrite (fd, result, strlen (result));
998   if (status < 0)
999   {
1000     status = errno;
1001     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1002     return (status);
1003   }
1004
1005   return (0);
1006 } /* }}} int handle_request_flush */
1007
1008 static int handle_request_update (int fd, /* {{{ */
1009     char *buffer, size_t buffer_size)
1010 {
1011   char *file;
1012   int values_num = 0;
1013   int status;
1014
1015   time_t now;
1016
1017   cache_item_t *ci;
1018   char answer[CMD_MAX];
1019
1020 #define RRDD_UPDATE_SEND \
1021   answer[sizeof (answer) - 1] = 0; \
1022   status = swrite (fd, answer, strlen (answer)); \
1023   if (status < 0) \
1024   { \
1025     status = errno; \
1026     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1027     return (status); \
1028   }
1029
1030   now = time (NULL);
1031
1032   status = buffer_get_field (&buffer, &buffer_size, &file);
1033   if (status != 0)
1034   {
1035     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1036         sizeof (answer));
1037     RRDD_UPDATE_SEND;
1038     return (0);
1039   }
1040
1041   pthread_mutex_lock(&stats_lock);
1042   stats_updates_received++;
1043   pthread_mutex_unlock(&stats_lock);
1044
1045   pthread_mutex_lock (&cache_lock);
1046
1047   ci = g_tree_lookup (cache_tree, file);
1048   if (ci == NULL) /* {{{ */
1049   {
1050     struct stat statbuf;
1051
1052     memset (&statbuf, 0, sizeof (statbuf));
1053     status = stat (file, &statbuf);
1054     if (status != 0)
1055     {
1056       pthread_mutex_unlock (&cache_lock);
1057       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1058
1059       status = errno;
1060       if (status == ENOENT)
1061         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1062       else
1063         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1064             status);
1065       RRDD_UPDATE_SEND;
1066       return (0);
1067     }
1068     if (!S_ISREG (statbuf.st_mode))
1069     {
1070       pthread_mutex_unlock (&cache_lock);
1071
1072       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1073       RRDD_UPDATE_SEND;
1074       return (0);
1075     }
1076     if (access(file, R_OK|W_OK) != 0)
1077     {
1078       pthread_mutex_unlock (&cache_lock);
1079
1080       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1081                 file, rrd_strerror(errno));
1082       RRDD_UPDATE_SEND;
1083       return (0);
1084     }
1085
1086     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1087     if (ci == NULL)
1088     {
1089       pthread_mutex_unlock (&cache_lock);
1090       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1091
1092       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1093       RRDD_UPDATE_SEND;
1094       return (0);
1095     }
1096     memset (ci, 0, sizeof (cache_item_t));
1097
1098     ci->file = strdup (file);
1099     if (ci->file == NULL)
1100     {
1101       pthread_mutex_unlock (&cache_lock);
1102       free (ci);
1103       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1104
1105       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1106       RRDD_UPDATE_SEND;
1107       return (0);
1108     }
1109
1110     _wipe_ci_values(ci, now);
1111     ci->flags = CI_FLAGS_IN_TREE;
1112
1113     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1114   } /* }}} */
1115   assert (ci != NULL);
1116
1117   while (buffer_size > 0)
1118   {
1119     char **temp;
1120     char *value;
1121
1122     status = buffer_get_field (&buffer, &buffer_size, &value);
1123     if (status != 0)
1124     {
1125       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1126       break;
1127     }
1128
1129     temp = (char **) realloc (ci->values,
1130         sizeof (char *) * (ci->values_num + 1));
1131     if (temp == NULL)
1132     {
1133       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1134       continue;
1135     }
1136     ci->values = temp;
1137
1138     ci->values[ci->values_num] = strdup (value);
1139     if (ci->values[ci->values_num] == NULL)
1140     {
1141       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1142       continue;
1143     }
1144     ci->values_num++;
1145
1146     values_num++;
1147   }
1148
1149   if (((now - ci->last_flush_time) >= config_write_interval)
1150       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1151       && (ci->values_num > 0))
1152   {
1153     enqueue_cache_item (ci, TAIL);
1154     pthread_cond_signal (&cache_cond);
1155   }
1156
1157   pthread_mutex_unlock (&cache_lock);
1158
1159   if (values_num < 1)
1160   {
1161     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1162   }
1163   else
1164   {
1165     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1166         (values_num == 1) ? "" : "s");
1167   }
1168   RRDD_UPDATE_SEND;
1169   return (0);
1170 #undef RRDD_UPDATE_SEND
1171 } /* }}} int handle_request_update */
1172
1173 /* we came across a "WROTE" entry during journal replay.
1174  * throw away any values that we have accumulated for this file
1175  */
1176 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1177                                  const char *buffer,
1178                                  size_t buffer_size __attribute__((unused)))
1179 {
1180   int i;
1181   cache_item_t *ci;
1182   const char *file = buffer;
1183
1184   pthread_mutex_lock(&cache_lock);
1185
1186   ci = g_tree_lookup(cache_tree, file);
1187   if (ci == NULL)
1188   {
1189     pthread_mutex_unlock(&cache_lock);
1190     return (0);
1191   }
1192
1193   if (ci->values)
1194   {
1195     for (i=0; i < ci->values_num; i++)
1196       free(ci->values[i]);
1197
1198     free(ci->values);
1199   }
1200
1201   _wipe_ci_values(ci, time(NULL));
1202
1203   pthread_mutex_unlock(&cache_lock);
1204   return (0);
1205 } /* }}} int handle_request_wrote */
1206
1207 /* if fd < 0, we are in journal replay mode */
1208 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1209 {
1210   char *buffer_ptr;
1211   char *command;
1212   int status;
1213
1214   assert (buffer[buffer_size - 1] == '\0');
1215
1216   buffer_ptr = buffer;
1217   command = NULL;
1218   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1219   if (status != 0)
1220   {
1221     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1222     return (-1);
1223   }
1224
1225   if (strcasecmp (command, "update") == 0)
1226   {
1227     /* don't re-write updates in replay mode */
1228     if (fd >= 0)
1229       journal_write(command, buffer_ptr);
1230
1231     return (handle_request_update (fd, buffer_ptr, buffer_size));
1232   }
1233   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1234   {
1235     /* this is only valid in replay mode */
1236     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1237   }
1238   else if (strcasecmp (command, "flush") == 0)
1239   {
1240     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1241   }
1242   else if (strcasecmp (command, "stats") == 0)
1243   {
1244     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1245   }
1246   else if (strcasecmp (command, "help") == 0)
1247   {
1248     return (handle_request_help (fd, buffer_ptr, buffer_size));
1249   }
1250   else
1251   {
1252     char result[CMD_MAX];
1253
1254     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1255     result[sizeof (result) - 1] = 0;
1256
1257     status = swrite (fd, result, strlen (result));
1258     if (status < 0)
1259     {
1260       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1261       return (-1);
1262     }
1263   }
1264
1265   return (0);
1266 } /* }}} int handle_request */
1267
1268 /* MUST NOT hold journal_lock before calling this */
1269 static void journal_rotate(void) /* {{{ */
1270 {
1271   FILE *old_fh = NULL;
1272
1273   if (journal_cur == NULL || journal_old == NULL)
1274     return;
1275
1276   pthread_mutex_lock(&journal_lock);
1277
1278   /* we rotate this way (rename before close) so that the we can release
1279    * the journal lock as fast as possible.  Journal writes to the new
1280    * journal can proceed immediately after the new file is opened.  The
1281    * fclose can then block without affecting new updates.
1282    */
1283   if (journal_fh != NULL)
1284   {
1285     old_fh = journal_fh;
1286     rename(journal_cur, journal_old);
1287     ++stats_journal_rotate;
1288   }
1289
1290   journal_fh = fopen(journal_cur, "a");
1291   pthread_mutex_unlock(&journal_lock);
1292
1293   if (old_fh != NULL)
1294     fclose(old_fh);
1295
1296   if (journal_fh == NULL)
1297     RRDD_LOG(LOG_CRIT,
1298              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1299              journal_cur, rrd_strerror(errno));
1300
1301 } /* }}} static void journal_rotate */
1302
1303 static void journal_done(void) /* {{{ */
1304 {
1305   if (journal_cur == NULL)
1306     return;
1307
1308   pthread_mutex_lock(&journal_lock);
1309   if (journal_fh != NULL)
1310   {
1311     fclose(journal_fh);
1312     journal_fh = NULL;
1313   }
1314
1315   RRDD_LOG(LOG_INFO, "removing journals");
1316
1317   unlink(journal_old);
1318   unlink(journal_cur);
1319   pthread_mutex_unlock(&journal_lock);
1320
1321 } /* }}} static void journal_done */
1322
1323 static int journal_write(char *cmd, char *args) /* {{{ */
1324 {
1325   int chars;
1326
1327   if (journal_fh == NULL)
1328     return 0;
1329
1330   pthread_mutex_lock(&journal_lock);
1331   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1332   pthread_mutex_unlock(&journal_lock);
1333
1334   if (chars > 0)
1335   {
1336     pthread_mutex_lock(&stats_lock);
1337     stats_journal_bytes += chars;
1338     pthread_mutex_unlock(&stats_lock);
1339   }
1340
1341   return chars;
1342 } /* }}} static int journal_write */
1343
1344 static int journal_replay (const char *file) /* {{{ */
1345 {
1346   FILE *fh;
1347   int entry_cnt = 0;
1348   int fail_cnt = 0;
1349   uint64_t line = 0;
1350   char entry[CMD_MAX];
1351
1352   if (file == NULL) return 0;
1353
1354   fh = fopen(file, "r");
1355   if (fh == NULL)
1356   {
1357     if (errno != ENOENT)
1358       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1359                file, rrd_strerror(errno));
1360     return 0;
1361   }
1362   else
1363     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1364
1365   while(!feof(fh))
1366   {
1367     size_t entry_len;
1368
1369     ++line;
1370     fgets(entry, sizeof(entry), fh);
1371     entry_len = strlen(entry);
1372
1373     /* check \n termination in case journal writing crashed mid-line */
1374     if (entry_len == 0)
1375       continue;
1376     else if (entry[entry_len - 1] != '\n')
1377     {
1378       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1379       ++fail_cnt;
1380       continue;
1381     }
1382
1383     entry[entry_len - 1] = '\0';
1384
1385     if (handle_request(-1, entry, entry_len) == 0)
1386       ++entry_cnt;
1387     else
1388       ++fail_cnt;
1389   }
1390
1391   fclose(fh);
1392
1393   if (entry_cnt > 0)
1394   {
1395     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1396              entry_cnt, fail_cnt);
1397     return 1;
1398   }
1399   else
1400     return 0;
1401
1402 } /* }}} static int journal_replay */
1403
1404 static void *connection_thread_main (void *args) /* {{{ */
1405 {
1406   pthread_t self;
1407   int i;
1408   int fd;
1409   
1410   fd = *((int *) args);
1411   free (args);
1412
1413   pthread_mutex_lock (&connection_threads_lock);
1414   {
1415     pthread_t *temp;
1416
1417     temp = (pthread_t *) realloc (connection_threads,
1418         sizeof (pthread_t) * (connection_threads_num + 1));
1419     if (temp == NULL)
1420     {
1421       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1422     }
1423     else
1424     {
1425       connection_threads = temp;
1426       connection_threads[connection_threads_num] = pthread_self ();
1427       connection_threads_num++;
1428     }
1429   }
1430   pthread_mutex_unlock (&connection_threads_lock);
1431
1432   while (do_shutdown == 0)
1433   {
1434     char buffer[CMD_MAX];
1435
1436     struct pollfd pollfd;
1437     int status;
1438
1439     pollfd.fd = fd;
1440     pollfd.events = POLLIN | POLLPRI;
1441     pollfd.revents = 0;
1442
1443     status = poll (&pollfd, 1, /* timeout = */ 500);
1444     if (status == 0) /* timeout */
1445       continue;
1446     else if (status < 0) /* error */
1447     {
1448       status = errno;
1449       if (status == EINTR)
1450         continue;
1451       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1452       continue;
1453     }
1454
1455     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1456     {
1457       close (fd);
1458       break;
1459     }
1460     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1461     {
1462       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1463           "poll(2) returned something unexpected: %#04hx",
1464           pollfd.revents);
1465       close (fd);
1466       break;
1467     }
1468
1469     status = (int) sread (fd, buffer, sizeof (buffer));
1470     if (status <= 0)
1471     {
1472       close (fd);
1473
1474       if (status < 0)
1475         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1476
1477       break;
1478     }
1479
1480     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1481     if (status != 0)
1482       break;
1483   }
1484
1485   close(fd);
1486
1487   self = pthread_self ();
1488   /* Remove this thread from the connection threads list */
1489   pthread_mutex_lock (&connection_threads_lock);
1490   /* Find out own index in the array */
1491   for (i = 0; i < connection_threads_num; i++)
1492     if (pthread_equal (connection_threads[i], self) != 0)
1493       break;
1494   assert (i < connection_threads_num);
1495
1496   /* Move the trailing threads forward. */
1497   if (i < (connection_threads_num - 1))
1498   {
1499     memmove (connection_threads + i,
1500         connection_threads + i + 1,
1501         sizeof (pthread_t) * (connection_threads_num - i - 1));
1502   }
1503
1504   connection_threads_num--;
1505   pthread_mutex_unlock (&connection_threads_lock);
1506
1507   return (NULL);
1508 } /* }}} void *connection_thread_main */
1509
1510 static int open_listen_socket_unix (const char *path) /* {{{ */
1511 {
1512   int fd;
1513   struct sockaddr_un sa;
1514   listen_socket_t *temp;
1515   int status;
1516
1517   temp = (listen_socket_t *) realloc (listen_fds,
1518       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1519   if (temp == NULL)
1520   {
1521     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1522     return (-1);
1523   }
1524   listen_fds = temp;
1525   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1526
1527   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1528   if (fd < 0)
1529   {
1530     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1531     return (-1);
1532   }
1533
1534   memset (&sa, 0, sizeof (sa));
1535   sa.sun_family = AF_UNIX;
1536   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1537
1538   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1539   if (status != 0)
1540   {
1541     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1542     close (fd);
1543     unlink (path);
1544     return (-1);
1545   }
1546
1547   status = listen (fd, /* backlog = */ 10);
1548   if (status != 0)
1549   {
1550     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1551     close (fd);
1552     unlink (path);
1553     return (-1);
1554   }
1555   
1556   listen_fds[listen_fds_num].fd = fd;
1557   snprintf (listen_fds[listen_fds_num].path,
1558       sizeof (listen_fds[listen_fds_num].path) - 1,
1559       "unix:%s", path);
1560   listen_fds_num++;
1561
1562   return (0);
1563 } /* }}} int open_listen_socket_unix */
1564
1565 static int open_listen_socket (const char *addr_orig) /* {{{ */
1566 {
1567   struct addrinfo ai_hints;
1568   struct addrinfo *ai_res;
1569   struct addrinfo *ai_ptr;
1570   char addr_copy[NI_MAXHOST];
1571   char *addr;
1572   char *port;
1573   int status;
1574
1575   assert (addr_orig != NULL);
1576
1577   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1578   addr_copy[sizeof (addr_copy) - 1] = 0;
1579   addr = addr_copy;
1580
1581   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1582     return (open_listen_socket_unix (addr + strlen ("unix:")));
1583   else if (addr[0] == '/')
1584     return (open_listen_socket_unix (addr));
1585
1586   memset (&ai_hints, 0, sizeof (ai_hints));
1587   ai_hints.ai_flags = 0;
1588 #ifdef AI_ADDRCONFIG
1589   ai_hints.ai_flags |= AI_ADDRCONFIG;
1590 #endif
1591   ai_hints.ai_family = AF_UNSPEC;
1592   ai_hints.ai_socktype = SOCK_STREAM;
1593
1594   port = NULL;
1595  if (*addr == '[') /* IPv6+port format */
1596   {
1597     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1598     addr++;
1599
1600     port = strchr (addr, ']');
1601     if (port == NULL)
1602     {
1603       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1604           addr_orig);
1605       return (-1);
1606     }
1607     *port = 0;
1608     port++;
1609
1610     if (*port == ':')
1611       port++;
1612     else if (*port == 0)
1613       port = NULL;
1614     else
1615     {
1616       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1617           port);
1618       return (-1);
1619     }
1620   } /* if (*addr = ']') */
1621   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1622   {
1623     port = rindex(addr, ':');
1624     if (port != NULL)
1625     {
1626       *port = 0;
1627       port++;
1628     }
1629   }
1630   ai_res = NULL;
1631   status = getaddrinfo (addr,
1632                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1633                         &ai_hints, &ai_res);
1634   if (status != 0)
1635   {
1636     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1637         "%s", addr, gai_strerror (status));
1638     return (-1);
1639   }
1640
1641   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1642   {
1643     int fd;
1644     listen_socket_t *temp;
1645     int one = 1;
1646
1647     temp = (listen_socket_t *) realloc (listen_fds,
1648         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1649     if (temp == NULL)
1650     {
1651       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1652       continue;
1653     }
1654     listen_fds = temp;
1655     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1656
1657     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1658     if (fd < 0)
1659     {
1660       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1661       continue;
1662     }
1663
1664     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1665
1666     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1667     if (status != 0)
1668     {
1669       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1670       close (fd);
1671       continue;
1672     }
1673
1674     status = listen (fd, /* backlog = */ 10);
1675     if (status != 0)
1676     {
1677       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1678       close (fd);
1679       return (-1);
1680     }
1681
1682     listen_fds[listen_fds_num].fd = fd;
1683     strncpy (listen_fds[listen_fds_num].path, addr,
1684         sizeof (listen_fds[listen_fds_num].path) - 1);
1685     listen_fds_num++;
1686   } /* for (ai_ptr) */
1687
1688   return (0);
1689 } /* }}} int open_listen_socket */
1690
1691 static int close_listen_sockets (void) /* {{{ */
1692 {
1693   size_t i;
1694
1695   for (i = 0; i < listen_fds_num; i++)
1696   {
1697     close (listen_fds[i].fd);
1698     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1699       unlink (listen_fds[i].path + strlen ("unix:"));
1700   }
1701
1702   free (listen_fds);
1703   listen_fds = NULL;
1704   listen_fds_num = 0;
1705
1706   return (0);
1707 } /* }}} int close_listen_sockets */
1708
1709 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1710 {
1711   struct pollfd *pollfds;
1712   int pollfds_num;
1713   int status;
1714   int i;
1715
1716   for (i = 0; i < config_listen_address_list_len; i++)
1717     open_listen_socket (config_listen_address_list[i]);
1718
1719   if (config_listen_address_list_len < 1)
1720     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1721
1722   if (listen_fds_num < 1)
1723   {
1724     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1725         "could be opened. Sorry.");
1726     return (NULL);
1727   }
1728
1729   pollfds_num = listen_fds_num;
1730   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1731   if (pollfds == NULL)
1732   {
1733     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1734     return (NULL);
1735   }
1736   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1737
1738   RRDD_LOG(LOG_INFO, "listening for connections");
1739
1740   while (do_shutdown == 0)
1741   {
1742     assert (pollfds_num == ((int) listen_fds_num));
1743     for (i = 0; i < pollfds_num; i++)
1744     {
1745       pollfds[i].fd = listen_fds[i].fd;
1746       pollfds[i].events = POLLIN | POLLPRI;
1747       pollfds[i].revents = 0;
1748     }
1749
1750     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1751     if (status == 0)
1752     {
1753       continue; /* timeout */
1754     }
1755     else if (status < 0)
1756     {
1757       status = errno;
1758       if (status != EINTR)
1759       {
1760         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1761       }
1762       continue;
1763     }
1764
1765     for (i = 0; i < pollfds_num; i++)
1766     {
1767       int *client_sd;
1768       struct sockaddr_storage client_sa;
1769       socklen_t client_sa_size;
1770       pthread_t tid;
1771       pthread_attr_t attr;
1772
1773       if (pollfds[i].revents == 0)
1774         continue;
1775
1776       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1777       {
1778         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1779             "poll(2) returned something unexpected for listen FD #%i.",
1780             pollfds[i].fd);
1781         continue;
1782       }
1783
1784       client_sd = (int *) malloc (sizeof (int));
1785       if (client_sd == NULL)
1786       {
1787         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1788         continue;
1789       }
1790
1791       client_sa_size = sizeof (client_sa);
1792       *client_sd = accept (pollfds[i].fd,
1793           (struct sockaddr *) &client_sa, &client_sa_size);
1794       if (*client_sd < 0)
1795       {
1796         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1797         continue;
1798       }
1799
1800       pthread_attr_init (&attr);
1801       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1802
1803       status = pthread_create (&tid, &attr, connection_thread_main,
1804           /* args = */ (void *) client_sd);
1805       if (status != 0)
1806       {
1807         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1808         close (*client_sd);
1809         free (client_sd);
1810         continue;
1811       }
1812     } /* for (pollfds_num) */
1813   } /* while (do_shutdown == 0) */
1814
1815   RRDD_LOG(LOG_INFO, "starting shutdown");
1816
1817   close_listen_sockets ();
1818
1819   pthread_mutex_lock (&connection_threads_lock);
1820   while (connection_threads_num > 0)
1821   {
1822     pthread_t wait_for;
1823
1824     wait_for = connection_threads[0];
1825
1826     pthread_mutex_unlock (&connection_threads_lock);
1827     pthread_join (wait_for, /* retval = */ NULL);
1828     pthread_mutex_lock (&connection_threads_lock);
1829   }
1830   pthread_mutex_unlock (&connection_threads_lock);
1831
1832   return (NULL);
1833 } /* }}} void *listen_thread_main */
1834
1835 static int daemonize (void) /* {{{ */
1836 {
1837   int status;
1838
1839   /* These structures are static, because `sigaction' behaves weird if the are
1840    * overwritten.. */
1841   static struct sigaction sa_int;
1842   static struct sigaction sa_term;
1843   static struct sigaction sa_pipe;
1844
1845   if (!stay_foreground)
1846   {
1847     pid_t child;
1848     char *base_dir;
1849
1850     child = fork ();
1851     if (child < 0)
1852     {
1853       fprintf (stderr, "daemonize: fork(2) failed.\n");
1854       return (-1);
1855     }
1856     else if (child > 0)
1857     {
1858       return (1);
1859     }
1860
1861     /* Change into the /tmp directory. */
1862     base_dir = (config_base_dir != NULL)
1863       ? config_base_dir
1864       : "/tmp";
1865     status = chdir (base_dir);
1866     if (status != 0)
1867     {
1868       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1869       return (-1);
1870     }
1871
1872     /* Become session leader */
1873     setsid ();
1874
1875     /* Open the first three file descriptors to /dev/null */
1876     close (2);
1877     close (1);
1878     close (0);
1879
1880     open ("/dev/null", O_RDWR);
1881     dup (0);
1882     dup (0);
1883   } /* if (!stay_foreground) */
1884
1885   /* Install signal handlers */
1886   memset (&sa_int, 0, sizeof (sa_int));
1887   sa_int.sa_handler = sig_int_handler;
1888   sigaction (SIGINT, &sa_int, NULL);
1889
1890   memset (&sa_term, 0, sizeof (sa_term));
1891   sa_term.sa_handler = sig_term_handler;
1892   sigaction (SIGTERM, &sa_term, NULL);
1893
1894   memset (&sa_pipe, 0, sizeof (sa_pipe));
1895   sa_pipe.sa_handler = SIG_IGN;
1896   sigaction (SIGPIPE, &sa_pipe, NULL);
1897
1898   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1899   RRDD_LOG(LOG_INFO, "starting up");
1900
1901   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1902   if (cache_tree == NULL)
1903   {
1904     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1905     return (-1);
1906   }
1907
1908   status = write_pidfile ();
1909   return status;
1910 } /* }}} int daemonize */
1911
1912 static int cleanup (void) /* {{{ */
1913 {
1914   do_shutdown++;
1915
1916   pthread_cond_signal (&cache_cond);
1917   pthread_join (queue_thread, /* return = */ NULL);
1918
1919   remove_pidfile ();
1920
1921   RRDD_LOG(LOG_INFO, "goodbye");
1922   closelog ();
1923
1924   return (0);
1925 } /* }}} int cleanup */
1926
1927 static int read_options (int argc, char **argv) /* {{{ */
1928 {
1929   int option;
1930   int status = 0;
1931
1932   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1933   {
1934     switch (option)
1935     {
1936       case 'g':
1937         stay_foreground=1;
1938         break;
1939
1940       case 'l':
1941       {
1942         char **temp;
1943
1944         temp = (char **) realloc (config_listen_address_list,
1945             sizeof (char *) * (config_listen_address_list_len + 1));
1946         if (temp == NULL)
1947         {
1948           fprintf (stderr, "read_options: realloc failed.\n");
1949           return (2);
1950         }
1951         config_listen_address_list = temp;
1952
1953         temp[config_listen_address_list_len] = strdup (optarg);
1954         if (temp[config_listen_address_list_len] == NULL)
1955         {
1956           fprintf (stderr, "read_options: strdup failed.\n");
1957           return (2);
1958         }
1959         config_listen_address_list_len++;
1960       }
1961       break;
1962
1963       case 'f':
1964       {
1965         int temp;
1966
1967         temp = atoi (optarg);
1968         if (temp > 0)
1969           config_flush_interval = temp;
1970         else
1971         {
1972           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1973           status = 3;
1974         }
1975       }
1976       break;
1977
1978       case 'w':
1979       {
1980         int temp;
1981
1982         temp = atoi (optarg);
1983         if (temp > 0)
1984           config_write_interval = temp;
1985         else
1986         {
1987           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1988           status = 2;
1989         }
1990       }
1991       break;
1992
1993       case 'z':
1994       {
1995         int temp;
1996
1997         temp = atoi(optarg);
1998         if (temp > 0)
1999           config_write_jitter = temp;
2000         else
2001         {
2002           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2003           status = 2;
2004         }
2005
2006         break;
2007       }
2008
2009       case 'b':
2010       {
2011         size_t len;
2012
2013         if (config_base_dir != NULL)
2014           free (config_base_dir);
2015         config_base_dir = strdup (optarg);
2016         if (config_base_dir == NULL)
2017         {
2018           fprintf (stderr, "read_options: strdup failed.\n");
2019           return (3);
2020         }
2021
2022         len = strlen (config_base_dir);
2023         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2024         {
2025           config_base_dir[len - 1] = 0;
2026           len--;
2027         }
2028
2029         if (len < 1)
2030         {
2031           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2032           return (4);
2033         }
2034       }
2035       break;
2036
2037       case 'p':
2038       {
2039         if (config_pid_file != NULL)
2040           free (config_pid_file);
2041         config_pid_file = strdup (optarg);
2042         if (config_pid_file == NULL)
2043         {
2044           fprintf (stderr, "read_options: strdup failed.\n");
2045           return (3);
2046         }
2047       }
2048       break;
2049
2050       case 'j':
2051       {
2052         struct stat statbuf;
2053         const char *dir = optarg;
2054
2055         status = stat(dir, &statbuf);
2056         if (status != 0)
2057         {
2058           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2059           return 6;
2060         }
2061
2062         if (!S_ISDIR(statbuf.st_mode)
2063             || access(dir, R_OK|W_OK|X_OK) != 0)
2064         {
2065           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2066                   errno ? rrd_strerror(errno) : "");
2067           return 6;
2068         }
2069
2070         journal_cur = malloc(PATH_MAX + 1);
2071         journal_old = malloc(PATH_MAX + 1);
2072         if (journal_cur == NULL || journal_old == NULL)
2073         {
2074           fprintf(stderr, "malloc failure for journal files\n");
2075           return 6;
2076         }
2077         else 
2078         {
2079           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2080           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2081         }
2082       }
2083       break;
2084
2085       case 'h':
2086       case '?':
2087         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2088             "\n"
2089             "Usage: rrdcached [options]\n"
2090             "\n"
2091             "Valid options are:\n"
2092             "  -l <address>  Socket address to listen to.\n"
2093             "  -w <seconds>  Interval in which to write data.\n"
2094             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2095             "  -f <seconds>  Interval in which to flush dead data.\n"
2096             "  -p <file>     Location of the PID-file.\n"
2097             "  -b <dir>      Base directory to change to.\n"
2098             "  -g            Do not fork and run in the foreground.\n"
2099             "  -j <dir>      Directory in which to create the journal files.\n"
2100             "\n"
2101             "For more information and a detailed description of all options "
2102             "please refer\n"
2103             "to the rrdcached(1) manual page.\n",
2104             VERSION);
2105         status = -1;
2106         break;
2107     } /* switch (option) */
2108   } /* while (getopt) */
2109
2110   /* advise the user when values are not sane */
2111   if (config_flush_interval < 2 * config_write_interval)
2112     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2113             " 2x write interval (-w) !\n");
2114   if (config_write_jitter > config_write_interval)
2115     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2116             " write interval (-w) !\n");
2117
2118   return (status);
2119 } /* }}} int read_options */
2120
2121 int main (int argc, char **argv)
2122 {
2123   int status;
2124
2125   status = read_options (argc, argv);
2126   if (status != 0)
2127   {
2128     if (status < 0)
2129       status = 0;
2130     return (status);
2131   }
2132
2133   status = daemonize ();
2134   if (status == 1)
2135   {
2136     struct sigaction sigchld;
2137
2138     memset (&sigchld, 0, sizeof (sigchld));
2139     sigchld.sa_handler = SIG_IGN;
2140     sigaction (SIGCHLD, &sigchld, NULL);
2141
2142     return (0);
2143   }
2144   else if (status != 0)
2145   {
2146     fprintf (stderr, "daemonize failed, exiting.\n");
2147     return (1);
2148   }
2149
2150   if (journal_cur != NULL)
2151   {
2152     int had_journal = 0;
2153
2154     pthread_mutex_lock(&journal_lock);
2155
2156     RRDD_LOG(LOG_INFO, "checking for journal files");
2157
2158     had_journal += journal_replay(journal_old);
2159     had_journal += journal_replay(journal_cur);
2160
2161     if (had_journal)
2162       flush_old_values(-1);
2163
2164     pthread_mutex_unlock(&journal_lock);
2165     journal_rotate();
2166
2167     RRDD_LOG(LOG_INFO, "journal processing complete");
2168   }
2169
2170   /* start the queue thread */
2171   memset (&queue_thread, 0, sizeof (queue_thread));
2172   status = pthread_create (&queue_thread,
2173                            NULL, /* attr */
2174                            queue_thread_main,
2175                            NULL); /* args */
2176   if (status != 0)
2177   {
2178     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2179     cleanup();
2180     return (1);
2181   }
2182
2183   listen_thread_main (NULL);
2184   cleanup ();
2185
2186   return (0);
2187 } /* int main */
2188
2189 /*
2190  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2191  */