The PID file is created with open() in the parent process, while we still
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 struct listen_socket_s
105 {
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
110
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
125
126 struct callback_flush_data_s
127 {
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
134
135 enum queue_side_e
136 {
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
141
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
144
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
149
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
152
153 static int do_shutdown = 0;
154
155 static pthread_t queue_thread;
156
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
160
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
167
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
173
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
176
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
185
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
194
195 /* 
196  * Functions
197  */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
199 {
200   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201   do_shutdown++;
202   pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
204
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
206 {
207   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208   do_shutdown++;
209   pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
211
212 static int open_pidfile(void) /* {{{ */
213 {
214   int fd;
215   char *file;
216
217   file = (config_pid_file != NULL)
218     ? config_pid_file
219     : LOCALSTATEDIR "/run/rrdcached.pid";
220
221   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
222   if (fd < 0)
223     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
224             file, rrd_strerror(errno));
225
226   return(fd);
227 }
228
229 static int write_pidfile (int fd) /* {{{ */
230 {
231   pid_t pid;
232   FILE *fh;
233
234   pid = getpid ();
235
236   fh = fdopen (fd, "w");
237   if (fh == NULL)
238   {
239     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
240     close(fd);
241     return (-1);
242   }
243
244   fprintf (fh, "%i\n", (int) pid);
245   fclose (fh);
246
247   return (0);
248 } /* }}} int write_pidfile */
249
250 static int remove_pidfile (void) /* {{{ */
251 {
252   char *file;
253   int status;
254
255   file = (config_pid_file != NULL)
256     ? config_pid_file
257     : LOCALSTATEDIR "/run/rrdcached.pid";
258
259   status = unlink (file);
260   if (status == 0)
261     return (0);
262   return (errno);
263 } /* }}} int remove_pidfile */
264
265 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
266 {
267   char    *buffer;
268   size_t   buffer_used;
269   size_t   buffer_free;
270   ssize_t  status;
271
272   buffer       = (char *) buffer_void;
273   buffer_used  = 0;
274   buffer_free  = buffer_size;
275
276   while (buffer_free > 0)
277   {
278     status = read (fd, buffer + buffer_used, buffer_free);
279     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
280       continue;
281
282     if (status < 0)
283       return (-1);
284
285     if (status == 0)
286       return (0);
287
288     assert ((0 > status) || (buffer_free >= (size_t) status));
289
290     buffer_free = buffer_free - status;
291     buffer_used = buffer_used + status;
292
293     if (buffer[buffer_used - 1] == '\n')
294       break;
295   }
296
297   assert (buffer_used > 0);
298
299   if (buffer[buffer_used - 1] != '\n')
300   {
301     errno = ENOBUFS;
302     return (-1);
303   }
304
305   buffer[buffer_used - 1] = 0;
306
307   /* Fix network line endings. */
308   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
309   {
310     buffer_used--;
311     buffer[buffer_used - 1] = 0;
312   }
313
314   return (buffer_used);
315 } /* }}} ssize_t sread */
316
317 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
318 {
319   const char *ptr;
320   size_t      nleft;
321   ssize_t     status;
322
323   /* special case for journal replay */
324   if (fd < 0) return 0;
325
326   ptr   = (const char *) buf;
327   nleft = count;
328
329   while (nleft > 0)
330   {
331     status = write (fd, (const void *) ptr, nleft);
332
333     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
334       continue;
335
336     if (status < 0)
337       return (status);
338
339     nleft -= status;
340     ptr   += status;
341   }
342
343   return (0);
344 } /* }}} ssize_t swrite */
345
346 static void _wipe_ci_values(cache_item_t *ci, time_t when)
347 {
348   ci->values = NULL;
349   ci->values_num = 0;
350
351   ci->last_flush_time = when;
352   if (config_write_jitter > 0)
353     ci->last_flush_time += (random() % config_write_jitter);
354
355   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
356 }
357
358 /*
359  * enqueue_cache_item:
360  * `cache_lock' must be acquired before calling this function!
361  */
362 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
363     queue_side_t side)
364 {
365   int did_insert = 0;
366
367   if (ci == NULL)
368     return (-1);
369
370   if (ci->values_num == 0)
371     return (0);
372
373   if (side == HEAD)
374   {
375     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
376     {
377       assert (ci->next == NULL);
378       ci->next = cache_queue_head;
379       cache_queue_head = ci;
380
381       if (cache_queue_tail == NULL)
382         cache_queue_tail = cache_queue_head;
383
384       did_insert = 1;
385     }
386     else if (cache_queue_head == ci)
387     {
388       /* do nothing */
389     }
390     else /* enqueued, but not first entry */
391     {
392       cache_item_t *prev;
393
394       /* find previous entry */
395       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
396         if (prev->next == ci)
397           break;
398       assert (prev != NULL);
399
400       /* move to the front */
401       prev->next = ci->next;
402       ci->next = cache_queue_head;
403       cache_queue_head = ci;
404
405       /* check if we need to adapt the tail */
406       if (cache_queue_tail == ci)
407         cache_queue_tail = prev;
408     }
409   }
410   else /* (side == TAIL) */
411   {
412     /* We don't move values back in the list.. */
413     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
414       return (0);
415
416     assert (ci->next == NULL);
417
418     if (cache_queue_tail == NULL)
419       cache_queue_head = ci;
420     else
421       cache_queue_tail->next = ci;
422     cache_queue_tail = ci;
423
424     did_insert = 1;
425   }
426
427   ci->flags |= CI_FLAGS_IN_QUEUE;
428
429   if (did_insert)
430   {
431     pthread_mutex_lock (&stats_lock);
432     stats_queue_length++;
433     pthread_mutex_unlock (&stats_lock);
434   }
435
436   return (0);
437 } /* }}} int enqueue_cache_item */
438
439 /*
440  * tree_callback_flush:
441  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
442  * while this is in progress.
443  */
444 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
445     gpointer data)
446 {
447   cache_item_t *ci;
448   callback_flush_data_t *cfd;
449
450   ci = (cache_item_t *) value;
451   cfd = (callback_flush_data_t *) data;
452
453   if ((ci->last_flush_time <= cfd->abs_timeout)
454       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
455       && (ci->values_num > 0))
456   {
457     enqueue_cache_item (ci, TAIL);
458   }
459   else if ((do_shutdown != 0)
460       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
461       && (ci->values_num > 0))
462   {
463     enqueue_cache_item (ci, TAIL);
464   }
465   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
466       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
467       && (ci->values_num <= 0))
468   {
469     char **temp;
470
471     temp = (char **) realloc (cfd->keys,
472         sizeof (char *) * (cfd->keys_num + 1));
473     if (temp == NULL)
474     {
475       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
476       return (FALSE);
477     }
478     cfd->keys = temp;
479     /* Make really sure this points to the _same_ place */
480     assert ((char *) key == ci->file);
481     cfd->keys[cfd->keys_num] = (char *) key;
482     cfd->keys_num++;
483   }
484
485   return (FALSE);
486 } /* }}} gboolean tree_callback_flush */
487
488 static int flush_old_values (int max_age)
489 {
490   callback_flush_data_t cfd;
491   size_t k;
492
493   memset (&cfd, 0, sizeof (cfd));
494   /* Pass the current time as user data so that we don't need to call
495    * `time' for each node. */
496   cfd.now = time (NULL);
497   cfd.keys = NULL;
498   cfd.keys_num = 0;
499
500   if (max_age > 0)
501     cfd.abs_timeout = cfd.now - max_age;
502   else
503     cfd.abs_timeout = cfd.now + 1;
504
505   /* `tree_callback_flush' will return the keys of all values that haven't
506    * been touched in the last `config_flush_interval' seconds in `cfd'.
507    * The char*'s in this array point to the same memory as ci->file, so we
508    * don't need to free them separately. */
509   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
510
511   for (k = 0; k < cfd.keys_num; k++)
512   {
513     cache_item_t *ci;
514
515     /* This must not fail. */
516     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
517     assert (ci != NULL);
518
519     /* If we end up here with values available, something's seriously
520      * messed up. */
521     assert (ci->values_num == 0);
522
523     /* Remove the node from the tree */
524     g_tree_remove (cache_tree, cfd.keys[k]);
525     cfd.keys[k] = NULL;
526
527     /* Now free and clean up `ci'. */
528     free (ci->file);
529     ci->file = NULL;
530     free (ci);
531     ci = NULL;
532   } /* for (k = 0; k < cfd.keys_num; k++) */
533
534   if (cfd.keys != NULL)
535   {
536     free (cfd.keys);
537     cfd.keys = NULL;
538   }
539
540   return (0);
541 } /* int flush_old_values */
542
543 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
544 {
545   struct timeval now;
546   struct timespec next_flush;
547
548   gettimeofday (&now, NULL);
549   next_flush.tv_sec = now.tv_sec + config_flush_interval;
550   next_flush.tv_nsec = 1000 * now.tv_usec;
551
552   pthread_mutex_lock (&cache_lock);
553   while ((do_shutdown == 0) || (cache_queue_head != NULL))
554   {
555     cache_item_t *ci;
556     char *file;
557     char **values;
558     int values_num;
559     int status;
560     int i;
561
562     /* First, check if it's time to do the cache flush. */
563     gettimeofday (&now, NULL);
564     if ((now.tv_sec > next_flush.tv_sec)
565         || ((now.tv_sec == next_flush.tv_sec)
566           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
567     {
568       /* Flush all values that haven't been written in the last
569        * `config_write_interval' seconds. */
570       flush_old_values (config_write_interval);
571
572       /* Determine the time of the next cache flush. */
573       while (next_flush.tv_sec <= now.tv_sec)
574         next_flush.tv_sec += config_flush_interval;
575
576       /* unlock the cache while we rotate so we don't block incoming
577        * updates if the fsync() blocks on disk I/O */
578       pthread_mutex_unlock(&cache_lock);
579       journal_rotate();
580       pthread_mutex_lock(&cache_lock);
581     }
582
583     /* Now, check if there's something to store away. If not, wait until
584      * something comes in or it's time to do the cache flush. */
585     if (cache_queue_head == NULL)
586     {
587       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
588       if ((status != 0) && (status != ETIMEDOUT))
589       {
590         RRDD_LOG (LOG_ERR, "queue_thread_main: "
591             "pthread_cond_timedwait returned %i.", status);
592       }
593     }
594
595     /* We're about to shut down, so lets flush the entire tree. */
596     if ((do_shutdown != 0) && (cache_queue_head == NULL))
597       flush_old_values (/* max age = */ -1);
598
599     /* Check if a value has arrived. This may be NULL if we timed out or there
600      * was an interrupt such as a signal. */
601     if (cache_queue_head == NULL)
602       continue;
603
604     ci = cache_queue_head;
605
606     /* copy the relevant parts */
607     file = strdup (ci->file);
608     if (file == NULL)
609     {
610       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
611       continue;
612     }
613
614     assert(ci->values != NULL);
615     assert(ci->values_num > 0);
616
617     values = ci->values;
618     values_num = ci->values_num;
619
620     _wipe_ci_values(ci, time(NULL));
621
622     cache_queue_head = ci->next;
623     if (cache_queue_head == NULL)
624       cache_queue_tail = NULL;
625     ci->next = NULL;
626
627     pthread_mutex_lock (&stats_lock);
628     assert (stats_queue_length > 0);
629     stats_queue_length--;
630     pthread_mutex_unlock (&stats_lock);
631
632     pthread_mutex_unlock (&cache_lock);
633
634     rrd_clear_error ();
635     status = rrd_update_r (file, NULL, values_num, (void *) values);
636     if (status != 0)
637     {
638       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
639           "rrd_update_r (%s) failed with status %i. (%s)",
640           file, status, rrd_get_error());
641     }
642
643     journal_write("wrote", file);
644     pthread_cond_broadcast(&ci->flushed);
645
646     for (i = 0; i < values_num; i++)
647       free (values[i]);
648
649     free(values);
650     free(file);
651
652     if (status == 0)
653     {
654       pthread_mutex_lock (&stats_lock);
655       stats_updates_written++;
656       stats_data_sets_written += values_num;
657       pthread_mutex_unlock (&stats_lock);
658     }
659
660     pthread_mutex_lock (&cache_lock);
661
662     /* We're about to shut down, so lets flush the entire tree. */
663     if ((do_shutdown != 0) && (cache_queue_head == NULL))
664       flush_old_values (/* max age = */ -1);
665   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
666   pthread_mutex_unlock (&cache_lock);
667
668   assert(cache_queue_head == NULL);
669   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
670   journal_done();
671
672   return (NULL);
673 } /* }}} void *queue_thread_main */
674
675 static int buffer_get_field (char **buffer_ret, /* {{{ */
676     size_t *buffer_size_ret, char **field_ret)
677 {
678   char *buffer;
679   size_t buffer_pos;
680   size_t buffer_size;
681   char *field;
682   size_t field_size;
683   int status;
684
685   buffer = *buffer_ret;
686   buffer_pos = 0;
687   buffer_size = *buffer_size_ret;
688   field = *buffer_ret;
689   field_size = 0;
690
691   if (buffer_size <= 0)
692     return (-1);
693
694   /* This is ensured by `handle_request'. */
695   assert (buffer[buffer_size - 1] == '\0');
696
697   status = -1;
698   while (buffer_pos < buffer_size)
699   {
700     /* Check for end-of-field or end-of-buffer */
701     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
702     {
703       field[field_size] = 0;
704       field_size++;
705       buffer_pos++;
706       status = 0;
707       break;
708     }
709     /* Handle escaped characters. */
710     else if (buffer[buffer_pos] == '\\')
711     {
712       if (buffer_pos >= (buffer_size - 1))
713         break;
714       buffer_pos++;
715       field[field_size] = buffer[buffer_pos];
716       field_size++;
717       buffer_pos++;
718     }
719     /* Normal operation */ 
720     else
721     {
722       field[field_size] = buffer[buffer_pos];
723       field_size++;
724       buffer_pos++;
725     }
726   } /* while (buffer_pos < buffer_size) */
727
728   if (status != 0)
729     return (status);
730
731   *buffer_ret = buffer + buffer_pos;
732   *buffer_size_ret = buffer_size - buffer_pos;
733   *field_ret = field;
734
735   return (0);
736 } /* }}} int buffer_get_field */
737
738 static int flush_file (const char *filename) /* {{{ */
739 {
740   cache_item_t *ci;
741
742   pthread_mutex_lock (&cache_lock);
743
744   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
745   if (ci == NULL)
746   {
747     pthread_mutex_unlock (&cache_lock);
748     return (ENOENT);
749   }
750
751   /* Enqueue at head */
752   enqueue_cache_item (ci, HEAD);
753   pthread_cond_signal (&cache_cond);
754
755   pthread_cond_wait(&ci->flushed, &cache_lock);
756   pthread_mutex_unlock(&cache_lock);
757
758   return (0);
759 } /* }}} int flush_file */
760
761 static int handle_request_help (int fd, /* {{{ */
762     char *buffer, size_t buffer_size)
763 {
764   int status;
765   char **help_text;
766   size_t help_text_len;
767   char *command;
768   size_t i;
769
770   char *help_help[] =
771   {
772     "4 Command overview\n",
773     "FLUSH <filename>\n",
774     "HELP [<command>]\n",
775     "UPDATE <filename> <values> [<values> ...]\n",
776     "STATS\n"
777   };
778   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
779
780   char *help_flush[] =
781   {
782     "4 Help for FLUSH\n",
783     "Usage: FLUSH <filename>\n",
784     "\n",
785     "Adds the given filename to the head of the update queue and returns\n",
786     "after is has been dequeued.\n"
787   };
788   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
789
790   char *help_update[] =
791   {
792     "9 Help for UPDATE\n",
793     "Usage: UPDATE <filename> <values> [<values> ...]\n"
794     "\n",
795     "Adds the given file to the internal cache if it is not yet known and\n",
796     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
797     "for details.\n",
798     "\n",
799     "Each <values> has the following form:\n",
800     "  <values> = <time>:<value>[:<value>[...]]\n",
801     "See the rrdupdate(1) manpage for details.\n"
802   };
803   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
804
805   char *help_stats[] =
806   {
807     "4 Help for STATS\n",
808     "Usage: STATS\n",
809     "\n",
810     "Returns some performance counters, see the rrdcached(1) manpage for\n",
811     "a description of the values.\n"
812   };
813   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
814
815   status = buffer_get_field (&buffer, &buffer_size, &command);
816   if (status != 0)
817   {
818     help_text = help_help;
819     help_text_len = help_help_len;
820   }
821   else
822   {
823     if (strcasecmp (command, "update") == 0)
824     {
825       help_text = help_update;
826       help_text_len = help_update_len;
827     }
828     else if (strcasecmp (command, "flush") == 0)
829     {
830       help_text = help_flush;
831       help_text_len = help_flush_len;
832     }
833     else if (strcasecmp (command, "stats") == 0)
834     {
835       help_text = help_stats;
836       help_text_len = help_stats_len;
837     }
838     else
839     {
840       help_text = help_help;
841       help_text_len = help_help_len;
842     }
843   }
844
845   for (i = 0; i < help_text_len; i++)
846   {
847     status = swrite (fd, help_text[i], strlen (help_text[i]));
848     if (status < 0)
849     {
850       status = errno;
851       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
852       return (status);
853     }
854   }
855
856   return (0);
857 } /* }}} int handle_request_help */
858
859 static int handle_request_stats (int fd, /* {{{ */
860     char *buffer __attribute__((unused)),
861     size_t buffer_size __attribute__((unused)))
862 {
863   int status;
864   char outbuf[CMD_MAX];
865
866   uint64_t copy_queue_length;
867   uint64_t copy_updates_received;
868   uint64_t copy_flush_received;
869   uint64_t copy_updates_written;
870   uint64_t copy_data_sets_written;
871   uint64_t copy_journal_bytes;
872   uint64_t copy_journal_rotate;
873
874   uint64_t tree_nodes_number;
875   uint64_t tree_depth;
876
877   pthread_mutex_lock (&stats_lock);
878   copy_queue_length       = stats_queue_length;
879   copy_updates_received   = stats_updates_received;
880   copy_flush_received     = stats_flush_received;
881   copy_updates_written    = stats_updates_written;
882   copy_data_sets_written  = stats_data_sets_written;
883   copy_journal_bytes      = stats_journal_bytes;
884   copy_journal_rotate     = stats_journal_rotate;
885   pthread_mutex_unlock (&stats_lock);
886
887   pthread_mutex_lock (&cache_lock);
888   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
889   tree_depth        = (uint64_t) g_tree_height (cache_tree);
890   pthread_mutex_unlock (&cache_lock);
891
892 #define RRDD_STATS_SEND \
893   outbuf[sizeof (outbuf) - 1] = 0; \
894   status = swrite (fd, outbuf, strlen (outbuf)); \
895   if (status < 0) \
896   { \
897     status = errno; \
898     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
899     return (status); \
900   }
901
902   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
903   RRDD_STATS_SEND;
904
905   snprintf (outbuf, sizeof (outbuf),
906       "QueueLength: %"PRIu64"\n", copy_queue_length);
907   RRDD_STATS_SEND;
908
909   snprintf (outbuf, sizeof (outbuf),
910       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
911   RRDD_STATS_SEND;
912
913   snprintf (outbuf, sizeof (outbuf),
914       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
915   RRDD_STATS_SEND;
916
917   snprintf (outbuf, sizeof (outbuf),
918       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
919   RRDD_STATS_SEND;
920
921   snprintf (outbuf, sizeof (outbuf),
922       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
923   RRDD_STATS_SEND;
924
925   snprintf (outbuf, sizeof (outbuf),
926       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
927   RRDD_STATS_SEND;
928
929   snprintf (outbuf, sizeof (outbuf),
930       "TreeDepth: %"PRIu64"\n", tree_depth);
931   RRDD_STATS_SEND;
932
933   snprintf (outbuf, sizeof(outbuf),
934       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
935   RRDD_STATS_SEND;
936
937   snprintf (outbuf, sizeof(outbuf),
938       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
939   RRDD_STATS_SEND;
940
941   return (0);
942 #undef RRDD_STATS_SEND
943 } /* }}} int handle_request_stats */
944
945 static int handle_request_flush (int fd, /* {{{ */
946     char *buffer, size_t buffer_size)
947 {
948   char *file;
949   int status;
950   char result[CMD_MAX];
951
952   status = buffer_get_field (&buffer, &buffer_size, &file);
953   if (status != 0)
954   {
955     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
956   }
957   else
958   {
959     pthread_mutex_lock(&stats_lock);
960     stats_flush_received++;
961     pthread_mutex_unlock(&stats_lock);
962
963     status = flush_file (file);
964     if (status == 0)
965       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
966     else if (status == ENOENT)
967     {
968       /* no file in our tree; see whether it exists at all */
969       struct stat statbuf;
970
971       memset(&statbuf, 0, sizeof(statbuf));
972       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
973         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
974       else
975         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
976     }
977     else if (status < 0)
978       strncpy (result, "-1 Internal error.\n", sizeof (result));
979     else
980       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
981   }
982   result[sizeof (result) - 1] = 0;
983
984   status = swrite (fd, result, strlen (result));
985   if (status < 0)
986   {
987     status = errno;
988     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
989     return (status);
990   }
991
992   return (0);
993 } /* }}} int handle_request_flush */
994
995 static int handle_request_update (int fd, /* {{{ */
996     char *buffer, size_t buffer_size)
997 {
998   char *file;
999   int values_num = 0;
1000   int status;
1001
1002   time_t now;
1003
1004   cache_item_t *ci;
1005   char answer[CMD_MAX];
1006
1007 #define RRDD_UPDATE_SEND \
1008   answer[sizeof (answer) - 1] = 0; \
1009   status = swrite (fd, answer, strlen (answer)); \
1010   if (status < 0) \
1011   { \
1012     status = errno; \
1013     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1014     return (status); \
1015   }
1016
1017   now = time (NULL);
1018
1019   status = buffer_get_field (&buffer, &buffer_size, &file);
1020   if (status != 0)
1021   {
1022     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1023         sizeof (answer));
1024     RRDD_UPDATE_SEND;
1025     return (0);
1026   }
1027
1028   pthread_mutex_lock(&stats_lock);
1029   stats_updates_received++;
1030   pthread_mutex_unlock(&stats_lock);
1031
1032   pthread_mutex_lock (&cache_lock);
1033   ci = g_tree_lookup (cache_tree, file);
1034
1035   if (ci == NULL) /* {{{ */
1036   {
1037     struct stat statbuf;
1038
1039     /* don't hold the lock while we setup; stat(2) might block */
1040     pthread_mutex_unlock(&cache_lock);
1041
1042     memset (&statbuf, 0, sizeof (statbuf));
1043     status = stat (file, &statbuf);
1044     if (status != 0)
1045     {
1046       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1047
1048       status = errno;
1049       if (status == ENOENT)
1050         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1051       else
1052         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1053             status);
1054       RRDD_UPDATE_SEND;
1055       return (0);
1056     }
1057     if (!S_ISREG (statbuf.st_mode))
1058     {
1059       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1060       RRDD_UPDATE_SEND;
1061       return (0);
1062     }
1063     if (access(file, R_OK|W_OK) != 0)
1064     {
1065       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1066                 file, rrd_strerror(errno));
1067       RRDD_UPDATE_SEND;
1068       return (0);
1069     }
1070
1071     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1072     if (ci == NULL)
1073     {
1074       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1075
1076       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1077       RRDD_UPDATE_SEND;
1078       return (0);
1079     }
1080     memset (ci, 0, sizeof (cache_item_t));
1081
1082     ci->file = strdup (file);
1083     if (ci->file == NULL)
1084     {
1085       free (ci);
1086       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1087
1088       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1089       RRDD_UPDATE_SEND;
1090       return (0);
1091     }
1092
1093     _wipe_ci_values(ci, now);
1094     ci->flags = CI_FLAGS_IN_TREE;
1095
1096     pthread_mutex_lock(&cache_lock);
1097     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1098   } /* }}} */
1099   assert (ci != NULL);
1100
1101   while (buffer_size > 0)
1102   {
1103     char **temp;
1104     char *value;
1105
1106     status = buffer_get_field (&buffer, &buffer_size, &value);
1107     if (status != 0)
1108     {
1109       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1110       break;
1111     }
1112
1113     temp = (char **) realloc (ci->values,
1114         sizeof (char *) * (ci->values_num + 1));
1115     if (temp == NULL)
1116     {
1117       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1118       continue;
1119     }
1120     ci->values = temp;
1121
1122     ci->values[ci->values_num] = strdup (value);
1123     if (ci->values[ci->values_num] == NULL)
1124     {
1125       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1126       continue;
1127     }
1128     ci->values_num++;
1129
1130     values_num++;
1131   }
1132
1133   if (((now - ci->last_flush_time) >= config_write_interval)
1134       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1135       && (ci->values_num > 0))
1136   {
1137     enqueue_cache_item (ci, TAIL);
1138     pthread_cond_signal (&cache_cond);
1139   }
1140
1141   pthread_mutex_unlock (&cache_lock);
1142
1143   if (values_num < 1)
1144   {
1145     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1146   }
1147   else
1148   {
1149     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1150         (values_num == 1) ? "" : "s");
1151   }
1152   RRDD_UPDATE_SEND;
1153   return (0);
1154 #undef RRDD_UPDATE_SEND
1155 } /* }}} int handle_request_update */
1156
1157 /* we came across a "WROTE" entry during journal replay.
1158  * throw away any values that we have accumulated for this file
1159  */
1160 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1161                                  const char *buffer,
1162                                  size_t buffer_size __attribute__((unused)))
1163 {
1164   int i;
1165   cache_item_t *ci;
1166   const char *file = buffer;
1167
1168   pthread_mutex_lock(&cache_lock);
1169
1170   ci = g_tree_lookup(cache_tree, file);
1171   if (ci == NULL)
1172   {
1173     pthread_mutex_unlock(&cache_lock);
1174     return (0);
1175   }
1176
1177   if (ci->values)
1178   {
1179     for (i=0; i < ci->values_num; i++)
1180       free(ci->values[i]);
1181
1182     free(ci->values);
1183   }
1184
1185   _wipe_ci_values(ci, time(NULL));
1186
1187   pthread_mutex_unlock(&cache_lock);
1188   return (0);
1189 } /* }}} int handle_request_wrote */
1190
1191 /* if fd < 0, we are in journal replay mode */
1192 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1193 {
1194   char *buffer_ptr;
1195   char *command;
1196   int status;
1197
1198   assert (buffer[buffer_size - 1] == '\0');
1199
1200   buffer_ptr = buffer;
1201   command = NULL;
1202   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1203   if (status != 0)
1204   {
1205     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1206     return (-1);
1207   }
1208
1209   if (strcasecmp (command, "update") == 0)
1210   {
1211     /* don't re-write updates in replay mode */
1212     if (fd >= 0)
1213       journal_write(command, buffer_ptr);
1214
1215     return (handle_request_update (fd, buffer_ptr, buffer_size));
1216   }
1217   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1218   {
1219     /* this is only valid in replay mode */
1220     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1221   }
1222   else if (strcasecmp (command, "flush") == 0)
1223   {
1224     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1225   }
1226   else if (strcasecmp (command, "stats") == 0)
1227   {
1228     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1229   }
1230   else if (strcasecmp (command, "help") == 0)
1231   {
1232     return (handle_request_help (fd, buffer_ptr, buffer_size));
1233   }
1234   else
1235   {
1236     char result[CMD_MAX];
1237
1238     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1239     result[sizeof (result) - 1] = 0;
1240
1241     status = swrite (fd, result, strlen (result));
1242     if (status < 0)
1243     {
1244       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1245       return (-1);
1246     }
1247   }
1248
1249   return (0);
1250 } /* }}} int handle_request */
1251
1252 /* MUST NOT hold journal_lock before calling this */
1253 static void journal_rotate(void) /* {{{ */
1254 {
1255   FILE *old_fh = NULL;
1256
1257   if (journal_cur == NULL || journal_old == NULL)
1258     return;
1259
1260   pthread_mutex_lock(&journal_lock);
1261
1262   /* we rotate this way (rename before close) so that the we can release
1263    * the journal lock as fast as possible.  Journal writes to the new
1264    * journal can proceed immediately after the new file is opened.  The
1265    * fclose can then block without affecting new updates.
1266    */
1267   if (journal_fh != NULL)
1268   {
1269     old_fh = journal_fh;
1270     rename(journal_cur, journal_old);
1271     ++stats_journal_rotate;
1272   }
1273
1274   journal_fh = fopen(journal_cur, "a");
1275   pthread_mutex_unlock(&journal_lock);
1276
1277   if (old_fh != NULL)
1278     fclose(old_fh);
1279
1280   if (journal_fh == NULL)
1281     RRDD_LOG(LOG_CRIT,
1282              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1283              journal_cur, rrd_strerror(errno));
1284
1285 } /* }}} static void journal_rotate */
1286
1287 static void journal_done(void) /* {{{ */
1288 {
1289   if (journal_cur == NULL)
1290     return;
1291
1292   pthread_mutex_lock(&journal_lock);
1293   if (journal_fh != NULL)
1294   {
1295     fclose(journal_fh);
1296     journal_fh = NULL;
1297   }
1298
1299   RRDD_LOG(LOG_INFO, "removing journals");
1300
1301   unlink(journal_old);
1302   unlink(journal_cur);
1303   pthread_mutex_unlock(&journal_lock);
1304
1305 } /* }}} static void journal_done */
1306
1307 static int journal_write(char *cmd, char *args) /* {{{ */
1308 {
1309   int chars;
1310
1311   if (journal_fh == NULL)
1312     return 0;
1313
1314   pthread_mutex_lock(&journal_lock);
1315   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1316   pthread_mutex_unlock(&journal_lock);
1317
1318   if (chars > 0)
1319   {
1320     pthread_mutex_lock(&stats_lock);
1321     stats_journal_bytes += chars;
1322     pthread_mutex_unlock(&stats_lock);
1323   }
1324
1325   return chars;
1326 } /* }}} static int journal_write */
1327
1328 static int journal_replay (const char *file) /* {{{ */
1329 {
1330   FILE *fh;
1331   int entry_cnt = 0;
1332   int fail_cnt = 0;
1333   uint64_t line = 0;
1334   char entry[CMD_MAX];
1335
1336   if (file == NULL) return 0;
1337
1338   fh = fopen(file, "r");
1339   if (fh == NULL)
1340   {
1341     if (errno != ENOENT)
1342       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1343                file, rrd_strerror(errno));
1344     return 0;
1345   }
1346   else
1347     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1348
1349   while(!feof(fh))
1350   {
1351     size_t entry_len;
1352
1353     ++line;
1354     fgets(entry, sizeof(entry), fh);
1355     entry_len = strlen(entry);
1356
1357     /* check \n termination in case journal writing crashed mid-line */
1358     if (entry_len == 0)
1359       continue;
1360     else if (entry[entry_len - 1] != '\n')
1361     {
1362       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1363       ++fail_cnt;
1364       continue;
1365     }
1366
1367     entry[entry_len - 1] = '\0';
1368
1369     if (handle_request(-1, entry, entry_len) == 0)
1370       ++entry_cnt;
1371     else
1372       ++fail_cnt;
1373   }
1374
1375   fclose(fh);
1376
1377   if (entry_cnt > 0)
1378   {
1379     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1380              entry_cnt, fail_cnt);
1381     return 1;
1382   }
1383   else
1384     return 0;
1385
1386 } /* }}} static int journal_replay */
1387
1388 static void *connection_thread_main (void *args) /* {{{ */
1389 {
1390   pthread_t self;
1391   int i;
1392   int fd;
1393   
1394   fd = *((int *) args);
1395   free (args);
1396
1397   pthread_mutex_lock (&connection_threads_lock);
1398   {
1399     pthread_t *temp;
1400
1401     temp = (pthread_t *) realloc (connection_threads,
1402         sizeof (pthread_t) * (connection_threads_num + 1));
1403     if (temp == NULL)
1404     {
1405       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1406     }
1407     else
1408     {
1409       connection_threads = temp;
1410       connection_threads[connection_threads_num] = pthread_self ();
1411       connection_threads_num++;
1412     }
1413   }
1414   pthread_mutex_unlock (&connection_threads_lock);
1415
1416   while (do_shutdown == 0)
1417   {
1418     char buffer[CMD_MAX];
1419
1420     struct pollfd pollfd;
1421     int status;
1422
1423     pollfd.fd = fd;
1424     pollfd.events = POLLIN | POLLPRI;
1425     pollfd.revents = 0;
1426
1427     status = poll (&pollfd, 1, /* timeout = */ 500);
1428     if (status == 0) /* timeout */
1429       continue;
1430     else if (status < 0) /* error */
1431     {
1432       status = errno;
1433       if (status == EINTR)
1434         continue;
1435       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1436       continue;
1437     }
1438
1439     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1440     {
1441       close (fd);
1442       break;
1443     }
1444     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1445     {
1446       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1447           "poll(2) returned something unexpected: %#04hx",
1448           pollfd.revents);
1449       close (fd);
1450       break;
1451     }
1452
1453     status = (int) sread (fd, buffer, sizeof (buffer));
1454     if (status <= 0)
1455     {
1456       close (fd);
1457
1458       if (status < 0)
1459         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1460
1461       break;
1462     }
1463
1464     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1465     if (status != 0)
1466       break;
1467   }
1468
1469   close(fd);
1470
1471   self = pthread_self ();
1472   /* Remove this thread from the connection threads list */
1473   pthread_mutex_lock (&connection_threads_lock);
1474   /* Find out own index in the array */
1475   for (i = 0; i < connection_threads_num; i++)
1476     if (pthread_equal (connection_threads[i], self) != 0)
1477       break;
1478   assert (i < connection_threads_num);
1479
1480   /* Move the trailing threads forward. */
1481   if (i < (connection_threads_num - 1))
1482   {
1483     memmove (connection_threads + i,
1484         connection_threads + i + 1,
1485         sizeof (pthread_t) * (connection_threads_num - i - 1));
1486   }
1487
1488   connection_threads_num--;
1489   pthread_mutex_unlock (&connection_threads_lock);
1490
1491   return (NULL);
1492 } /* }}} void *connection_thread_main */
1493
1494 static int open_listen_socket_unix (const char *path) /* {{{ */
1495 {
1496   int fd;
1497   struct sockaddr_un sa;
1498   listen_socket_t *temp;
1499   int status;
1500
1501   temp = (listen_socket_t *) realloc (listen_fds,
1502       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1503   if (temp == NULL)
1504   {
1505     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1506     return (-1);
1507   }
1508   listen_fds = temp;
1509   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1510
1511   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1512   if (fd < 0)
1513   {
1514     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1515     return (-1);
1516   }
1517
1518   memset (&sa, 0, sizeof (sa));
1519   sa.sun_family = AF_UNIX;
1520   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1521
1522   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1523   if (status != 0)
1524   {
1525     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1526     close (fd);
1527     unlink (path);
1528     return (-1);
1529   }
1530
1531   status = listen (fd, /* backlog = */ 10);
1532   if (status != 0)
1533   {
1534     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1535     close (fd);
1536     unlink (path);
1537     return (-1);
1538   }
1539   
1540   listen_fds[listen_fds_num].fd = fd;
1541   snprintf (listen_fds[listen_fds_num].path,
1542       sizeof (listen_fds[listen_fds_num].path) - 1,
1543       "unix:%s", path);
1544   listen_fds_num++;
1545
1546   return (0);
1547 } /* }}} int open_listen_socket_unix */
1548
1549 static int open_listen_socket (const char *addr_orig) /* {{{ */
1550 {
1551   struct addrinfo ai_hints;
1552   struct addrinfo *ai_res;
1553   struct addrinfo *ai_ptr;
1554   char addr_copy[NI_MAXHOST];
1555   char *addr;
1556   char *port;
1557   int status;
1558
1559   assert (addr_orig != NULL);
1560
1561   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1562   addr_copy[sizeof (addr_copy) - 1] = 0;
1563   addr = addr_copy;
1564
1565   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1566     return (open_listen_socket_unix (addr + strlen ("unix:")));
1567   else if (addr[0] == '/')
1568     return (open_listen_socket_unix (addr));
1569
1570   memset (&ai_hints, 0, sizeof (ai_hints));
1571   ai_hints.ai_flags = 0;
1572 #ifdef AI_ADDRCONFIG
1573   ai_hints.ai_flags |= AI_ADDRCONFIG;
1574 #endif
1575   ai_hints.ai_family = AF_UNSPEC;
1576   ai_hints.ai_socktype = SOCK_STREAM;
1577
1578   port = NULL;
1579  if (*addr == '[') /* IPv6+port format */
1580   {
1581     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1582     addr++;
1583
1584     port = strchr (addr, ']');
1585     if (port == NULL)
1586     {
1587       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1588           addr_orig);
1589       return (-1);
1590     }
1591     *port = 0;
1592     port++;
1593
1594     if (*port == ':')
1595       port++;
1596     else if (*port == 0)
1597       port = NULL;
1598     else
1599     {
1600       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1601           port);
1602       return (-1);
1603     }
1604   } /* if (*addr = ']') */
1605   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1606   {
1607     port = rindex(addr, ':');
1608     if (port != NULL)
1609     {
1610       *port = 0;
1611       port++;
1612     }
1613   }
1614   ai_res = NULL;
1615   status = getaddrinfo (addr,
1616                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1617                         &ai_hints, &ai_res);
1618   if (status != 0)
1619   {
1620     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1621         "%s", addr, gai_strerror (status));
1622     return (-1);
1623   }
1624
1625   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1626   {
1627     int fd;
1628     listen_socket_t *temp;
1629     int one = 1;
1630
1631     temp = (listen_socket_t *) realloc (listen_fds,
1632         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1633     if (temp == NULL)
1634     {
1635       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1636       continue;
1637     }
1638     listen_fds = temp;
1639     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1640
1641     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1642     if (fd < 0)
1643     {
1644       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1645       continue;
1646     }
1647
1648     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1649
1650     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1651     if (status != 0)
1652     {
1653       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1654       close (fd);
1655       continue;
1656     }
1657
1658     status = listen (fd, /* backlog = */ 10);
1659     if (status != 0)
1660     {
1661       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1662       close (fd);
1663       return (-1);
1664     }
1665
1666     listen_fds[listen_fds_num].fd = fd;
1667     strncpy (listen_fds[listen_fds_num].path, addr,
1668         sizeof (listen_fds[listen_fds_num].path) - 1);
1669     listen_fds_num++;
1670   } /* for (ai_ptr) */
1671
1672   return (0);
1673 } /* }}} int open_listen_socket */
1674
1675 static int close_listen_sockets (void) /* {{{ */
1676 {
1677   size_t i;
1678
1679   for (i = 0; i < listen_fds_num; i++)
1680   {
1681     close (listen_fds[i].fd);
1682     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1683       unlink (listen_fds[i].path + strlen ("unix:"));
1684   }
1685
1686   free (listen_fds);
1687   listen_fds = NULL;
1688   listen_fds_num = 0;
1689
1690   return (0);
1691 } /* }}} int close_listen_sockets */
1692
1693 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1694 {
1695   struct pollfd *pollfds;
1696   int pollfds_num;
1697   int status;
1698   int i;
1699
1700   for (i = 0; i < config_listen_address_list_len; i++)
1701     open_listen_socket (config_listen_address_list[i]);
1702
1703   if (config_listen_address_list_len < 1)
1704     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1705
1706   if (listen_fds_num < 1)
1707   {
1708     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1709         "could be opened. Sorry.");
1710     return (NULL);
1711   }
1712
1713   pollfds_num = listen_fds_num;
1714   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1715   if (pollfds == NULL)
1716   {
1717     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1718     return (NULL);
1719   }
1720   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1721
1722   RRDD_LOG(LOG_INFO, "listening for connections");
1723
1724   while (do_shutdown == 0)
1725   {
1726     assert (pollfds_num == ((int) listen_fds_num));
1727     for (i = 0; i < pollfds_num; i++)
1728     {
1729       pollfds[i].fd = listen_fds[i].fd;
1730       pollfds[i].events = POLLIN | POLLPRI;
1731       pollfds[i].revents = 0;
1732     }
1733
1734     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1735     if (status == 0)
1736     {
1737       continue; /* timeout */
1738     }
1739     else if (status < 0)
1740     {
1741       status = errno;
1742       if (status != EINTR)
1743       {
1744         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1745       }
1746       continue;
1747     }
1748
1749     for (i = 0; i < pollfds_num; i++)
1750     {
1751       int *client_sd;
1752       struct sockaddr_storage client_sa;
1753       socklen_t client_sa_size;
1754       pthread_t tid;
1755       pthread_attr_t attr;
1756
1757       if (pollfds[i].revents == 0)
1758         continue;
1759
1760       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1761       {
1762         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1763             "poll(2) returned something unexpected for listen FD #%i.",
1764             pollfds[i].fd);
1765         continue;
1766       }
1767
1768       client_sd = (int *) malloc (sizeof (int));
1769       if (client_sd == NULL)
1770       {
1771         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1772         continue;
1773       }
1774
1775       client_sa_size = sizeof (client_sa);
1776       *client_sd = accept (pollfds[i].fd,
1777           (struct sockaddr *) &client_sa, &client_sa_size);
1778       if (*client_sd < 0)
1779       {
1780         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1781         continue;
1782       }
1783
1784       pthread_attr_init (&attr);
1785       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1786
1787       status = pthread_create (&tid, &attr, connection_thread_main,
1788           /* args = */ (void *) client_sd);
1789       if (status != 0)
1790       {
1791         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1792         close (*client_sd);
1793         free (client_sd);
1794         continue;
1795       }
1796     } /* for (pollfds_num) */
1797   } /* while (do_shutdown == 0) */
1798
1799   RRDD_LOG(LOG_INFO, "starting shutdown");
1800
1801   close_listen_sockets ();
1802
1803   pthread_mutex_lock (&connection_threads_lock);
1804   while (connection_threads_num > 0)
1805   {
1806     pthread_t wait_for;
1807
1808     wait_for = connection_threads[0];
1809
1810     pthread_mutex_unlock (&connection_threads_lock);
1811     pthread_join (wait_for, /* retval = */ NULL);
1812     pthread_mutex_lock (&connection_threads_lock);
1813   }
1814   pthread_mutex_unlock (&connection_threads_lock);
1815
1816   return (NULL);
1817 } /* }}} void *listen_thread_main */
1818
1819 static int daemonize (void) /* {{{ */
1820 {
1821   int status;
1822   int fd;
1823
1824   /* These structures are static, because `sigaction' behaves weird if the are
1825    * overwritten.. */
1826   static struct sigaction sa_int;
1827   static struct sigaction sa_term;
1828   static struct sigaction sa_pipe;
1829
1830   fd = open_pidfile();
1831   if (fd < 0) return fd;
1832
1833   if (!stay_foreground)
1834   {
1835     pid_t child;
1836     char *base_dir;
1837
1838     child = fork ();
1839     if (child < 0)
1840     {
1841       fprintf (stderr, "daemonize: fork(2) failed.\n");
1842       return (-1);
1843     }
1844     else if (child > 0)
1845     {
1846       return (1);
1847     }
1848
1849     /* Change into the /tmp directory. */
1850     base_dir = (config_base_dir != NULL)
1851       ? config_base_dir
1852       : "/tmp";
1853     status = chdir (base_dir);
1854     if (status != 0)
1855     {
1856       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1857       return (-1);
1858     }
1859
1860     /* Become session leader */
1861     setsid ();
1862
1863     /* Open the first three file descriptors to /dev/null */
1864     close (2);
1865     close (1);
1866     close (0);
1867
1868     open ("/dev/null", O_RDWR);
1869     dup (0);
1870     dup (0);
1871   } /* if (!stay_foreground) */
1872
1873   /* Install signal handlers */
1874   memset (&sa_int, 0, sizeof (sa_int));
1875   sa_int.sa_handler = sig_int_handler;
1876   sigaction (SIGINT, &sa_int, NULL);
1877
1878   memset (&sa_term, 0, sizeof (sa_term));
1879   sa_term.sa_handler = sig_term_handler;
1880   sigaction (SIGTERM, &sa_term, NULL);
1881
1882   memset (&sa_pipe, 0, sizeof (sa_pipe));
1883   sa_pipe.sa_handler = SIG_IGN;
1884   sigaction (SIGPIPE, &sa_pipe, NULL);
1885
1886   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1887   RRDD_LOG(LOG_INFO, "starting up");
1888
1889   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1890   if (cache_tree == NULL)
1891   {
1892     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1893     return (-1);
1894   }
1895
1896   status = write_pidfile (fd);
1897   return status;
1898 } /* }}} int daemonize */
1899
1900 static int cleanup (void) /* {{{ */
1901 {
1902   do_shutdown++;
1903
1904   pthread_cond_signal (&cache_cond);
1905   pthread_join (queue_thread, /* return = */ NULL);
1906
1907   remove_pidfile ();
1908
1909   RRDD_LOG(LOG_INFO, "goodbye");
1910   closelog ();
1911
1912   return (0);
1913 } /* }}} int cleanup */
1914
1915 static int read_options (int argc, char **argv) /* {{{ */
1916 {
1917   int option;
1918   int status = 0;
1919
1920   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1921   {
1922     switch (option)
1923     {
1924       case 'g':
1925         stay_foreground=1;
1926         break;
1927
1928       case 'l':
1929       {
1930         char **temp;
1931
1932         temp = (char **) realloc (config_listen_address_list,
1933             sizeof (char *) * (config_listen_address_list_len + 1));
1934         if (temp == NULL)
1935         {
1936           fprintf (stderr, "read_options: realloc failed.\n");
1937           return (2);
1938         }
1939         config_listen_address_list = temp;
1940
1941         temp[config_listen_address_list_len] = strdup (optarg);
1942         if (temp[config_listen_address_list_len] == NULL)
1943         {
1944           fprintf (stderr, "read_options: strdup failed.\n");
1945           return (2);
1946         }
1947         config_listen_address_list_len++;
1948       }
1949       break;
1950
1951       case 'f':
1952       {
1953         int temp;
1954
1955         temp = atoi (optarg);
1956         if (temp > 0)
1957           config_flush_interval = temp;
1958         else
1959         {
1960           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1961           status = 3;
1962         }
1963       }
1964       break;
1965
1966       case 'w':
1967       {
1968         int temp;
1969
1970         temp = atoi (optarg);
1971         if (temp > 0)
1972           config_write_interval = temp;
1973         else
1974         {
1975           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1976           status = 2;
1977         }
1978       }
1979       break;
1980
1981       case 'z':
1982       {
1983         int temp;
1984
1985         temp = atoi(optarg);
1986         if (temp > 0)
1987           config_write_jitter = temp;
1988         else
1989         {
1990           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1991           status = 2;
1992         }
1993
1994         break;
1995       }
1996
1997       case 'b':
1998       {
1999         size_t len;
2000
2001         if (config_base_dir != NULL)
2002           free (config_base_dir);
2003         config_base_dir = strdup (optarg);
2004         if (config_base_dir == NULL)
2005         {
2006           fprintf (stderr, "read_options: strdup failed.\n");
2007           return (3);
2008         }
2009
2010         len = strlen (config_base_dir);
2011         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2012         {
2013           config_base_dir[len - 1] = 0;
2014           len--;
2015         }
2016
2017         if (len < 1)
2018         {
2019           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2020           return (4);
2021         }
2022       }
2023       break;
2024
2025       case 'p':
2026       {
2027         if (config_pid_file != NULL)
2028           free (config_pid_file);
2029         config_pid_file = strdup (optarg);
2030         if (config_pid_file == NULL)
2031         {
2032           fprintf (stderr, "read_options: strdup failed.\n");
2033           return (3);
2034         }
2035       }
2036       break;
2037
2038       case 'j':
2039       {
2040         struct stat statbuf;
2041         const char *dir = optarg;
2042
2043         status = stat(dir, &statbuf);
2044         if (status != 0)
2045         {
2046           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2047           return 6;
2048         }
2049
2050         if (!S_ISDIR(statbuf.st_mode)
2051             || access(dir, R_OK|W_OK|X_OK) != 0)
2052         {
2053           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2054                   errno ? rrd_strerror(errno) : "");
2055           return 6;
2056         }
2057
2058         journal_cur = malloc(PATH_MAX + 1);
2059         journal_old = malloc(PATH_MAX + 1);
2060         if (journal_cur == NULL || journal_old == NULL)
2061         {
2062           fprintf(stderr, "malloc failure for journal files\n");
2063           return 6;
2064         }
2065         else 
2066         {
2067           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2068           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2069         }
2070       }
2071       break;
2072
2073       case 'h':
2074       case '?':
2075         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2076             "\n"
2077             "Usage: rrdcached [options]\n"
2078             "\n"
2079             "Valid options are:\n"
2080             "  -l <address>  Socket address to listen to.\n"
2081             "  -w <seconds>  Interval in which to write data.\n"
2082             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2083             "  -f <seconds>  Interval in which to flush dead data.\n"
2084             "  -p <file>     Location of the PID-file.\n"
2085             "  -b <dir>      Base directory to change to.\n"
2086             "  -g            Do not fork and run in the foreground.\n"
2087             "  -j <dir>      Directory in which to create the journal files.\n"
2088             "\n"
2089             "For more information and a detailed description of all options "
2090             "please refer\n"
2091             "to the rrdcached(1) manual page.\n",
2092             VERSION);
2093         status = -1;
2094         break;
2095     } /* switch (option) */
2096   } /* while (getopt) */
2097
2098   /* advise the user when values are not sane */
2099   if (config_flush_interval < 2 * config_write_interval)
2100     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2101             " 2x write interval (-w) !\n");
2102   if (config_write_jitter > config_write_interval)
2103     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2104             " write interval (-w) !\n");
2105
2106   return (status);
2107 } /* }}} int read_options */
2108
2109 int main (int argc, char **argv)
2110 {
2111   int status;
2112
2113   status = read_options (argc, argv);
2114   if (status != 0)
2115   {
2116     if (status < 0)
2117       status = 0;
2118     return (status);
2119   }
2120
2121   status = daemonize ();
2122   if (status == 1)
2123   {
2124     struct sigaction sigchld;
2125
2126     memset (&sigchld, 0, sizeof (sigchld));
2127     sigchld.sa_handler = SIG_IGN;
2128     sigaction (SIGCHLD, &sigchld, NULL);
2129
2130     return (0);
2131   }
2132   else if (status != 0)
2133   {
2134     fprintf (stderr, "daemonize failed, exiting.\n");
2135     return (1);
2136   }
2137
2138   if (journal_cur != NULL)
2139   {
2140     int had_journal = 0;
2141
2142     pthread_mutex_lock(&journal_lock);
2143
2144     RRDD_LOG(LOG_INFO, "checking for journal files");
2145
2146     had_journal += journal_replay(journal_old);
2147     had_journal += journal_replay(journal_cur);
2148
2149     if (had_journal)
2150       flush_old_values(-1);
2151
2152     pthread_mutex_unlock(&journal_lock);
2153     journal_rotate();
2154
2155     RRDD_LOG(LOG_INFO, "journal processing complete");
2156   }
2157
2158   /* start the queue thread */
2159   memset (&queue_thread, 0, sizeof (queue_thread));
2160   status = pthread_create (&queue_thread,
2161                            NULL, /* attr */
2162                            queue_thread_main,
2163                            NULL); /* args */
2164   if (status != 0)
2165   {
2166     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2167     cleanup();
2168     return (1);
2169   }
2170
2171   listen_thread_main (NULL);
2172   cleanup ();
2173
2174   return (0);
2175 } /* int main */
2176
2177 /*
2178  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2179  */