0e29f131ab1387fa657ae0160e2e6d956de4bab7
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 struct listen_socket_s
105 {
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
110
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
125
126 struct callback_flush_data_s
127 {
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
134
135 enum queue_side_e
136 {
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
141
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
144
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
149
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
152
153 static int do_shutdown = 0;
154
155 static pthread_t queue_thread;
156
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
160
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
167
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
173
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
176
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
185
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
194
195 /* 
196  * Functions
197  */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
199 {
200   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201   do_shutdown++;
202   pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
204
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
206 {
207   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208   do_shutdown++;
209   pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
211
212 static int open_pidfile(void) /* {{{ */
213 {
214   int fd;
215   char *file;
216
217   file = (config_pid_file != NULL)
218     ? config_pid_file
219     : LOCALSTATEDIR "/run/rrdcached.pid";
220
221   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
222   if (fd < 0)
223     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
224             file, rrd_strerror(errno));
225
226   return(fd);
227 }
228
229 static int write_pidfile (int fd) /* {{{ */
230 {
231   pid_t pid;
232   FILE *fh;
233
234   pid = getpid ();
235
236   fh = fdopen (fd, "w");
237   if (fh == NULL)
238   {
239     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
240     close(fd);
241     return (-1);
242   }
243
244   fprintf (fh, "%i\n", (int) pid);
245   fclose (fh);
246
247   return (0);
248 } /* }}} int write_pidfile */
249
250 static int remove_pidfile (void) /* {{{ */
251 {
252   char *file;
253   int status;
254
255   file = (config_pid_file != NULL)
256     ? config_pid_file
257     : LOCALSTATEDIR "/run/rrdcached.pid";
258
259   status = unlink (file);
260   if (status == 0)
261     return (0);
262   return (errno);
263 } /* }}} int remove_pidfile */
264
265 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
266 {
267   char    *buffer;
268   size_t   buffer_used;
269   size_t   buffer_free;
270   ssize_t  status;
271
272   buffer       = (char *) buffer_void;
273   buffer_used  = 0;
274   buffer_free  = buffer_size;
275
276   while (buffer_free > 0)
277   {
278     status = read (fd, buffer + buffer_used, buffer_free);
279     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
280       continue;
281
282     if (status < 0)
283       return (-1);
284
285     if (status == 0)
286       return (0);
287
288     assert ((0 > status) || (buffer_free >= (size_t) status));
289
290     buffer_free = buffer_free - status;
291     buffer_used = buffer_used + status;
292
293     if (buffer[buffer_used - 1] == '\n')
294       break;
295   }
296
297   assert (buffer_used > 0);
298
299   if (buffer[buffer_used - 1] != '\n')
300   {
301     errno = ENOBUFS;
302     return (-1);
303   }
304
305   buffer[buffer_used - 1] = 0;
306
307   /* Fix network line endings. */
308   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
309   {
310     buffer_used--;
311     buffer[buffer_used - 1] = 0;
312   }
313
314   return (buffer_used);
315 } /* }}} ssize_t sread */
316
317 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
318 {
319   const char *ptr;
320   size_t      nleft;
321   ssize_t     status;
322
323   /* special case for journal replay */
324   if (fd < 0) return 0;
325
326   ptr   = (const char *) buf;
327   nleft = count;
328
329   while (nleft > 0)
330   {
331     status = write (fd, (const void *) ptr, nleft);
332
333     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
334       continue;
335
336     if (status < 0)
337       return (status);
338
339     nleft -= status;
340     ptr   += status;
341   }
342
343   return (0);
344 } /* }}} ssize_t swrite */
345
346 static void _wipe_ci_values(cache_item_t *ci, time_t when)
347 {
348   ci->values = NULL;
349   ci->values_num = 0;
350
351   ci->last_flush_time = when;
352   if (config_write_jitter > 0)
353     ci->last_flush_time += (random() % config_write_jitter);
354
355   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
356 }
357
358 /*
359  * enqueue_cache_item:
360  * `cache_lock' must be acquired before calling this function!
361  */
362 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
363     queue_side_t side)
364 {
365   int did_insert = 0;
366
367   if (ci == NULL)
368     return (-1);
369
370   if (ci->values_num == 0)
371     return (0);
372
373   if (side == HEAD)
374   {
375     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
376     {
377       assert (ci->next == NULL);
378       ci->next = cache_queue_head;
379       cache_queue_head = ci;
380
381       if (cache_queue_tail == NULL)
382         cache_queue_tail = cache_queue_head;
383
384       did_insert = 1;
385     }
386     else if (cache_queue_head == ci)
387     {
388       /* do nothing */
389     }
390     else /* enqueued, but not first entry */
391     {
392       cache_item_t *prev;
393
394       /* find previous entry */
395       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
396         if (prev->next == ci)
397           break;
398       assert (prev != NULL);
399
400       /* move to the front */
401       prev->next = ci->next;
402       ci->next = cache_queue_head;
403       cache_queue_head = ci;
404
405       /* check if we need to adapt the tail */
406       if (cache_queue_tail == ci)
407         cache_queue_tail = prev;
408     }
409   }
410   else /* (side == TAIL) */
411   {
412     /* We don't move values back in the list.. */
413     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
414       return (0);
415
416     assert (ci->next == NULL);
417
418     if (cache_queue_tail == NULL)
419       cache_queue_head = ci;
420     else
421       cache_queue_tail->next = ci;
422     cache_queue_tail = ci;
423
424     did_insert = 1;
425   }
426
427   ci->flags |= CI_FLAGS_IN_QUEUE;
428
429   if (did_insert)
430   {
431     pthread_cond_broadcast(&cache_cond);
432     pthread_mutex_lock (&stats_lock);
433     stats_queue_length++;
434     pthread_mutex_unlock (&stats_lock);
435   }
436
437   return (0);
438 } /* }}} int enqueue_cache_item */
439
440 /*
441  * tree_callback_flush:
442  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
443  * while this is in progress.
444  */
445 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
446     gpointer data)
447 {
448   cache_item_t *ci;
449   callback_flush_data_t *cfd;
450
451   ci = (cache_item_t *) value;
452   cfd = (callback_flush_data_t *) data;
453
454   if ((ci->last_flush_time <= cfd->abs_timeout)
455       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
456       && (ci->values_num > 0))
457   {
458     enqueue_cache_item (ci, TAIL);
459   }
460   else if ((do_shutdown != 0)
461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
462       && (ci->values_num > 0))
463   {
464     enqueue_cache_item (ci, TAIL);
465   }
466   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
467       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
468       && (ci->values_num <= 0))
469   {
470     char **temp;
471
472     temp = (char **) realloc (cfd->keys,
473         sizeof (char *) * (cfd->keys_num + 1));
474     if (temp == NULL)
475     {
476       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
477       return (FALSE);
478     }
479     cfd->keys = temp;
480     /* Make really sure this points to the _same_ place */
481     assert ((char *) key == ci->file);
482     cfd->keys[cfd->keys_num] = (char *) key;
483     cfd->keys_num++;
484   }
485
486   return (FALSE);
487 } /* }}} gboolean tree_callback_flush */
488
489 static int flush_old_values (int max_age)
490 {
491   callback_flush_data_t cfd;
492   size_t k;
493
494   memset (&cfd, 0, sizeof (cfd));
495   /* Pass the current time as user data so that we don't need to call
496    * `time' for each node. */
497   cfd.now = time (NULL);
498   cfd.keys = NULL;
499   cfd.keys_num = 0;
500
501   if (max_age > 0)
502     cfd.abs_timeout = cfd.now - max_age;
503   else
504     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
505
506   /* `tree_callback_flush' will return the keys of all values that haven't
507    * been touched in the last `config_flush_interval' seconds in `cfd'.
508    * The char*'s in this array point to the same memory as ci->file, so we
509    * don't need to free them separately. */
510   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
511
512   for (k = 0; k < cfd.keys_num; k++)
513   {
514     cache_item_t *ci;
515
516     /* This must not fail. */
517     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
518     assert (ci != NULL);
519
520     /* If we end up here with values available, something's seriously
521      * messed up. */
522     assert (ci->values_num == 0);
523
524     /* Remove the node from the tree */
525     g_tree_remove (cache_tree, cfd.keys[k]);
526     cfd.keys[k] = NULL;
527
528     /* Now free and clean up `ci'. */
529     free (ci->file);
530     ci->file = NULL;
531     free (ci);
532     ci = NULL;
533   } /* for (k = 0; k < cfd.keys_num; k++) */
534
535   if (cfd.keys != NULL)
536   {
537     free (cfd.keys);
538     cfd.keys = NULL;
539   }
540
541   return (0);
542 } /* int flush_old_values */
543
544 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
545 {
546   struct timeval now;
547   struct timespec next_flush;
548
549   gettimeofday (&now, NULL);
550   next_flush.tv_sec = now.tv_sec + config_flush_interval;
551   next_flush.tv_nsec = 1000 * now.tv_usec;
552
553   pthread_mutex_lock (&cache_lock);
554   while ((do_shutdown == 0) || (cache_queue_head != NULL))
555   {
556     cache_item_t *ci;
557     char *file;
558     char **values;
559     int values_num;
560     int status;
561     int i;
562
563     /* First, check if it's time to do the cache flush. */
564     gettimeofday (&now, NULL);
565     if ((now.tv_sec > next_flush.tv_sec)
566         || ((now.tv_sec == next_flush.tv_sec)
567           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
568     {
569       /* Flush all values that haven't been written in the last
570        * `config_write_interval' seconds. */
571       flush_old_values (config_write_interval);
572
573       /* Determine the time of the next cache flush. */
574       while (next_flush.tv_sec <= now.tv_sec)
575         next_flush.tv_sec += config_flush_interval;
576
577       /* unlock the cache while we rotate so we don't block incoming
578        * updates if the fsync() blocks on disk I/O */
579       pthread_mutex_unlock(&cache_lock);
580       journal_rotate();
581       pthread_mutex_lock(&cache_lock);
582     }
583
584     /* Now, check if there's something to store away. If not, wait until
585      * something comes in or it's time to do the cache flush. */
586     if (cache_queue_head == NULL)
587     {
588       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
589       if ((status != 0) && (status != ETIMEDOUT))
590       {
591         RRDD_LOG (LOG_ERR, "queue_thread_main: "
592             "pthread_cond_timedwait returned %i.", status);
593       }
594     }
595
596     /* We're about to shut down, so lets flush the entire tree. */
597     if ((do_shutdown != 0) && (cache_queue_head == NULL))
598       flush_old_values (/* max age = */ -1);
599
600     /* Check if a value has arrived. This may be NULL if we timed out or there
601      * was an interrupt such as a signal. */
602     if (cache_queue_head == NULL)
603       continue;
604
605     ci = cache_queue_head;
606
607     /* copy the relevant parts */
608     file = strdup (ci->file);
609     if (file == NULL)
610     {
611       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
612       continue;
613     }
614
615     assert(ci->values != NULL);
616     assert(ci->values_num > 0);
617
618     values = ci->values;
619     values_num = ci->values_num;
620
621     _wipe_ci_values(ci, time(NULL));
622
623     cache_queue_head = ci->next;
624     if (cache_queue_head == NULL)
625       cache_queue_tail = NULL;
626     ci->next = NULL;
627
628     pthread_mutex_lock (&stats_lock);
629     assert (stats_queue_length > 0);
630     stats_queue_length--;
631     pthread_mutex_unlock (&stats_lock);
632
633     pthread_mutex_unlock (&cache_lock);
634
635     rrd_clear_error ();
636     status = rrd_update_r (file, NULL, values_num, (void *) values);
637     if (status != 0)
638     {
639       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
640           "rrd_update_r (%s) failed with status %i. (%s)",
641           file, status, rrd_get_error());
642     }
643
644     journal_write("wrote", file);
645     pthread_cond_broadcast(&ci->flushed);
646
647     for (i = 0; i < values_num; i++)
648       free (values[i]);
649
650     free(values);
651     free(file);
652
653     if (status == 0)
654     {
655       pthread_mutex_lock (&stats_lock);
656       stats_updates_written++;
657       stats_data_sets_written += values_num;
658       pthread_mutex_unlock (&stats_lock);
659     }
660
661     pthread_mutex_lock (&cache_lock);
662
663     /* We're about to shut down, so lets flush the entire tree. */
664     if ((do_shutdown != 0) && (cache_queue_head == NULL))
665       flush_old_values (/* max age = */ -1);
666   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
667   pthread_mutex_unlock (&cache_lock);
668
669   assert(cache_queue_head == NULL);
670   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
671   journal_done();
672
673   return (NULL);
674 } /* }}} void *queue_thread_main */
675
676 static int buffer_get_field (char **buffer_ret, /* {{{ */
677     size_t *buffer_size_ret, char **field_ret)
678 {
679   char *buffer;
680   size_t buffer_pos;
681   size_t buffer_size;
682   char *field;
683   size_t field_size;
684   int status;
685
686   buffer = *buffer_ret;
687   buffer_pos = 0;
688   buffer_size = *buffer_size_ret;
689   field = *buffer_ret;
690   field_size = 0;
691
692   if (buffer_size <= 0)
693     return (-1);
694
695   /* This is ensured by `handle_request'. */
696   assert (buffer[buffer_size - 1] == '\0');
697
698   status = -1;
699   while (buffer_pos < buffer_size)
700   {
701     /* Check for end-of-field or end-of-buffer */
702     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
703     {
704       field[field_size] = 0;
705       field_size++;
706       buffer_pos++;
707       status = 0;
708       break;
709     }
710     /* Handle escaped characters. */
711     else if (buffer[buffer_pos] == '\\')
712     {
713       if (buffer_pos >= (buffer_size - 1))
714         break;
715       buffer_pos++;
716       field[field_size] = buffer[buffer_pos];
717       field_size++;
718       buffer_pos++;
719     }
720     /* Normal operation */ 
721     else
722     {
723       field[field_size] = buffer[buffer_pos];
724       field_size++;
725       buffer_pos++;
726     }
727   } /* while (buffer_pos < buffer_size) */
728
729   if (status != 0)
730     return (status);
731
732   *buffer_ret = buffer + buffer_pos;
733   *buffer_size_ret = buffer_size - buffer_pos;
734   *field_ret = field;
735
736   return (0);
737 } /* }}} int buffer_get_field */
738
739 static int flush_file (const char *filename) /* {{{ */
740 {
741   cache_item_t *ci;
742
743   pthread_mutex_lock (&cache_lock);
744
745   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
746   if (ci == NULL)
747   {
748     pthread_mutex_unlock (&cache_lock);
749     return (ENOENT);
750   }
751
752   /* Enqueue at head */
753   enqueue_cache_item (ci, HEAD);
754
755   pthread_cond_wait(&ci->flushed, &cache_lock);
756   pthread_mutex_unlock(&cache_lock);
757
758   return (0);
759 } /* }}} int flush_file */
760
761 static int handle_request_help (int fd, /* {{{ */
762     char *buffer, size_t buffer_size)
763 {
764   int status;
765   char **help_text;
766   size_t help_text_len;
767   char *command;
768   size_t i;
769
770   char *help_help[] =
771   {
772     "5 Command overview\n",
773     "FLUSH <filename>\n",
774     "FLUSHALL\n",
775     "HELP [<command>]\n",
776     "UPDATE <filename> <values> [<values> ...]\n",
777     "STATS\n"
778   };
779   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
780
781   char *help_flush[] =
782   {
783     "4 Help for FLUSH\n",
784     "Usage: FLUSH <filename>\n",
785     "\n",
786     "Adds the given filename to the head of the update queue and returns\n",
787     "after is has been dequeued.\n"
788   };
789   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
790
791   char *help_flushall[] =
792   {
793     "3 Help for FLUSHALL\n",
794     "Usage: FLUSHALL\n",
795     "\n",
796     "Triggers writing of all pending updates.  Returns immediately.\n"
797   };
798   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
799
800   char *help_update[] =
801   {
802     "9 Help for UPDATE\n",
803     "Usage: UPDATE <filename> <values> [<values> ...]\n"
804     "\n",
805     "Adds the given file to the internal cache if it is not yet known and\n",
806     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
807     "for details.\n",
808     "\n",
809     "Each <values> has the following form:\n",
810     "  <values> = <time>:<value>[:<value>[...]]\n",
811     "See the rrdupdate(1) manpage for details.\n"
812   };
813   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
814
815   char *help_stats[] =
816   {
817     "4 Help for STATS\n",
818     "Usage: STATS\n",
819     "\n",
820     "Returns some performance counters, see the rrdcached(1) manpage for\n",
821     "a description of the values.\n"
822   };
823   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
824
825   status = buffer_get_field (&buffer, &buffer_size, &command);
826   if (status != 0)
827   {
828     help_text = help_help;
829     help_text_len = help_help_len;
830   }
831   else
832   {
833     if (strcasecmp (command, "update") == 0)
834     {
835       help_text = help_update;
836       help_text_len = help_update_len;
837     }
838     else if (strcasecmp (command, "flush") == 0)
839     {
840       help_text = help_flush;
841       help_text_len = help_flush_len;
842     }
843     else if (strcasecmp (command, "flushall") == 0)
844     {
845       help_text = help_flushall;
846       help_text_len = help_flushall_len;
847     }
848     else if (strcasecmp (command, "stats") == 0)
849     {
850       help_text = help_stats;
851       help_text_len = help_stats_len;
852     }
853     else
854     {
855       help_text = help_help;
856       help_text_len = help_help_len;
857     }
858   }
859
860   for (i = 0; i < help_text_len; i++)
861   {
862     status = swrite (fd, help_text[i], strlen (help_text[i]));
863     if (status < 0)
864     {
865       status = errno;
866       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
867       return (status);
868     }
869   }
870
871   return (0);
872 } /* }}} int handle_request_help */
873
874 static int handle_request_stats (int fd, /* {{{ */
875     char *buffer __attribute__((unused)),
876     size_t buffer_size __attribute__((unused)))
877 {
878   int status;
879   char outbuf[CMD_MAX];
880
881   uint64_t copy_queue_length;
882   uint64_t copy_updates_received;
883   uint64_t copy_flush_received;
884   uint64_t copy_updates_written;
885   uint64_t copy_data_sets_written;
886   uint64_t copy_journal_bytes;
887   uint64_t copy_journal_rotate;
888
889   uint64_t tree_nodes_number;
890   uint64_t tree_depth;
891
892   pthread_mutex_lock (&stats_lock);
893   copy_queue_length       = stats_queue_length;
894   copy_updates_received   = stats_updates_received;
895   copy_flush_received     = stats_flush_received;
896   copy_updates_written    = stats_updates_written;
897   copy_data_sets_written  = stats_data_sets_written;
898   copy_journal_bytes      = stats_journal_bytes;
899   copy_journal_rotate     = stats_journal_rotate;
900   pthread_mutex_unlock (&stats_lock);
901
902   pthread_mutex_lock (&cache_lock);
903   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
904   tree_depth        = (uint64_t) g_tree_height (cache_tree);
905   pthread_mutex_unlock (&cache_lock);
906
907 #define RRDD_STATS_SEND \
908   outbuf[sizeof (outbuf) - 1] = 0; \
909   status = swrite (fd, outbuf, strlen (outbuf)); \
910   if (status < 0) \
911   { \
912     status = errno; \
913     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
914     return (status); \
915   }
916
917   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
918   RRDD_STATS_SEND;
919
920   snprintf (outbuf, sizeof (outbuf),
921       "QueueLength: %"PRIu64"\n", copy_queue_length);
922   RRDD_STATS_SEND;
923
924   snprintf (outbuf, sizeof (outbuf),
925       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
926   RRDD_STATS_SEND;
927
928   snprintf (outbuf, sizeof (outbuf),
929       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
930   RRDD_STATS_SEND;
931
932   snprintf (outbuf, sizeof (outbuf),
933       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
934   RRDD_STATS_SEND;
935
936   snprintf (outbuf, sizeof (outbuf),
937       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
938   RRDD_STATS_SEND;
939
940   snprintf (outbuf, sizeof (outbuf),
941       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
942   RRDD_STATS_SEND;
943
944   snprintf (outbuf, sizeof (outbuf),
945       "TreeDepth: %"PRIu64"\n", tree_depth);
946   RRDD_STATS_SEND;
947
948   snprintf (outbuf, sizeof(outbuf),
949       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
950   RRDD_STATS_SEND;
951
952   snprintf (outbuf, sizeof(outbuf),
953       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
954   RRDD_STATS_SEND;
955
956   return (0);
957 #undef RRDD_STATS_SEND
958 } /* }}} int handle_request_stats */
959
960 static int handle_request_flush (int fd, /* {{{ */
961     char *buffer, size_t buffer_size)
962 {
963   char *file;
964   int status;
965   char result[CMD_MAX];
966
967   status = buffer_get_field (&buffer, &buffer_size, &file);
968   if (status != 0)
969   {
970     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
971   }
972   else
973   {
974     pthread_mutex_lock(&stats_lock);
975     stats_flush_received++;
976     pthread_mutex_unlock(&stats_lock);
977
978     status = flush_file (file);
979     if (status == 0)
980       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
981     else if (status == ENOENT)
982     {
983       /* no file in our tree; see whether it exists at all */
984       struct stat statbuf;
985
986       memset(&statbuf, 0, sizeof(statbuf));
987       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
988         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
989       else
990         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
991     }
992     else if (status < 0)
993       strncpy (result, "-1 Internal error.\n", sizeof (result));
994     else
995       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
996   }
997   result[sizeof (result) - 1] = 0;
998
999   status = swrite (fd, result, strlen (result));
1000   if (status < 0)
1001   {
1002     status = errno;
1003     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1004     return (status);
1005   }
1006
1007   return (0);
1008 } /* }}} int handle_request_flush */
1009
1010 static int handle_request_flushall(int fd) /* {{{ */
1011 {
1012   int status;
1013   char answer[] ="0 Started flush.\n";
1014
1015   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1016
1017   pthread_mutex_lock(&cache_lock);
1018   flush_old_values(-1);
1019   pthread_mutex_unlock(&cache_lock);
1020
1021   status = swrite(fd, answer, strlen(answer));
1022   if (status < 0)
1023   {
1024     status = errno;
1025     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1026   }
1027
1028   return (status);
1029 }
1030
1031 static int handle_request_update (int fd, /* {{{ */
1032     char *buffer, size_t buffer_size)
1033 {
1034   char *file;
1035   int values_num = 0;
1036   int status;
1037
1038   time_t now;
1039
1040   cache_item_t *ci;
1041   char answer[CMD_MAX];
1042
1043 #define RRDD_UPDATE_SEND \
1044   answer[sizeof (answer) - 1] = 0; \
1045   status = swrite (fd, answer, strlen (answer)); \
1046   if (status < 0) \
1047   { \
1048     status = errno; \
1049     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1050     return (status); \
1051   }
1052
1053   now = time (NULL);
1054
1055   status = buffer_get_field (&buffer, &buffer_size, &file);
1056   if (status != 0)
1057   {
1058     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1059         sizeof (answer));
1060     RRDD_UPDATE_SEND;
1061     return (0);
1062   }
1063
1064   pthread_mutex_lock(&stats_lock);
1065   stats_updates_received++;
1066   pthread_mutex_unlock(&stats_lock);
1067
1068   pthread_mutex_lock (&cache_lock);
1069   ci = g_tree_lookup (cache_tree, file);
1070
1071   if (ci == NULL) /* {{{ */
1072   {
1073     struct stat statbuf;
1074
1075     /* don't hold the lock while we setup; stat(2) might block */
1076     pthread_mutex_unlock(&cache_lock);
1077
1078     memset (&statbuf, 0, sizeof (statbuf));
1079     status = stat (file, &statbuf);
1080     if (status != 0)
1081     {
1082       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1083
1084       status = errno;
1085       if (status == ENOENT)
1086         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1087       else
1088         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1089             status);
1090       RRDD_UPDATE_SEND;
1091       return (0);
1092     }
1093     if (!S_ISREG (statbuf.st_mode))
1094     {
1095       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1096       RRDD_UPDATE_SEND;
1097       return (0);
1098     }
1099     if (access(file, R_OK|W_OK) != 0)
1100     {
1101       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1102                 file, rrd_strerror(errno));
1103       RRDD_UPDATE_SEND;
1104       return (0);
1105     }
1106
1107     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1108     if (ci == NULL)
1109     {
1110       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1111
1112       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1113       RRDD_UPDATE_SEND;
1114       return (0);
1115     }
1116     memset (ci, 0, sizeof (cache_item_t));
1117
1118     ci->file = strdup (file);
1119     if (ci->file == NULL)
1120     {
1121       free (ci);
1122       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1123
1124       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1125       RRDD_UPDATE_SEND;
1126       return (0);
1127     }
1128
1129     _wipe_ci_values(ci, now);
1130     ci->flags = CI_FLAGS_IN_TREE;
1131
1132     pthread_mutex_lock(&cache_lock);
1133     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1134   } /* }}} */
1135   assert (ci != NULL);
1136
1137   while (buffer_size > 0)
1138   {
1139     char **temp;
1140     char *value;
1141
1142     status = buffer_get_field (&buffer, &buffer_size, &value);
1143     if (status != 0)
1144     {
1145       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1146       break;
1147     }
1148
1149     temp = (char **) realloc (ci->values,
1150         sizeof (char *) * (ci->values_num + 1));
1151     if (temp == NULL)
1152     {
1153       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1154       continue;
1155     }
1156     ci->values = temp;
1157
1158     ci->values[ci->values_num] = strdup (value);
1159     if (ci->values[ci->values_num] == NULL)
1160     {
1161       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1162       continue;
1163     }
1164     ci->values_num++;
1165
1166     values_num++;
1167   }
1168
1169   if (((now - ci->last_flush_time) >= config_write_interval)
1170       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1171       && (ci->values_num > 0))
1172   {
1173     enqueue_cache_item (ci, TAIL);
1174   }
1175
1176   pthread_mutex_unlock (&cache_lock);
1177
1178   if (values_num < 1)
1179   {
1180     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1181   }
1182   else
1183   {
1184     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1185         (values_num == 1) ? "" : "s");
1186   }
1187   RRDD_UPDATE_SEND;
1188   return (0);
1189 #undef RRDD_UPDATE_SEND
1190 } /* }}} int handle_request_update */
1191
1192 /* we came across a "WROTE" entry during journal replay.
1193  * throw away any values that we have accumulated for this file
1194  */
1195 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1196                                  const char *buffer,
1197                                  size_t buffer_size __attribute__((unused)))
1198 {
1199   int i;
1200   cache_item_t *ci;
1201   const char *file = buffer;
1202
1203   pthread_mutex_lock(&cache_lock);
1204
1205   ci = g_tree_lookup(cache_tree, file);
1206   if (ci == NULL)
1207   {
1208     pthread_mutex_unlock(&cache_lock);
1209     return (0);
1210   }
1211
1212   if (ci->values)
1213   {
1214     for (i=0; i < ci->values_num; i++)
1215       free(ci->values[i]);
1216
1217     free(ci->values);
1218   }
1219
1220   _wipe_ci_values(ci, time(NULL));
1221
1222   pthread_mutex_unlock(&cache_lock);
1223   return (0);
1224 } /* }}} int handle_request_wrote */
1225
1226 /* if fd < 0, we are in journal replay mode */
1227 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1228 {
1229   char *buffer_ptr;
1230   char *command;
1231   int status;
1232
1233   assert (buffer[buffer_size - 1] == '\0');
1234
1235   buffer_ptr = buffer;
1236   command = NULL;
1237   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1238   if (status != 0)
1239   {
1240     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1241     return (-1);
1242   }
1243
1244   if (strcasecmp (command, "update") == 0)
1245   {
1246     /* don't re-write updates in replay mode */
1247     if (fd >= 0)
1248       journal_write(command, buffer_ptr);
1249
1250     return (handle_request_update (fd, buffer_ptr, buffer_size));
1251   }
1252   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1253   {
1254     /* this is only valid in replay mode */
1255     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1256   }
1257   else if (strcasecmp (command, "flush") == 0)
1258   {
1259     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1260   }
1261   else if (strcasecmp (command, "flushall") == 0)
1262   {
1263     return (handle_request_flushall(fd));
1264   }
1265   else if (strcasecmp (command, "stats") == 0)
1266   {
1267     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1268   }
1269   else if (strcasecmp (command, "help") == 0)
1270   {
1271     return (handle_request_help (fd, buffer_ptr, buffer_size));
1272   }
1273   else
1274   {
1275     char result[CMD_MAX];
1276
1277     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1278     result[sizeof (result) - 1] = 0;
1279
1280     status = swrite (fd, result, strlen (result));
1281     if (status < 0)
1282     {
1283       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1284       return (-1);
1285     }
1286   }
1287
1288   return (0);
1289 } /* }}} int handle_request */
1290
1291 /* MUST NOT hold journal_lock before calling this */
1292 static void journal_rotate(void) /* {{{ */
1293 {
1294   FILE *old_fh = NULL;
1295
1296   if (journal_cur == NULL || journal_old == NULL)
1297     return;
1298
1299   pthread_mutex_lock(&journal_lock);
1300
1301   /* we rotate this way (rename before close) so that the we can release
1302    * the journal lock as fast as possible.  Journal writes to the new
1303    * journal can proceed immediately after the new file is opened.  The
1304    * fclose can then block without affecting new updates.
1305    */
1306   if (journal_fh != NULL)
1307   {
1308     old_fh = journal_fh;
1309     rename(journal_cur, journal_old);
1310     ++stats_journal_rotate;
1311   }
1312
1313   journal_fh = fopen(journal_cur, "a");
1314   pthread_mutex_unlock(&journal_lock);
1315
1316   if (old_fh != NULL)
1317     fclose(old_fh);
1318
1319   if (journal_fh == NULL)
1320     RRDD_LOG(LOG_CRIT,
1321              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1322              journal_cur, rrd_strerror(errno));
1323
1324 } /* }}} static void journal_rotate */
1325
1326 static void journal_done(void) /* {{{ */
1327 {
1328   if (journal_cur == NULL)
1329     return;
1330
1331   pthread_mutex_lock(&journal_lock);
1332   if (journal_fh != NULL)
1333   {
1334     fclose(journal_fh);
1335     journal_fh = NULL;
1336   }
1337
1338   RRDD_LOG(LOG_INFO, "removing journals");
1339
1340   unlink(journal_old);
1341   unlink(journal_cur);
1342   pthread_mutex_unlock(&journal_lock);
1343
1344 } /* }}} static void journal_done */
1345
1346 static int journal_write(char *cmd, char *args) /* {{{ */
1347 {
1348   int chars;
1349
1350   if (journal_fh == NULL)
1351     return 0;
1352
1353   pthread_mutex_lock(&journal_lock);
1354   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1355   pthread_mutex_unlock(&journal_lock);
1356
1357   if (chars > 0)
1358   {
1359     pthread_mutex_lock(&stats_lock);
1360     stats_journal_bytes += chars;
1361     pthread_mutex_unlock(&stats_lock);
1362   }
1363
1364   return chars;
1365 } /* }}} static int journal_write */
1366
1367 static int journal_replay (const char *file) /* {{{ */
1368 {
1369   FILE *fh;
1370   int entry_cnt = 0;
1371   int fail_cnt = 0;
1372   uint64_t line = 0;
1373   char entry[CMD_MAX];
1374
1375   if (file == NULL) return 0;
1376
1377   fh = fopen(file, "r");
1378   if (fh == NULL)
1379   {
1380     if (errno != ENOENT)
1381       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1382                file, rrd_strerror(errno));
1383     return 0;
1384   }
1385   else
1386     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1387
1388   while(!feof(fh))
1389   {
1390     size_t entry_len;
1391
1392     ++line;
1393     fgets(entry, sizeof(entry), fh);
1394     entry_len = strlen(entry);
1395
1396     /* check \n termination in case journal writing crashed mid-line */
1397     if (entry_len == 0)
1398       continue;
1399     else if (entry[entry_len - 1] != '\n')
1400     {
1401       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1402       ++fail_cnt;
1403       continue;
1404     }
1405
1406     entry[entry_len - 1] = '\0';
1407
1408     if (handle_request(-1, entry, entry_len) == 0)
1409       ++entry_cnt;
1410     else
1411       ++fail_cnt;
1412   }
1413
1414   fclose(fh);
1415
1416   if (entry_cnt > 0)
1417   {
1418     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1419              entry_cnt, fail_cnt);
1420     return 1;
1421   }
1422   else
1423     return 0;
1424
1425 } /* }}} static int journal_replay */
1426
1427 static void *connection_thread_main (void *args) /* {{{ */
1428 {
1429   pthread_t self;
1430   int i;
1431   int fd;
1432   
1433   fd = *((int *) args);
1434   free (args);
1435
1436   pthread_mutex_lock (&connection_threads_lock);
1437   {
1438     pthread_t *temp;
1439
1440     temp = (pthread_t *) realloc (connection_threads,
1441         sizeof (pthread_t) * (connection_threads_num + 1));
1442     if (temp == NULL)
1443     {
1444       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1445     }
1446     else
1447     {
1448       connection_threads = temp;
1449       connection_threads[connection_threads_num] = pthread_self ();
1450       connection_threads_num++;
1451     }
1452   }
1453   pthread_mutex_unlock (&connection_threads_lock);
1454
1455   while (do_shutdown == 0)
1456   {
1457     char buffer[CMD_MAX];
1458
1459     struct pollfd pollfd;
1460     int status;
1461
1462     pollfd.fd = fd;
1463     pollfd.events = POLLIN | POLLPRI;
1464     pollfd.revents = 0;
1465
1466     status = poll (&pollfd, 1, /* timeout = */ 500);
1467     if (status == 0) /* timeout */
1468       continue;
1469     else if (status < 0) /* error */
1470     {
1471       status = errno;
1472       if (status == EINTR)
1473         continue;
1474       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1475       continue;
1476     }
1477
1478     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1479     {
1480       close (fd);
1481       break;
1482     }
1483     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1484     {
1485       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1486           "poll(2) returned something unexpected: %#04hx",
1487           pollfd.revents);
1488       close (fd);
1489       break;
1490     }
1491
1492     status = (int) sread (fd, buffer, sizeof (buffer));
1493     if (status <= 0)
1494     {
1495       close (fd);
1496
1497       if (status < 0)
1498         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1499
1500       break;
1501     }
1502
1503     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1504     if (status != 0)
1505       break;
1506   }
1507
1508   close(fd);
1509
1510   self = pthread_self ();
1511   /* Remove this thread from the connection threads list */
1512   pthread_mutex_lock (&connection_threads_lock);
1513   /* Find out own index in the array */
1514   for (i = 0; i < connection_threads_num; i++)
1515     if (pthread_equal (connection_threads[i], self) != 0)
1516       break;
1517   assert (i < connection_threads_num);
1518
1519   /* Move the trailing threads forward. */
1520   if (i < (connection_threads_num - 1))
1521   {
1522     memmove (connection_threads + i,
1523         connection_threads + i + 1,
1524         sizeof (pthread_t) * (connection_threads_num - i - 1));
1525   }
1526
1527   connection_threads_num--;
1528   pthread_mutex_unlock (&connection_threads_lock);
1529
1530   return (NULL);
1531 } /* }}} void *connection_thread_main */
1532
1533 static int open_listen_socket_unix (const char *path) /* {{{ */
1534 {
1535   int fd;
1536   struct sockaddr_un sa;
1537   listen_socket_t *temp;
1538   int status;
1539
1540   temp = (listen_socket_t *) realloc (listen_fds,
1541       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1542   if (temp == NULL)
1543   {
1544     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1545     return (-1);
1546   }
1547   listen_fds = temp;
1548   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1549
1550   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1551   if (fd < 0)
1552   {
1553     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1554     return (-1);
1555   }
1556
1557   memset (&sa, 0, sizeof (sa));
1558   sa.sun_family = AF_UNIX;
1559   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1560
1561   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1562   if (status != 0)
1563   {
1564     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1565     close (fd);
1566     unlink (path);
1567     return (-1);
1568   }
1569
1570   status = listen (fd, /* backlog = */ 10);
1571   if (status != 0)
1572   {
1573     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1574     close (fd);
1575     unlink (path);
1576     return (-1);
1577   }
1578   
1579   listen_fds[listen_fds_num].fd = fd;
1580   snprintf (listen_fds[listen_fds_num].path,
1581       sizeof (listen_fds[listen_fds_num].path) - 1,
1582       "unix:%s", path);
1583   listen_fds_num++;
1584
1585   return (0);
1586 } /* }}} int open_listen_socket_unix */
1587
1588 static int open_listen_socket (const char *addr_orig) /* {{{ */
1589 {
1590   struct addrinfo ai_hints;
1591   struct addrinfo *ai_res;
1592   struct addrinfo *ai_ptr;
1593   char addr_copy[NI_MAXHOST];
1594   char *addr;
1595   char *port;
1596   int status;
1597
1598   assert (addr_orig != NULL);
1599
1600   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1601   addr_copy[sizeof (addr_copy) - 1] = 0;
1602   addr = addr_copy;
1603
1604   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1605     return (open_listen_socket_unix (addr + strlen ("unix:")));
1606   else if (addr[0] == '/')
1607     return (open_listen_socket_unix (addr));
1608
1609   memset (&ai_hints, 0, sizeof (ai_hints));
1610   ai_hints.ai_flags = 0;
1611 #ifdef AI_ADDRCONFIG
1612   ai_hints.ai_flags |= AI_ADDRCONFIG;
1613 #endif
1614   ai_hints.ai_family = AF_UNSPEC;
1615   ai_hints.ai_socktype = SOCK_STREAM;
1616
1617   port = NULL;
1618  if (*addr == '[') /* IPv6+port format */
1619   {
1620     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1621     addr++;
1622
1623     port = strchr (addr, ']');
1624     if (port == NULL)
1625     {
1626       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1627           addr_orig);
1628       return (-1);
1629     }
1630     *port = 0;
1631     port++;
1632
1633     if (*port == ':')
1634       port++;
1635     else if (*port == 0)
1636       port = NULL;
1637     else
1638     {
1639       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1640           port);
1641       return (-1);
1642     }
1643   } /* if (*addr = ']') */
1644   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1645   {
1646     port = rindex(addr, ':');
1647     if (port != NULL)
1648     {
1649       *port = 0;
1650       port++;
1651     }
1652   }
1653   ai_res = NULL;
1654   status = getaddrinfo (addr,
1655                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1656                         &ai_hints, &ai_res);
1657   if (status != 0)
1658   {
1659     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1660         "%s", addr, gai_strerror (status));
1661     return (-1);
1662   }
1663
1664   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1665   {
1666     int fd;
1667     listen_socket_t *temp;
1668     int one = 1;
1669
1670     temp = (listen_socket_t *) realloc (listen_fds,
1671         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1672     if (temp == NULL)
1673     {
1674       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1675       continue;
1676     }
1677     listen_fds = temp;
1678     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1679
1680     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1681     if (fd < 0)
1682     {
1683       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1684       continue;
1685     }
1686
1687     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1688
1689     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1690     if (status != 0)
1691     {
1692       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1693       close (fd);
1694       continue;
1695     }
1696
1697     status = listen (fd, /* backlog = */ 10);
1698     if (status != 0)
1699     {
1700       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1701       close (fd);
1702       return (-1);
1703     }
1704
1705     listen_fds[listen_fds_num].fd = fd;
1706     strncpy (listen_fds[listen_fds_num].path, addr,
1707         sizeof (listen_fds[listen_fds_num].path) - 1);
1708     listen_fds_num++;
1709   } /* for (ai_ptr) */
1710
1711   return (0);
1712 } /* }}} int open_listen_socket */
1713
1714 static int close_listen_sockets (void) /* {{{ */
1715 {
1716   size_t i;
1717
1718   for (i = 0; i < listen_fds_num; i++)
1719   {
1720     close (listen_fds[i].fd);
1721     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1722       unlink (listen_fds[i].path + strlen ("unix:"));
1723   }
1724
1725   free (listen_fds);
1726   listen_fds = NULL;
1727   listen_fds_num = 0;
1728
1729   return (0);
1730 } /* }}} int close_listen_sockets */
1731
1732 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1733 {
1734   struct pollfd *pollfds;
1735   int pollfds_num;
1736   int status;
1737   int i;
1738
1739   for (i = 0; i < config_listen_address_list_len; i++)
1740     open_listen_socket (config_listen_address_list[i]);
1741
1742   if (config_listen_address_list_len < 1)
1743     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1744
1745   if (listen_fds_num < 1)
1746   {
1747     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1748         "could be opened. Sorry.");
1749     return (NULL);
1750   }
1751
1752   pollfds_num = listen_fds_num;
1753   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1754   if (pollfds == NULL)
1755   {
1756     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1757     return (NULL);
1758   }
1759   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1760
1761   RRDD_LOG(LOG_INFO, "listening for connections");
1762
1763   while (do_shutdown == 0)
1764   {
1765     assert (pollfds_num == ((int) listen_fds_num));
1766     for (i = 0; i < pollfds_num; i++)
1767     {
1768       pollfds[i].fd = listen_fds[i].fd;
1769       pollfds[i].events = POLLIN | POLLPRI;
1770       pollfds[i].revents = 0;
1771     }
1772
1773     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1774     if (status == 0)
1775     {
1776       continue; /* timeout */
1777     }
1778     else if (status < 0)
1779     {
1780       status = errno;
1781       if (status != EINTR)
1782       {
1783         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1784       }
1785       continue;
1786     }
1787
1788     for (i = 0; i < pollfds_num; i++)
1789     {
1790       int *client_sd;
1791       struct sockaddr_storage client_sa;
1792       socklen_t client_sa_size;
1793       pthread_t tid;
1794       pthread_attr_t attr;
1795
1796       if (pollfds[i].revents == 0)
1797         continue;
1798
1799       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1800       {
1801         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1802             "poll(2) returned something unexpected for listen FD #%i.",
1803             pollfds[i].fd);
1804         continue;
1805       }
1806
1807       client_sd = (int *) malloc (sizeof (int));
1808       if (client_sd == NULL)
1809       {
1810         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1811         continue;
1812       }
1813
1814       client_sa_size = sizeof (client_sa);
1815       *client_sd = accept (pollfds[i].fd,
1816           (struct sockaddr *) &client_sa, &client_sa_size);
1817       if (*client_sd < 0)
1818       {
1819         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1820         continue;
1821       }
1822
1823       pthread_attr_init (&attr);
1824       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1825
1826       status = pthread_create (&tid, &attr, connection_thread_main,
1827           /* args = */ (void *) client_sd);
1828       if (status != 0)
1829       {
1830         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1831         close (*client_sd);
1832         free (client_sd);
1833         continue;
1834       }
1835     } /* for (pollfds_num) */
1836   } /* while (do_shutdown == 0) */
1837
1838   RRDD_LOG(LOG_INFO, "starting shutdown");
1839
1840   close_listen_sockets ();
1841
1842   pthread_mutex_lock (&connection_threads_lock);
1843   while (connection_threads_num > 0)
1844   {
1845     pthread_t wait_for;
1846
1847     wait_for = connection_threads[0];
1848
1849     pthread_mutex_unlock (&connection_threads_lock);
1850     pthread_join (wait_for, /* retval = */ NULL);
1851     pthread_mutex_lock (&connection_threads_lock);
1852   }
1853   pthread_mutex_unlock (&connection_threads_lock);
1854
1855   return (NULL);
1856 } /* }}} void *listen_thread_main */
1857
1858 static int daemonize (void) /* {{{ */
1859 {
1860   int status;
1861   int fd;
1862
1863   /* These structures are static, because `sigaction' behaves weird if the are
1864    * overwritten.. */
1865   static struct sigaction sa_int;
1866   static struct sigaction sa_term;
1867   static struct sigaction sa_pipe;
1868
1869   fd = open_pidfile();
1870   if (fd < 0) return fd;
1871
1872   if (!stay_foreground)
1873   {
1874     pid_t child;
1875     char *base_dir;
1876
1877     child = fork ();
1878     if (child < 0)
1879     {
1880       fprintf (stderr, "daemonize: fork(2) failed.\n");
1881       return (-1);
1882     }
1883     else if (child > 0)
1884     {
1885       return (1);
1886     }
1887
1888     /* Change into the /tmp directory. */
1889     base_dir = (config_base_dir != NULL)
1890       ? config_base_dir
1891       : "/tmp";
1892     status = chdir (base_dir);
1893     if (status != 0)
1894     {
1895       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1896       return (-1);
1897     }
1898
1899     /* Become session leader */
1900     setsid ();
1901
1902     /* Open the first three file descriptors to /dev/null */
1903     close (2);
1904     close (1);
1905     close (0);
1906
1907     open ("/dev/null", O_RDWR);
1908     dup (0);
1909     dup (0);
1910   } /* if (!stay_foreground) */
1911
1912   /* Install signal handlers */
1913   memset (&sa_int, 0, sizeof (sa_int));
1914   sa_int.sa_handler = sig_int_handler;
1915   sigaction (SIGINT, &sa_int, NULL);
1916
1917   memset (&sa_term, 0, sizeof (sa_term));
1918   sa_term.sa_handler = sig_term_handler;
1919   sigaction (SIGTERM, &sa_term, NULL);
1920
1921   memset (&sa_pipe, 0, sizeof (sa_pipe));
1922   sa_pipe.sa_handler = SIG_IGN;
1923   sigaction (SIGPIPE, &sa_pipe, NULL);
1924
1925   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1926   RRDD_LOG(LOG_INFO, "starting up");
1927
1928   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1929   if (cache_tree == NULL)
1930   {
1931     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1932     return (-1);
1933   }
1934
1935   status = write_pidfile (fd);
1936   return status;
1937 } /* }}} int daemonize */
1938
1939 static int cleanup (void) /* {{{ */
1940 {
1941   do_shutdown++;
1942
1943   pthread_cond_signal (&cache_cond);
1944   pthread_join (queue_thread, /* return = */ NULL);
1945
1946   remove_pidfile ();
1947
1948   RRDD_LOG(LOG_INFO, "goodbye");
1949   closelog ();
1950
1951   return (0);
1952 } /* }}} int cleanup */
1953
1954 static int read_options (int argc, char **argv) /* {{{ */
1955 {
1956   int option;
1957   int status = 0;
1958
1959   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1960   {
1961     switch (option)
1962     {
1963       case 'g':
1964         stay_foreground=1;
1965         break;
1966
1967       case 'l':
1968       {
1969         char **temp;
1970
1971         temp = (char **) realloc (config_listen_address_list,
1972             sizeof (char *) * (config_listen_address_list_len + 1));
1973         if (temp == NULL)
1974         {
1975           fprintf (stderr, "read_options: realloc failed.\n");
1976           return (2);
1977         }
1978         config_listen_address_list = temp;
1979
1980         temp[config_listen_address_list_len] = strdup (optarg);
1981         if (temp[config_listen_address_list_len] == NULL)
1982         {
1983           fprintf (stderr, "read_options: strdup failed.\n");
1984           return (2);
1985         }
1986         config_listen_address_list_len++;
1987       }
1988       break;
1989
1990       case 'f':
1991       {
1992         int temp;
1993
1994         temp = atoi (optarg);
1995         if (temp > 0)
1996           config_flush_interval = temp;
1997         else
1998         {
1999           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2000           status = 3;
2001         }
2002       }
2003       break;
2004
2005       case 'w':
2006       {
2007         int temp;
2008
2009         temp = atoi (optarg);
2010         if (temp > 0)
2011           config_write_interval = temp;
2012         else
2013         {
2014           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2015           status = 2;
2016         }
2017       }
2018       break;
2019
2020       case 'z':
2021       {
2022         int temp;
2023
2024         temp = atoi(optarg);
2025         if (temp > 0)
2026           config_write_jitter = temp;
2027         else
2028         {
2029           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2030           status = 2;
2031         }
2032
2033         break;
2034       }
2035
2036       case 'b':
2037       {
2038         size_t len;
2039
2040         if (config_base_dir != NULL)
2041           free (config_base_dir);
2042         config_base_dir = strdup (optarg);
2043         if (config_base_dir == NULL)
2044         {
2045           fprintf (stderr, "read_options: strdup failed.\n");
2046           return (3);
2047         }
2048
2049         len = strlen (config_base_dir);
2050         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2051         {
2052           config_base_dir[len - 1] = 0;
2053           len--;
2054         }
2055
2056         if (len < 1)
2057         {
2058           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2059           return (4);
2060         }
2061       }
2062       break;
2063
2064       case 'p':
2065       {
2066         if (config_pid_file != NULL)
2067           free (config_pid_file);
2068         config_pid_file = strdup (optarg);
2069         if (config_pid_file == NULL)
2070         {
2071           fprintf (stderr, "read_options: strdup failed.\n");
2072           return (3);
2073         }
2074       }
2075       break;
2076
2077       case 'j':
2078       {
2079         struct stat statbuf;
2080         const char *dir = optarg;
2081
2082         status = stat(dir, &statbuf);
2083         if (status != 0)
2084         {
2085           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2086           return 6;
2087         }
2088
2089         if (!S_ISDIR(statbuf.st_mode)
2090             || access(dir, R_OK|W_OK|X_OK) != 0)
2091         {
2092           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2093                   errno ? rrd_strerror(errno) : "");
2094           return 6;
2095         }
2096
2097         journal_cur = malloc(PATH_MAX + 1);
2098         journal_old = malloc(PATH_MAX + 1);
2099         if (journal_cur == NULL || journal_old == NULL)
2100         {
2101           fprintf(stderr, "malloc failure for journal files\n");
2102           return 6;
2103         }
2104         else 
2105         {
2106           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2107           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2108         }
2109       }
2110       break;
2111
2112       case 'h':
2113       case '?':
2114         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2115             "\n"
2116             "Usage: rrdcached [options]\n"
2117             "\n"
2118             "Valid options are:\n"
2119             "  -l <address>  Socket address to listen to.\n"
2120             "  -w <seconds>  Interval in which to write data.\n"
2121             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2122             "  -f <seconds>  Interval in which to flush dead data.\n"
2123             "  -p <file>     Location of the PID-file.\n"
2124             "  -b <dir>      Base directory to change to.\n"
2125             "  -g            Do not fork and run in the foreground.\n"
2126             "  -j <dir>      Directory in which to create the journal files.\n"
2127             "\n"
2128             "For more information and a detailed description of all options "
2129             "please refer\n"
2130             "to the rrdcached(1) manual page.\n",
2131             VERSION);
2132         status = -1;
2133         break;
2134     } /* switch (option) */
2135   } /* while (getopt) */
2136
2137   /* advise the user when values are not sane */
2138   if (config_flush_interval < 2 * config_write_interval)
2139     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2140             " 2x write interval (-w) !\n");
2141   if (config_write_jitter > config_write_interval)
2142     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2143             " write interval (-w) !\n");
2144
2145   return (status);
2146 } /* }}} int read_options */
2147
2148 int main (int argc, char **argv)
2149 {
2150   int status;
2151
2152   status = read_options (argc, argv);
2153   if (status != 0)
2154   {
2155     if (status < 0)
2156       status = 0;
2157     return (status);
2158   }
2159
2160   status = daemonize ();
2161   if (status == 1)
2162   {
2163     struct sigaction sigchld;
2164
2165     memset (&sigchld, 0, sizeof (sigchld));
2166     sigchld.sa_handler = SIG_IGN;
2167     sigaction (SIGCHLD, &sigchld, NULL);
2168
2169     return (0);
2170   }
2171   else if (status != 0)
2172   {
2173     fprintf (stderr, "daemonize failed, exiting.\n");
2174     return (1);
2175   }
2176
2177   if (journal_cur != NULL)
2178   {
2179     int had_journal = 0;
2180
2181     pthread_mutex_lock(&journal_lock);
2182
2183     RRDD_LOG(LOG_INFO, "checking for journal files");
2184
2185     had_journal += journal_replay(journal_old);
2186     had_journal += journal_replay(journal_cur);
2187
2188     if (had_journal)
2189       flush_old_values(-1);
2190
2191     pthread_mutex_unlock(&journal_lock);
2192     journal_rotate();
2193
2194     RRDD_LOG(LOG_INFO, "journal processing complete");
2195   }
2196
2197   /* start the queue thread */
2198   memset (&queue_thread, 0, sizeof (queue_thread));
2199   status = pthread_create (&queue_thread,
2200                            NULL, /* attr */
2201                            queue_thread_main,
2202                            NULL); /* args */
2203   if (status != 0)
2204   {
2205     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2206     cleanup();
2207     return (1);
2208   }
2209
2210   listen_thread_main (NULL);
2211   cleanup ();
2212
2213   return (0);
2214 } /* int main */
2215
2216 /*
2217  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2218  */