move cache broadcast into enqueue_cache_item -- kevin brintnall
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 struct listen_socket_s
105 {
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
110
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
125
126 struct callback_flush_data_s
127 {
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
134
135 enum queue_side_e
136 {
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
141
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
144
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
149
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
152
153 static int do_shutdown = 0;
154
155 static pthread_t queue_thread;
156
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
160
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
167
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
173
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
176
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
185
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
194
195 /* 
196  * Functions
197  */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
199 {
200   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201   do_shutdown++;
202   pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
204
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
206 {
207   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208   do_shutdown++;
209   pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
211
212 static int open_pidfile(void) /* {{{ */
213 {
214   int fd;
215   char *file;
216
217   file = (config_pid_file != NULL)
218     ? config_pid_file
219     : LOCALSTATEDIR "/run/rrdcached.pid";
220
221   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
222   if (fd < 0)
223     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
224             file, rrd_strerror(errno));
225
226   return(fd);
227 }
228
229 static int write_pidfile (int fd) /* {{{ */
230 {
231   pid_t pid;
232   FILE *fh;
233
234   pid = getpid ();
235
236   fh = fdopen (fd, "w");
237   if (fh == NULL)
238   {
239     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
240     close(fd);
241     return (-1);
242   }
243
244   fprintf (fh, "%i\n", (int) pid);
245   fclose (fh);
246
247   return (0);
248 } /* }}} int write_pidfile */
249
250 static int remove_pidfile (void) /* {{{ */
251 {
252   char *file;
253   int status;
254
255   file = (config_pid_file != NULL)
256     ? config_pid_file
257     : LOCALSTATEDIR "/run/rrdcached.pid";
258
259   status = unlink (file);
260   if (status == 0)
261     return (0);
262   return (errno);
263 } /* }}} int remove_pidfile */
264
265 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
266 {
267   char    *buffer;
268   size_t   buffer_used;
269   size_t   buffer_free;
270   ssize_t  status;
271
272   buffer       = (char *) buffer_void;
273   buffer_used  = 0;
274   buffer_free  = buffer_size;
275
276   while (buffer_free > 0)
277   {
278     status = read (fd, buffer + buffer_used, buffer_free);
279     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
280       continue;
281
282     if (status < 0)
283       return (-1);
284
285     if (status == 0)
286       return (0);
287
288     assert ((0 > status) || (buffer_free >= (size_t) status));
289
290     buffer_free = buffer_free - status;
291     buffer_used = buffer_used + status;
292
293     if (buffer[buffer_used - 1] == '\n')
294       break;
295   }
296
297   assert (buffer_used > 0);
298
299   if (buffer[buffer_used - 1] != '\n')
300   {
301     errno = ENOBUFS;
302     return (-1);
303   }
304
305   buffer[buffer_used - 1] = 0;
306
307   /* Fix network line endings. */
308   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
309   {
310     buffer_used--;
311     buffer[buffer_used - 1] = 0;
312   }
313
314   return (buffer_used);
315 } /* }}} ssize_t sread */
316
317 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
318 {
319   const char *ptr;
320   size_t      nleft;
321   ssize_t     status;
322
323   /* special case for journal replay */
324   if (fd < 0) return 0;
325
326   ptr   = (const char *) buf;
327   nleft = count;
328
329   while (nleft > 0)
330   {
331     status = write (fd, (const void *) ptr, nleft);
332
333     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
334       continue;
335
336     if (status < 0)
337       return (status);
338
339     nleft -= status;
340     ptr   += status;
341   }
342
343   return (0);
344 } /* }}} ssize_t swrite */
345
346 static void _wipe_ci_values(cache_item_t *ci, time_t when)
347 {
348   ci->values = NULL;
349   ci->values_num = 0;
350
351   ci->last_flush_time = when;
352   if (config_write_jitter > 0)
353     ci->last_flush_time += (random() % config_write_jitter);
354
355   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
356 }
357
358 /*
359  * enqueue_cache_item:
360  * `cache_lock' must be acquired before calling this function!
361  */
362 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
363     queue_side_t side)
364 {
365   int did_insert = 0;
366
367   if (ci == NULL)
368     return (-1);
369
370   if (ci->values_num == 0)
371     return (0);
372
373   if (side == HEAD)
374   {
375     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
376     {
377       assert (ci->next == NULL);
378       ci->next = cache_queue_head;
379       cache_queue_head = ci;
380
381       if (cache_queue_tail == NULL)
382         cache_queue_tail = cache_queue_head;
383
384       did_insert = 1;
385     }
386     else if (cache_queue_head == ci)
387     {
388       /* do nothing */
389     }
390     else /* enqueued, but not first entry */
391     {
392       cache_item_t *prev;
393
394       /* find previous entry */
395       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
396         if (prev->next == ci)
397           break;
398       assert (prev != NULL);
399
400       /* move to the front */
401       prev->next = ci->next;
402       ci->next = cache_queue_head;
403       cache_queue_head = ci;
404
405       /* check if we need to adapt the tail */
406       if (cache_queue_tail == ci)
407         cache_queue_tail = prev;
408     }
409   }
410   else /* (side == TAIL) */
411   {
412     /* We don't move values back in the list.. */
413     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
414       return (0);
415
416     assert (ci->next == NULL);
417
418     if (cache_queue_tail == NULL)
419       cache_queue_head = ci;
420     else
421       cache_queue_tail->next = ci;
422     cache_queue_tail = ci;
423
424     did_insert = 1;
425   }
426
427   ci->flags |= CI_FLAGS_IN_QUEUE;
428
429   if (did_insert)
430   {
431     pthread_cond_broadcast(&cache_cond);
432     pthread_mutex_lock (&stats_lock);
433     stats_queue_length++;
434     pthread_mutex_unlock (&stats_lock);
435   }
436
437   return (0);
438 } /* }}} int enqueue_cache_item */
439
440 /*
441  * tree_callback_flush:
442  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
443  * while this is in progress.
444  */
445 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
446     gpointer data)
447 {
448   cache_item_t *ci;
449   callback_flush_data_t *cfd;
450
451   ci = (cache_item_t *) value;
452   cfd = (callback_flush_data_t *) data;
453
454   if ((ci->last_flush_time <= cfd->abs_timeout)
455       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
456       && (ci->values_num > 0))
457   {
458     enqueue_cache_item (ci, TAIL);
459   }
460   else if ((do_shutdown != 0)
461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
462       && (ci->values_num > 0))
463   {
464     enqueue_cache_item (ci, TAIL);
465   }
466   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
467       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
468       && (ci->values_num <= 0))
469   {
470     char **temp;
471
472     temp = (char **) realloc (cfd->keys,
473         sizeof (char *) * (cfd->keys_num + 1));
474     if (temp == NULL)
475     {
476       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
477       return (FALSE);
478     }
479     cfd->keys = temp;
480     /* Make really sure this points to the _same_ place */
481     assert ((char *) key == ci->file);
482     cfd->keys[cfd->keys_num] = (char *) key;
483     cfd->keys_num++;
484   }
485
486   return (FALSE);
487 } /* }}} gboolean tree_callback_flush */
488
489 static int flush_old_values (int max_age)
490 {
491   callback_flush_data_t cfd;
492   size_t k;
493
494   memset (&cfd, 0, sizeof (cfd));
495   /* Pass the current time as user data so that we don't need to call
496    * `time' for each node. */
497   cfd.now = time (NULL);
498   cfd.keys = NULL;
499   cfd.keys_num = 0;
500
501   if (max_age > 0)
502     cfd.abs_timeout = cfd.now - max_age;
503   else
504     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
505
506   /* `tree_callback_flush' will return the keys of all values that haven't
507    * been touched in the last `config_flush_interval' seconds in `cfd'.
508    * The char*'s in this array point to the same memory as ci->file, so we
509    * don't need to free them separately. */
510   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
511
512   for (k = 0; k < cfd.keys_num; k++)
513   {
514     cache_item_t *ci;
515
516     /* This must not fail. */
517     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
518     assert (ci != NULL);
519
520     /* If we end up here with values available, something's seriously
521      * messed up. */
522     assert (ci->values_num == 0);
523
524     /* Remove the node from the tree */
525     g_tree_remove (cache_tree, cfd.keys[k]);
526     cfd.keys[k] = NULL;
527
528     /* Now free and clean up `ci'. */
529     free (ci->file);
530     ci->file = NULL;
531     free (ci);
532     ci = NULL;
533   } /* for (k = 0; k < cfd.keys_num; k++) */
534
535   if (cfd.keys != NULL)
536   {
537     free (cfd.keys);
538     cfd.keys = NULL;
539   }
540
541   return (0);
542 } /* int flush_old_values */
543
544 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
545 {
546   struct timeval now;
547   struct timespec next_flush;
548
549   gettimeofday (&now, NULL);
550   next_flush.tv_sec = now.tv_sec + config_flush_interval;
551   next_flush.tv_nsec = 1000 * now.tv_usec;
552
553   pthread_mutex_lock (&cache_lock);
554   while ((do_shutdown == 0) || (cache_queue_head != NULL))
555   {
556     cache_item_t *ci;
557     char *file;
558     char **values;
559     int values_num;
560     int status;
561     int i;
562
563     /* First, check if it's time to do the cache flush. */
564     gettimeofday (&now, NULL);
565     if ((now.tv_sec > next_flush.tv_sec)
566         || ((now.tv_sec == next_flush.tv_sec)
567           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
568     {
569       /* Flush all values that haven't been written in the last
570        * `config_write_interval' seconds. */
571       flush_old_values (config_write_interval);
572
573       /* Determine the time of the next cache flush. */
574       while (next_flush.tv_sec <= now.tv_sec)
575         next_flush.tv_sec += config_flush_interval;
576
577       /* unlock the cache while we rotate so we don't block incoming
578        * updates if the fsync() blocks on disk I/O */
579       pthread_mutex_unlock(&cache_lock);
580       journal_rotate();
581       pthread_mutex_lock(&cache_lock);
582     }
583
584     /* Now, check if there's something to store away. If not, wait until
585      * something comes in or it's time to do the cache flush. */
586     if (cache_queue_head == NULL)
587     {
588       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
589       if ((status != 0) && (status != ETIMEDOUT))
590       {
591         RRDD_LOG (LOG_ERR, "queue_thread_main: "
592             "pthread_cond_timedwait returned %i.", status);
593       }
594     }
595
596     /* We're about to shut down, so lets flush the entire tree. */
597     if ((do_shutdown != 0) && (cache_queue_head == NULL))
598       flush_old_values (/* max age = */ -1);
599
600     /* Check if a value has arrived. This may be NULL if we timed out or there
601      * was an interrupt such as a signal. */
602     if (cache_queue_head == NULL)
603       continue;
604
605     ci = cache_queue_head;
606
607     /* copy the relevant parts */
608     file = strdup (ci->file);
609     if (file == NULL)
610     {
611       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
612       continue;
613     }
614
615     assert(ci->values != NULL);
616     assert(ci->values_num > 0);
617
618     values = ci->values;
619     values_num = ci->values_num;
620
621     _wipe_ci_values(ci, time(NULL));
622
623     cache_queue_head = ci->next;
624     if (cache_queue_head == NULL)
625       cache_queue_tail = NULL;
626     ci->next = NULL;
627
628     pthread_mutex_lock (&stats_lock);
629     assert (stats_queue_length > 0);
630     stats_queue_length--;
631     pthread_mutex_unlock (&stats_lock);
632
633     pthread_mutex_unlock (&cache_lock);
634
635     rrd_clear_error ();
636     status = rrd_update_r (file, NULL, values_num, (void *) values);
637     if (status != 0)
638     {
639       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
640           "rrd_update_r (%s) failed with status %i. (%s)",
641           file, status, rrd_get_error());
642     }
643
644     journal_write("wrote", file);
645     pthread_cond_broadcast(&ci->flushed);
646
647     for (i = 0; i < values_num; i++)
648       free (values[i]);
649
650     free(values);
651     free(file);
652
653     if (status == 0)
654     {
655       pthread_mutex_lock (&stats_lock);
656       stats_updates_written++;
657       stats_data_sets_written += values_num;
658       pthread_mutex_unlock (&stats_lock);
659     }
660
661     pthread_mutex_lock (&cache_lock);
662
663     /* We're about to shut down, so lets flush the entire tree. */
664     if ((do_shutdown != 0) && (cache_queue_head == NULL))
665       flush_old_values (/* max age = */ -1);
666   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
667   pthread_mutex_unlock (&cache_lock);
668
669   assert(cache_queue_head == NULL);
670   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
671   journal_done();
672
673   return (NULL);
674 } /* }}} void *queue_thread_main */
675
676 static int buffer_get_field (char **buffer_ret, /* {{{ */
677     size_t *buffer_size_ret, char **field_ret)
678 {
679   char *buffer;
680   size_t buffer_pos;
681   size_t buffer_size;
682   char *field;
683   size_t field_size;
684   int status;
685
686   buffer = *buffer_ret;
687   buffer_pos = 0;
688   buffer_size = *buffer_size_ret;
689   field = *buffer_ret;
690   field_size = 0;
691
692   if (buffer_size <= 0)
693     return (-1);
694
695   /* This is ensured by `handle_request'. */
696   assert (buffer[buffer_size - 1] == '\0');
697
698   status = -1;
699   while (buffer_pos < buffer_size)
700   {
701     /* Check for end-of-field or end-of-buffer */
702     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
703     {
704       field[field_size] = 0;
705       field_size++;
706       buffer_pos++;
707       status = 0;
708       break;
709     }
710     /* Handle escaped characters. */
711     else if (buffer[buffer_pos] == '\\')
712     {
713       if (buffer_pos >= (buffer_size - 1))
714         break;
715       buffer_pos++;
716       field[field_size] = buffer[buffer_pos];
717       field_size++;
718       buffer_pos++;
719     }
720     /* Normal operation */ 
721     else
722     {
723       field[field_size] = buffer[buffer_pos];
724       field_size++;
725       buffer_pos++;
726     }
727   } /* while (buffer_pos < buffer_size) */
728
729   if (status != 0)
730     return (status);
731
732   *buffer_ret = buffer + buffer_pos;
733   *buffer_size_ret = buffer_size - buffer_pos;
734   *field_ret = field;
735
736   return (0);
737 } /* }}} int buffer_get_field */
738
739 static int flush_file (const char *filename) /* {{{ */
740 {
741   cache_item_t *ci;
742
743   pthread_mutex_lock (&cache_lock);
744
745   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
746   if (ci == NULL)
747   {
748     pthread_mutex_unlock (&cache_lock);
749     return (ENOENT);
750   }
751
752   /* Enqueue at head */
753   enqueue_cache_item (ci, HEAD);
754
755   pthread_cond_wait(&ci->flushed, &cache_lock);
756   pthread_mutex_unlock(&cache_lock);
757
758   return (0);
759 } /* }}} int flush_file */
760
761 static int handle_request_help (int fd, /* {{{ */
762     char *buffer, size_t buffer_size)
763 {
764   int status;
765   char **help_text;
766   size_t help_text_len;
767   char *command;
768   size_t i;
769
770   char *help_help[] =
771   {
772     "4 Command overview\n",
773     "FLUSH <filename>\n",
774     "HELP [<command>]\n",
775     "UPDATE <filename> <values> [<values> ...]\n",
776     "STATS\n"
777   };
778   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
779
780   char *help_flush[] =
781   {
782     "4 Help for FLUSH\n",
783     "Usage: FLUSH <filename>\n",
784     "\n",
785     "Adds the given filename to the head of the update queue and returns\n",
786     "after is has been dequeued.\n"
787   };
788   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
789
790   char *help_update[] =
791   {
792     "9 Help for UPDATE\n",
793     "Usage: UPDATE <filename> <values> [<values> ...]\n"
794     "\n",
795     "Adds the given file to the internal cache if it is not yet known and\n",
796     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
797     "for details.\n",
798     "\n",
799     "Each <values> has the following form:\n",
800     "  <values> = <time>:<value>[:<value>[...]]\n",
801     "See the rrdupdate(1) manpage for details.\n"
802   };
803   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
804
805   char *help_stats[] =
806   {
807     "4 Help for STATS\n",
808     "Usage: STATS\n",
809     "\n",
810     "Returns some performance counters, see the rrdcached(1) manpage for\n",
811     "a description of the values.\n"
812   };
813   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
814
815   status = buffer_get_field (&buffer, &buffer_size, &command);
816   if (status != 0)
817   {
818     help_text = help_help;
819     help_text_len = help_help_len;
820   }
821   else
822   {
823     if (strcasecmp (command, "update") == 0)
824     {
825       help_text = help_update;
826       help_text_len = help_update_len;
827     }
828     else if (strcasecmp (command, "flush") == 0)
829     {
830       help_text = help_flush;
831       help_text_len = help_flush_len;
832     }
833     else if (strcasecmp (command, "stats") == 0)
834     {
835       help_text = help_stats;
836       help_text_len = help_stats_len;
837     }
838     else
839     {
840       help_text = help_help;
841       help_text_len = help_help_len;
842     }
843   }
844
845   for (i = 0; i < help_text_len; i++)
846   {
847     status = swrite (fd, help_text[i], strlen (help_text[i]));
848     if (status < 0)
849     {
850       status = errno;
851       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
852       return (status);
853     }
854   }
855
856   return (0);
857 } /* }}} int handle_request_help */
858
859 static int handle_request_stats (int fd, /* {{{ */
860     char *buffer __attribute__((unused)),
861     size_t buffer_size __attribute__((unused)))
862 {
863   int status;
864   char outbuf[CMD_MAX];
865
866   uint64_t copy_queue_length;
867   uint64_t copy_updates_received;
868   uint64_t copy_flush_received;
869   uint64_t copy_updates_written;
870   uint64_t copy_data_sets_written;
871   uint64_t copy_journal_bytes;
872   uint64_t copy_journal_rotate;
873
874   uint64_t tree_nodes_number;
875   uint64_t tree_depth;
876
877   pthread_mutex_lock (&stats_lock);
878   copy_queue_length       = stats_queue_length;
879   copy_updates_received   = stats_updates_received;
880   copy_flush_received     = stats_flush_received;
881   copy_updates_written    = stats_updates_written;
882   copy_data_sets_written  = stats_data_sets_written;
883   copy_journal_bytes      = stats_journal_bytes;
884   copy_journal_rotate     = stats_journal_rotate;
885   pthread_mutex_unlock (&stats_lock);
886
887   pthread_mutex_lock (&cache_lock);
888   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
889   tree_depth        = (uint64_t) g_tree_height (cache_tree);
890   pthread_mutex_unlock (&cache_lock);
891
892 #define RRDD_STATS_SEND \
893   outbuf[sizeof (outbuf) - 1] = 0; \
894   status = swrite (fd, outbuf, strlen (outbuf)); \
895   if (status < 0) \
896   { \
897     status = errno; \
898     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
899     return (status); \
900   }
901
902   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
903   RRDD_STATS_SEND;
904
905   snprintf (outbuf, sizeof (outbuf),
906       "QueueLength: %"PRIu64"\n", copy_queue_length);
907   RRDD_STATS_SEND;
908
909   snprintf (outbuf, sizeof (outbuf),
910       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
911   RRDD_STATS_SEND;
912
913   snprintf (outbuf, sizeof (outbuf),
914       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
915   RRDD_STATS_SEND;
916
917   snprintf (outbuf, sizeof (outbuf),
918       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
919   RRDD_STATS_SEND;
920
921   snprintf (outbuf, sizeof (outbuf),
922       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
923   RRDD_STATS_SEND;
924
925   snprintf (outbuf, sizeof (outbuf),
926       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
927   RRDD_STATS_SEND;
928
929   snprintf (outbuf, sizeof (outbuf),
930       "TreeDepth: %"PRIu64"\n", tree_depth);
931   RRDD_STATS_SEND;
932
933   snprintf (outbuf, sizeof(outbuf),
934       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
935   RRDD_STATS_SEND;
936
937   snprintf (outbuf, sizeof(outbuf),
938       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
939   RRDD_STATS_SEND;
940
941   return (0);
942 #undef RRDD_STATS_SEND
943 } /* }}} int handle_request_stats */
944
945 static int handle_request_flush (int fd, /* {{{ */
946     char *buffer, size_t buffer_size)
947 {
948   char *file;
949   int status;
950   char result[CMD_MAX];
951
952   status = buffer_get_field (&buffer, &buffer_size, &file);
953   if (status != 0)
954   {
955     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
956   }
957   else
958   {
959     pthread_mutex_lock(&stats_lock);
960     stats_flush_received++;
961     pthread_mutex_unlock(&stats_lock);
962
963     status = flush_file (file);
964     if (status == 0)
965       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
966     else if (status == ENOENT)
967     {
968       /* no file in our tree; see whether it exists at all */
969       struct stat statbuf;
970
971       memset(&statbuf, 0, sizeof(statbuf));
972       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
973         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
974       else
975         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
976     }
977     else if (status < 0)
978       strncpy (result, "-1 Internal error.\n", sizeof (result));
979     else
980       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
981   }
982   result[sizeof (result) - 1] = 0;
983
984   status = swrite (fd, result, strlen (result));
985   if (status < 0)
986   {
987     status = errno;
988     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
989     return (status);
990   }
991
992   return (0);
993 } /* }}} int handle_request_flush */
994
995 static int handle_request_update (int fd, /* {{{ */
996     char *buffer, size_t buffer_size)
997 {
998   char *file;
999   int values_num = 0;
1000   int status;
1001
1002   time_t now;
1003
1004   cache_item_t *ci;
1005   char answer[CMD_MAX];
1006
1007 #define RRDD_UPDATE_SEND \
1008   answer[sizeof (answer) - 1] = 0; \
1009   status = swrite (fd, answer, strlen (answer)); \
1010   if (status < 0) \
1011   { \
1012     status = errno; \
1013     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1014     return (status); \
1015   }
1016
1017   now = time (NULL);
1018
1019   status = buffer_get_field (&buffer, &buffer_size, &file);
1020   if (status != 0)
1021   {
1022     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1023         sizeof (answer));
1024     RRDD_UPDATE_SEND;
1025     return (0);
1026   }
1027
1028   pthread_mutex_lock(&stats_lock);
1029   stats_updates_received++;
1030   pthread_mutex_unlock(&stats_lock);
1031
1032   pthread_mutex_lock (&cache_lock);
1033   ci = g_tree_lookup (cache_tree, file);
1034
1035   if (ci == NULL) /* {{{ */
1036   {
1037     struct stat statbuf;
1038
1039     /* don't hold the lock while we setup; stat(2) might block */
1040     pthread_mutex_unlock(&cache_lock);
1041
1042     memset (&statbuf, 0, sizeof (statbuf));
1043     status = stat (file, &statbuf);
1044     if (status != 0)
1045     {
1046       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1047
1048       status = errno;
1049       if (status == ENOENT)
1050         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1051       else
1052         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1053             status);
1054       RRDD_UPDATE_SEND;
1055       return (0);
1056     }
1057     if (!S_ISREG (statbuf.st_mode))
1058     {
1059       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1060       RRDD_UPDATE_SEND;
1061       return (0);
1062     }
1063     if (access(file, R_OK|W_OK) != 0)
1064     {
1065       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1066                 file, rrd_strerror(errno));
1067       RRDD_UPDATE_SEND;
1068       return (0);
1069     }
1070
1071     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1072     if (ci == NULL)
1073     {
1074       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1075
1076       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1077       RRDD_UPDATE_SEND;
1078       return (0);
1079     }
1080     memset (ci, 0, sizeof (cache_item_t));
1081
1082     ci->file = strdup (file);
1083     if (ci->file == NULL)
1084     {
1085       free (ci);
1086       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1087
1088       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1089       RRDD_UPDATE_SEND;
1090       return (0);
1091     }
1092
1093     _wipe_ci_values(ci, now);
1094     ci->flags = CI_FLAGS_IN_TREE;
1095
1096     pthread_mutex_lock(&cache_lock);
1097     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1098   } /* }}} */
1099   assert (ci != NULL);
1100
1101   while (buffer_size > 0)
1102   {
1103     char **temp;
1104     char *value;
1105
1106     status = buffer_get_field (&buffer, &buffer_size, &value);
1107     if (status != 0)
1108     {
1109       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1110       break;
1111     }
1112
1113     temp = (char **) realloc (ci->values,
1114         sizeof (char *) * (ci->values_num + 1));
1115     if (temp == NULL)
1116     {
1117       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1118       continue;
1119     }
1120     ci->values = temp;
1121
1122     ci->values[ci->values_num] = strdup (value);
1123     if (ci->values[ci->values_num] == NULL)
1124     {
1125       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1126       continue;
1127     }
1128     ci->values_num++;
1129
1130     values_num++;
1131   }
1132
1133   if (((now - ci->last_flush_time) >= config_write_interval)
1134       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1135       && (ci->values_num > 0))
1136   {
1137     enqueue_cache_item (ci, TAIL);
1138   }
1139
1140   pthread_mutex_unlock (&cache_lock);
1141
1142   if (values_num < 1)
1143   {
1144     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1145   }
1146   else
1147   {
1148     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1149         (values_num == 1) ? "" : "s");
1150   }
1151   RRDD_UPDATE_SEND;
1152   return (0);
1153 #undef RRDD_UPDATE_SEND
1154 } /* }}} int handle_request_update */
1155
1156 /* we came across a "WROTE" entry during journal replay.
1157  * throw away any values that we have accumulated for this file
1158  */
1159 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1160                                  const char *buffer,
1161                                  size_t buffer_size __attribute__((unused)))
1162 {
1163   int i;
1164   cache_item_t *ci;
1165   const char *file = buffer;
1166
1167   pthread_mutex_lock(&cache_lock);
1168
1169   ci = g_tree_lookup(cache_tree, file);
1170   if (ci == NULL)
1171   {
1172     pthread_mutex_unlock(&cache_lock);
1173     return (0);
1174   }
1175
1176   if (ci->values)
1177   {
1178     for (i=0; i < ci->values_num; i++)
1179       free(ci->values[i]);
1180
1181     free(ci->values);
1182   }
1183
1184   _wipe_ci_values(ci, time(NULL));
1185
1186   pthread_mutex_unlock(&cache_lock);
1187   return (0);
1188 } /* }}} int handle_request_wrote */
1189
1190 /* if fd < 0, we are in journal replay mode */
1191 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1192 {
1193   char *buffer_ptr;
1194   char *command;
1195   int status;
1196
1197   assert (buffer[buffer_size - 1] == '\0');
1198
1199   buffer_ptr = buffer;
1200   command = NULL;
1201   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1202   if (status != 0)
1203   {
1204     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1205     return (-1);
1206   }
1207
1208   if (strcasecmp (command, "update") == 0)
1209   {
1210     /* don't re-write updates in replay mode */
1211     if (fd >= 0)
1212       journal_write(command, buffer_ptr);
1213
1214     return (handle_request_update (fd, buffer_ptr, buffer_size));
1215   }
1216   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1217   {
1218     /* this is only valid in replay mode */
1219     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1220   }
1221   else if (strcasecmp (command, "flush") == 0)
1222   {
1223     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1224   }
1225   else if (strcasecmp (command, "stats") == 0)
1226   {
1227     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1228   }
1229   else if (strcasecmp (command, "help") == 0)
1230   {
1231     return (handle_request_help (fd, buffer_ptr, buffer_size));
1232   }
1233   else
1234   {
1235     char result[CMD_MAX];
1236
1237     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1238     result[sizeof (result) - 1] = 0;
1239
1240     status = swrite (fd, result, strlen (result));
1241     if (status < 0)
1242     {
1243       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1244       return (-1);
1245     }
1246   }
1247
1248   return (0);
1249 } /* }}} int handle_request */
1250
1251 /* MUST NOT hold journal_lock before calling this */
1252 static void journal_rotate(void) /* {{{ */
1253 {
1254   FILE *old_fh = NULL;
1255
1256   if (journal_cur == NULL || journal_old == NULL)
1257     return;
1258
1259   pthread_mutex_lock(&journal_lock);
1260
1261   /* we rotate this way (rename before close) so that the we can release
1262    * the journal lock as fast as possible.  Journal writes to the new
1263    * journal can proceed immediately after the new file is opened.  The
1264    * fclose can then block without affecting new updates.
1265    */
1266   if (journal_fh != NULL)
1267   {
1268     old_fh = journal_fh;
1269     rename(journal_cur, journal_old);
1270     ++stats_journal_rotate;
1271   }
1272
1273   journal_fh = fopen(journal_cur, "a");
1274   pthread_mutex_unlock(&journal_lock);
1275
1276   if (old_fh != NULL)
1277     fclose(old_fh);
1278
1279   if (journal_fh == NULL)
1280     RRDD_LOG(LOG_CRIT,
1281              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1282              journal_cur, rrd_strerror(errno));
1283
1284 } /* }}} static void journal_rotate */
1285
1286 static void journal_done(void) /* {{{ */
1287 {
1288   if (journal_cur == NULL)
1289     return;
1290
1291   pthread_mutex_lock(&journal_lock);
1292   if (journal_fh != NULL)
1293   {
1294     fclose(journal_fh);
1295     journal_fh = NULL;
1296   }
1297
1298   RRDD_LOG(LOG_INFO, "removing journals");
1299
1300   unlink(journal_old);
1301   unlink(journal_cur);
1302   pthread_mutex_unlock(&journal_lock);
1303
1304 } /* }}} static void journal_done */
1305
1306 static int journal_write(char *cmd, char *args) /* {{{ */
1307 {
1308   int chars;
1309
1310   if (journal_fh == NULL)
1311     return 0;
1312
1313   pthread_mutex_lock(&journal_lock);
1314   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1315   pthread_mutex_unlock(&journal_lock);
1316
1317   if (chars > 0)
1318   {
1319     pthread_mutex_lock(&stats_lock);
1320     stats_journal_bytes += chars;
1321     pthread_mutex_unlock(&stats_lock);
1322   }
1323
1324   return chars;
1325 } /* }}} static int journal_write */
1326
1327 static int journal_replay (const char *file) /* {{{ */
1328 {
1329   FILE *fh;
1330   int entry_cnt = 0;
1331   int fail_cnt = 0;
1332   uint64_t line = 0;
1333   char entry[CMD_MAX];
1334
1335   if (file == NULL) return 0;
1336
1337   fh = fopen(file, "r");
1338   if (fh == NULL)
1339   {
1340     if (errno != ENOENT)
1341       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1342                file, rrd_strerror(errno));
1343     return 0;
1344   }
1345   else
1346     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1347
1348   while(!feof(fh))
1349   {
1350     size_t entry_len;
1351
1352     ++line;
1353     fgets(entry, sizeof(entry), fh);
1354     entry_len = strlen(entry);
1355
1356     /* check \n termination in case journal writing crashed mid-line */
1357     if (entry_len == 0)
1358       continue;
1359     else if (entry[entry_len - 1] != '\n')
1360     {
1361       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1362       ++fail_cnt;
1363       continue;
1364     }
1365
1366     entry[entry_len - 1] = '\0';
1367
1368     if (handle_request(-1, entry, entry_len) == 0)
1369       ++entry_cnt;
1370     else
1371       ++fail_cnt;
1372   }
1373
1374   fclose(fh);
1375
1376   if (entry_cnt > 0)
1377   {
1378     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1379              entry_cnt, fail_cnt);
1380     return 1;
1381   }
1382   else
1383     return 0;
1384
1385 } /* }}} static int journal_replay */
1386
1387 static void *connection_thread_main (void *args) /* {{{ */
1388 {
1389   pthread_t self;
1390   int i;
1391   int fd;
1392   
1393   fd = *((int *) args);
1394   free (args);
1395
1396   pthread_mutex_lock (&connection_threads_lock);
1397   {
1398     pthread_t *temp;
1399
1400     temp = (pthread_t *) realloc (connection_threads,
1401         sizeof (pthread_t) * (connection_threads_num + 1));
1402     if (temp == NULL)
1403     {
1404       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1405     }
1406     else
1407     {
1408       connection_threads = temp;
1409       connection_threads[connection_threads_num] = pthread_self ();
1410       connection_threads_num++;
1411     }
1412   }
1413   pthread_mutex_unlock (&connection_threads_lock);
1414
1415   while (do_shutdown == 0)
1416   {
1417     char buffer[CMD_MAX];
1418
1419     struct pollfd pollfd;
1420     int status;
1421
1422     pollfd.fd = fd;
1423     pollfd.events = POLLIN | POLLPRI;
1424     pollfd.revents = 0;
1425
1426     status = poll (&pollfd, 1, /* timeout = */ 500);
1427     if (status == 0) /* timeout */
1428       continue;
1429     else if (status < 0) /* error */
1430     {
1431       status = errno;
1432       if (status == EINTR)
1433         continue;
1434       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1435       continue;
1436     }
1437
1438     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1439     {
1440       close (fd);
1441       break;
1442     }
1443     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1444     {
1445       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1446           "poll(2) returned something unexpected: %#04hx",
1447           pollfd.revents);
1448       close (fd);
1449       break;
1450     }
1451
1452     status = (int) sread (fd, buffer, sizeof (buffer));
1453     if (status <= 0)
1454     {
1455       close (fd);
1456
1457       if (status < 0)
1458         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1459
1460       break;
1461     }
1462
1463     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1464     if (status != 0)
1465       break;
1466   }
1467
1468   close(fd);
1469
1470   self = pthread_self ();
1471   /* Remove this thread from the connection threads list */
1472   pthread_mutex_lock (&connection_threads_lock);
1473   /* Find out own index in the array */
1474   for (i = 0; i < connection_threads_num; i++)
1475     if (pthread_equal (connection_threads[i], self) != 0)
1476       break;
1477   assert (i < connection_threads_num);
1478
1479   /* Move the trailing threads forward. */
1480   if (i < (connection_threads_num - 1))
1481   {
1482     memmove (connection_threads + i,
1483         connection_threads + i + 1,
1484         sizeof (pthread_t) * (connection_threads_num - i - 1));
1485   }
1486
1487   connection_threads_num--;
1488   pthread_mutex_unlock (&connection_threads_lock);
1489
1490   return (NULL);
1491 } /* }}} void *connection_thread_main */
1492
1493 static int open_listen_socket_unix (const char *path) /* {{{ */
1494 {
1495   int fd;
1496   struct sockaddr_un sa;
1497   listen_socket_t *temp;
1498   int status;
1499
1500   temp = (listen_socket_t *) realloc (listen_fds,
1501       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1502   if (temp == NULL)
1503   {
1504     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1505     return (-1);
1506   }
1507   listen_fds = temp;
1508   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1509
1510   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1511   if (fd < 0)
1512   {
1513     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1514     return (-1);
1515   }
1516
1517   memset (&sa, 0, sizeof (sa));
1518   sa.sun_family = AF_UNIX;
1519   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1520
1521   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1522   if (status != 0)
1523   {
1524     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1525     close (fd);
1526     unlink (path);
1527     return (-1);
1528   }
1529
1530   status = listen (fd, /* backlog = */ 10);
1531   if (status != 0)
1532   {
1533     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1534     close (fd);
1535     unlink (path);
1536     return (-1);
1537   }
1538   
1539   listen_fds[listen_fds_num].fd = fd;
1540   snprintf (listen_fds[listen_fds_num].path,
1541       sizeof (listen_fds[listen_fds_num].path) - 1,
1542       "unix:%s", path);
1543   listen_fds_num++;
1544
1545   return (0);
1546 } /* }}} int open_listen_socket_unix */
1547
1548 static int open_listen_socket (const char *addr_orig) /* {{{ */
1549 {
1550   struct addrinfo ai_hints;
1551   struct addrinfo *ai_res;
1552   struct addrinfo *ai_ptr;
1553   char addr_copy[NI_MAXHOST];
1554   char *addr;
1555   char *port;
1556   int status;
1557
1558   assert (addr_orig != NULL);
1559
1560   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1561   addr_copy[sizeof (addr_copy) - 1] = 0;
1562   addr = addr_copy;
1563
1564   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1565     return (open_listen_socket_unix (addr + strlen ("unix:")));
1566   else if (addr[0] == '/')
1567     return (open_listen_socket_unix (addr));
1568
1569   memset (&ai_hints, 0, sizeof (ai_hints));
1570   ai_hints.ai_flags = 0;
1571 #ifdef AI_ADDRCONFIG
1572   ai_hints.ai_flags |= AI_ADDRCONFIG;
1573 #endif
1574   ai_hints.ai_family = AF_UNSPEC;
1575   ai_hints.ai_socktype = SOCK_STREAM;
1576
1577   port = NULL;
1578  if (*addr == '[') /* IPv6+port format */
1579   {
1580     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1581     addr++;
1582
1583     port = strchr (addr, ']');
1584     if (port == NULL)
1585     {
1586       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1587           addr_orig);
1588       return (-1);
1589     }
1590     *port = 0;
1591     port++;
1592
1593     if (*port == ':')
1594       port++;
1595     else if (*port == 0)
1596       port = NULL;
1597     else
1598     {
1599       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1600           port);
1601       return (-1);
1602     }
1603   } /* if (*addr = ']') */
1604   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1605   {
1606     port = rindex(addr, ':');
1607     if (port != NULL)
1608     {
1609       *port = 0;
1610       port++;
1611     }
1612   }
1613   ai_res = NULL;
1614   status = getaddrinfo (addr,
1615                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1616                         &ai_hints, &ai_res);
1617   if (status != 0)
1618   {
1619     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1620         "%s", addr, gai_strerror (status));
1621     return (-1);
1622   }
1623
1624   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1625   {
1626     int fd;
1627     listen_socket_t *temp;
1628     int one = 1;
1629
1630     temp = (listen_socket_t *) realloc (listen_fds,
1631         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1632     if (temp == NULL)
1633     {
1634       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1635       continue;
1636     }
1637     listen_fds = temp;
1638     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1639
1640     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1641     if (fd < 0)
1642     {
1643       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1644       continue;
1645     }
1646
1647     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1648
1649     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1650     if (status != 0)
1651     {
1652       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1653       close (fd);
1654       continue;
1655     }
1656
1657     status = listen (fd, /* backlog = */ 10);
1658     if (status != 0)
1659     {
1660       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1661       close (fd);
1662       return (-1);
1663     }
1664
1665     listen_fds[listen_fds_num].fd = fd;
1666     strncpy (listen_fds[listen_fds_num].path, addr,
1667         sizeof (listen_fds[listen_fds_num].path) - 1);
1668     listen_fds_num++;
1669   } /* for (ai_ptr) */
1670
1671   return (0);
1672 } /* }}} int open_listen_socket */
1673
1674 static int close_listen_sockets (void) /* {{{ */
1675 {
1676   size_t i;
1677
1678   for (i = 0; i < listen_fds_num; i++)
1679   {
1680     close (listen_fds[i].fd);
1681     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1682       unlink (listen_fds[i].path + strlen ("unix:"));
1683   }
1684
1685   free (listen_fds);
1686   listen_fds = NULL;
1687   listen_fds_num = 0;
1688
1689   return (0);
1690 } /* }}} int close_listen_sockets */
1691
1692 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1693 {
1694   struct pollfd *pollfds;
1695   int pollfds_num;
1696   int status;
1697   int i;
1698
1699   for (i = 0; i < config_listen_address_list_len; i++)
1700     open_listen_socket (config_listen_address_list[i]);
1701
1702   if (config_listen_address_list_len < 1)
1703     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1704
1705   if (listen_fds_num < 1)
1706   {
1707     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1708         "could be opened. Sorry.");
1709     return (NULL);
1710   }
1711
1712   pollfds_num = listen_fds_num;
1713   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1714   if (pollfds == NULL)
1715   {
1716     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1717     return (NULL);
1718   }
1719   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1720
1721   RRDD_LOG(LOG_INFO, "listening for connections");
1722
1723   while (do_shutdown == 0)
1724   {
1725     assert (pollfds_num == ((int) listen_fds_num));
1726     for (i = 0; i < pollfds_num; i++)
1727     {
1728       pollfds[i].fd = listen_fds[i].fd;
1729       pollfds[i].events = POLLIN | POLLPRI;
1730       pollfds[i].revents = 0;
1731     }
1732
1733     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1734     if (status == 0)
1735     {
1736       continue; /* timeout */
1737     }
1738     else if (status < 0)
1739     {
1740       status = errno;
1741       if (status != EINTR)
1742       {
1743         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1744       }
1745       continue;
1746     }
1747
1748     for (i = 0; i < pollfds_num; i++)
1749     {
1750       int *client_sd;
1751       struct sockaddr_storage client_sa;
1752       socklen_t client_sa_size;
1753       pthread_t tid;
1754       pthread_attr_t attr;
1755
1756       if (pollfds[i].revents == 0)
1757         continue;
1758
1759       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1760       {
1761         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1762             "poll(2) returned something unexpected for listen FD #%i.",
1763             pollfds[i].fd);
1764         continue;
1765       }
1766
1767       client_sd = (int *) malloc (sizeof (int));
1768       if (client_sd == NULL)
1769       {
1770         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1771         continue;
1772       }
1773
1774       client_sa_size = sizeof (client_sa);
1775       *client_sd = accept (pollfds[i].fd,
1776           (struct sockaddr *) &client_sa, &client_sa_size);
1777       if (*client_sd < 0)
1778       {
1779         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1780         continue;
1781       }
1782
1783       pthread_attr_init (&attr);
1784       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1785
1786       status = pthread_create (&tid, &attr, connection_thread_main,
1787           /* args = */ (void *) client_sd);
1788       if (status != 0)
1789       {
1790         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1791         close (*client_sd);
1792         free (client_sd);
1793         continue;
1794       }
1795     } /* for (pollfds_num) */
1796   } /* while (do_shutdown == 0) */
1797
1798   RRDD_LOG(LOG_INFO, "starting shutdown");
1799
1800   close_listen_sockets ();
1801
1802   pthread_mutex_lock (&connection_threads_lock);
1803   while (connection_threads_num > 0)
1804   {
1805     pthread_t wait_for;
1806
1807     wait_for = connection_threads[0];
1808
1809     pthread_mutex_unlock (&connection_threads_lock);
1810     pthread_join (wait_for, /* retval = */ NULL);
1811     pthread_mutex_lock (&connection_threads_lock);
1812   }
1813   pthread_mutex_unlock (&connection_threads_lock);
1814
1815   return (NULL);
1816 } /* }}} void *listen_thread_main */
1817
1818 static int daemonize (void) /* {{{ */
1819 {
1820   int status;
1821   int fd;
1822
1823   /* These structures are static, because `sigaction' behaves weird if the are
1824    * overwritten.. */
1825   static struct sigaction sa_int;
1826   static struct sigaction sa_term;
1827   static struct sigaction sa_pipe;
1828
1829   fd = open_pidfile();
1830   if (fd < 0) return fd;
1831
1832   if (!stay_foreground)
1833   {
1834     pid_t child;
1835     char *base_dir;
1836
1837     child = fork ();
1838     if (child < 0)
1839     {
1840       fprintf (stderr, "daemonize: fork(2) failed.\n");
1841       return (-1);
1842     }
1843     else if (child > 0)
1844     {
1845       return (1);
1846     }
1847
1848     /* Change into the /tmp directory. */
1849     base_dir = (config_base_dir != NULL)
1850       ? config_base_dir
1851       : "/tmp";
1852     status = chdir (base_dir);
1853     if (status != 0)
1854     {
1855       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1856       return (-1);
1857     }
1858
1859     /* Become session leader */
1860     setsid ();
1861
1862     /* Open the first three file descriptors to /dev/null */
1863     close (2);
1864     close (1);
1865     close (0);
1866
1867     open ("/dev/null", O_RDWR);
1868     dup (0);
1869     dup (0);
1870   } /* if (!stay_foreground) */
1871
1872   /* Install signal handlers */
1873   memset (&sa_int, 0, sizeof (sa_int));
1874   sa_int.sa_handler = sig_int_handler;
1875   sigaction (SIGINT, &sa_int, NULL);
1876
1877   memset (&sa_term, 0, sizeof (sa_term));
1878   sa_term.sa_handler = sig_term_handler;
1879   sigaction (SIGTERM, &sa_term, NULL);
1880
1881   memset (&sa_pipe, 0, sizeof (sa_pipe));
1882   sa_pipe.sa_handler = SIG_IGN;
1883   sigaction (SIGPIPE, &sa_pipe, NULL);
1884
1885   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1886   RRDD_LOG(LOG_INFO, "starting up");
1887
1888   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1889   if (cache_tree == NULL)
1890   {
1891     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1892     return (-1);
1893   }
1894
1895   status = write_pidfile (fd);
1896   return status;
1897 } /* }}} int daemonize */
1898
1899 static int cleanup (void) /* {{{ */
1900 {
1901   do_shutdown++;
1902
1903   pthread_cond_signal (&cache_cond);
1904   pthread_join (queue_thread, /* return = */ NULL);
1905
1906   remove_pidfile ();
1907
1908   RRDD_LOG(LOG_INFO, "goodbye");
1909   closelog ();
1910
1911   return (0);
1912 } /* }}} int cleanup */
1913
1914 static int read_options (int argc, char **argv) /* {{{ */
1915 {
1916   int option;
1917   int status = 0;
1918
1919   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1920   {
1921     switch (option)
1922     {
1923       case 'g':
1924         stay_foreground=1;
1925         break;
1926
1927       case 'l':
1928       {
1929         char **temp;
1930
1931         temp = (char **) realloc (config_listen_address_list,
1932             sizeof (char *) * (config_listen_address_list_len + 1));
1933         if (temp == NULL)
1934         {
1935           fprintf (stderr, "read_options: realloc failed.\n");
1936           return (2);
1937         }
1938         config_listen_address_list = temp;
1939
1940         temp[config_listen_address_list_len] = strdup (optarg);
1941         if (temp[config_listen_address_list_len] == NULL)
1942         {
1943           fprintf (stderr, "read_options: strdup failed.\n");
1944           return (2);
1945         }
1946         config_listen_address_list_len++;
1947       }
1948       break;
1949
1950       case 'f':
1951       {
1952         int temp;
1953
1954         temp = atoi (optarg);
1955         if (temp > 0)
1956           config_flush_interval = temp;
1957         else
1958         {
1959           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1960           status = 3;
1961         }
1962       }
1963       break;
1964
1965       case 'w':
1966       {
1967         int temp;
1968
1969         temp = atoi (optarg);
1970         if (temp > 0)
1971           config_write_interval = temp;
1972         else
1973         {
1974           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1975           status = 2;
1976         }
1977       }
1978       break;
1979
1980       case 'z':
1981       {
1982         int temp;
1983
1984         temp = atoi(optarg);
1985         if (temp > 0)
1986           config_write_jitter = temp;
1987         else
1988         {
1989           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1990           status = 2;
1991         }
1992
1993         break;
1994       }
1995
1996       case 'b':
1997       {
1998         size_t len;
1999
2000         if (config_base_dir != NULL)
2001           free (config_base_dir);
2002         config_base_dir = strdup (optarg);
2003         if (config_base_dir == NULL)
2004         {
2005           fprintf (stderr, "read_options: strdup failed.\n");
2006           return (3);
2007         }
2008
2009         len = strlen (config_base_dir);
2010         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2011         {
2012           config_base_dir[len - 1] = 0;
2013           len--;
2014         }
2015
2016         if (len < 1)
2017         {
2018           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2019           return (4);
2020         }
2021       }
2022       break;
2023
2024       case 'p':
2025       {
2026         if (config_pid_file != NULL)
2027           free (config_pid_file);
2028         config_pid_file = strdup (optarg);
2029         if (config_pid_file == NULL)
2030         {
2031           fprintf (stderr, "read_options: strdup failed.\n");
2032           return (3);
2033         }
2034       }
2035       break;
2036
2037       case 'j':
2038       {
2039         struct stat statbuf;
2040         const char *dir = optarg;
2041
2042         status = stat(dir, &statbuf);
2043         if (status != 0)
2044         {
2045           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2046           return 6;
2047         }
2048
2049         if (!S_ISDIR(statbuf.st_mode)
2050             || access(dir, R_OK|W_OK|X_OK) != 0)
2051         {
2052           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2053                   errno ? rrd_strerror(errno) : "");
2054           return 6;
2055         }
2056
2057         journal_cur = malloc(PATH_MAX + 1);
2058         journal_old = malloc(PATH_MAX + 1);
2059         if (journal_cur == NULL || journal_old == NULL)
2060         {
2061           fprintf(stderr, "malloc failure for journal files\n");
2062           return 6;
2063         }
2064         else 
2065         {
2066           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2067           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2068         }
2069       }
2070       break;
2071
2072       case 'h':
2073       case '?':
2074         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2075             "\n"
2076             "Usage: rrdcached [options]\n"
2077             "\n"
2078             "Valid options are:\n"
2079             "  -l <address>  Socket address to listen to.\n"
2080             "  -w <seconds>  Interval in which to write data.\n"
2081             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2082             "  -f <seconds>  Interval in which to flush dead data.\n"
2083             "  -p <file>     Location of the PID-file.\n"
2084             "  -b <dir>      Base directory to change to.\n"
2085             "  -g            Do not fork and run in the foreground.\n"
2086             "  -j <dir>      Directory in which to create the journal files.\n"
2087             "\n"
2088             "For more information and a detailed description of all options "
2089             "please refer\n"
2090             "to the rrdcached(1) manual page.\n",
2091             VERSION);
2092         status = -1;
2093         break;
2094     } /* switch (option) */
2095   } /* while (getopt) */
2096
2097   /* advise the user when values are not sane */
2098   if (config_flush_interval < 2 * config_write_interval)
2099     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2100             " 2x write interval (-w) !\n");
2101   if (config_write_jitter > config_write_interval)
2102     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2103             " write interval (-w) !\n");
2104
2105   return (status);
2106 } /* }}} int read_options */
2107
2108 int main (int argc, char **argv)
2109 {
2110   int status;
2111
2112   status = read_options (argc, argv);
2113   if (status != 0)
2114   {
2115     if (status < 0)
2116       status = 0;
2117     return (status);
2118   }
2119
2120   status = daemonize ();
2121   if (status == 1)
2122   {
2123     struct sigaction sigchld;
2124
2125     memset (&sigchld, 0, sizeof (sigchld));
2126     sigchld.sa_handler = SIG_IGN;
2127     sigaction (SIGCHLD, &sigchld, NULL);
2128
2129     return (0);
2130   }
2131   else if (status != 0)
2132   {
2133     fprintf (stderr, "daemonize failed, exiting.\n");
2134     return (1);
2135   }
2136
2137   if (journal_cur != NULL)
2138   {
2139     int had_journal = 0;
2140
2141     pthread_mutex_lock(&journal_lock);
2142
2143     RRDD_LOG(LOG_INFO, "checking for journal files");
2144
2145     had_journal += journal_replay(journal_old);
2146     had_journal += journal_replay(journal_cur);
2147
2148     if (had_journal)
2149       flush_old_values(-1);
2150
2151     pthread_mutex_unlock(&journal_lock);
2152     journal_rotate();
2153
2154     RRDD_LOG(LOG_INFO, "journal processing complete");
2155   }
2156
2157   /* start the queue thread */
2158   memset (&queue_thread, 0, sizeof (queue_thread));
2159   status = pthread_create (&queue_thread,
2160                            NULL, /* attr */
2161                            queue_thread_main,
2162                            NULL); /* args */
2163   if (status != 0)
2164   {
2165     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2166     cleanup();
2167     return (1);
2168   }
2169
2170   listen_thread_main (NULL);
2171   cleanup ();
2172
2173   return (0);
2174 } /* int main */
2175
2176 /*
2177  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2178  */