This patch removes an extra "SIGNALS" section in the rrdcached.pod and
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 struct listen_socket_s
105 {
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
110
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
125
126 struct callback_flush_data_s
127 {
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
134
135 enum queue_side_e
136 {
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
141
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
144
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
149
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
152
153 static int do_shutdown = 0;
154
155 static pthread_t queue_thread;
156
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
160
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
167
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static int config_flush_at_shutdown = 0;
172 static char *config_pid_file = NULL;
173 static char *config_base_dir = NULL;
174
175 static char **config_listen_address_list = NULL;
176 static int config_listen_address_list_len = 0;
177
178 static uint64_t stats_queue_length = 0;
179 static uint64_t stats_updates_received = 0;
180 static uint64_t stats_flush_received = 0;
181 static uint64_t stats_updates_written = 0;
182 static uint64_t stats_data_sets_written = 0;
183 static uint64_t stats_journal_bytes = 0;
184 static uint64_t stats_journal_rotate = 0;
185 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
186
187 /* Journaled updates */
188 static char *journal_cur = NULL;
189 static char *journal_old = NULL;
190 static FILE *journal_fh = NULL;
191 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
192 static int journal_write(char *cmd, char *args);
193 static void journal_done(void);
194 static void journal_rotate(void);
195
196 /* 
197  * Functions
198  */
199 static void sig_common (const char *sig) /* {{{ */
200 {
201   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
202   do_shutdown++;
203   pthread_cond_broadcast(&cache_cond);
204 } /* }}} void sig_common */
205
206 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
207 {
208   sig_common("INT");
209 } /* }}} void sig_int_handler */
210
211 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
212 {
213   sig_common("TERM");
214 } /* }}} void sig_term_handler */
215
216 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
217 {
218   config_flush_at_shutdown = 1;
219   sig_common("USR1");
220 } /* }}} void sig_usr1_handler */
221
222 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
223 {
224   config_flush_at_shutdown = 0;
225   sig_common("USR2");
226 } /* }}} void sig_usr2_handler */
227
228 static void install_signal_handlers(void) /* {{{ */
229 {
230   /* These structures are static, because `sigaction' behaves weird if the are
231    * overwritten.. */
232   static struct sigaction sa_int;
233   static struct sigaction sa_term;
234   static struct sigaction sa_pipe;
235   static struct sigaction sa_usr1;
236   static struct sigaction sa_usr2;
237
238   /* Install signal handlers */
239   memset (&sa_int, 0, sizeof (sa_int));
240   sa_int.sa_handler = sig_int_handler;
241   sigaction (SIGINT, &sa_int, NULL);
242
243   memset (&sa_term, 0, sizeof (sa_term));
244   sa_term.sa_handler = sig_term_handler;
245   sigaction (SIGTERM, &sa_term, NULL);
246
247   memset (&sa_pipe, 0, sizeof (sa_pipe));
248   sa_pipe.sa_handler = SIG_IGN;
249   sigaction (SIGPIPE, &sa_pipe, NULL);
250
251   memset (&sa_pipe, 0, sizeof (sa_usr1));
252   sa_usr1.sa_handler = sig_usr1_handler;
253   sigaction (SIGUSR1, &sa_usr1, NULL);
254
255   memset (&sa_usr2, 0, sizeof (sa_usr2));
256   sa_usr2.sa_handler = sig_usr2_handler;
257   sigaction (SIGUSR2, &sa_usr2, NULL);
258
259 } /* }}} void install_signal_handlers */
260
261 static int open_pidfile(void) /* {{{ */
262 {
263   int fd;
264   char *file;
265
266   file = (config_pid_file != NULL)
267     ? config_pid_file
268     : LOCALSTATEDIR "/run/rrdcached.pid";
269
270   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
271   if (fd < 0)
272     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
273             file, rrd_strerror(errno));
274
275   return(fd);
276 } /* }}} static int open_pidfile */
277
278 static int write_pidfile (int fd) /* {{{ */
279 {
280   pid_t pid;
281   FILE *fh;
282
283   pid = getpid ();
284
285   fh = fdopen (fd, "w");
286   if (fh == NULL)
287   {
288     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
289     close(fd);
290     return (-1);
291   }
292
293   fprintf (fh, "%i\n", (int) pid);
294   fclose (fh);
295
296   return (0);
297 } /* }}} int write_pidfile */
298
299 static int remove_pidfile (void) /* {{{ */
300 {
301   char *file;
302   int status;
303
304   file = (config_pid_file != NULL)
305     ? config_pid_file
306     : LOCALSTATEDIR "/run/rrdcached.pid";
307
308   status = unlink (file);
309   if (status == 0)
310     return (0);
311   return (errno);
312 } /* }}} int remove_pidfile */
313
314 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
315 {
316   char    *buffer;
317   size_t   buffer_used;
318   size_t   buffer_free;
319   ssize_t  status;
320
321   buffer       = (char *) buffer_void;
322   buffer_used  = 0;
323   buffer_free  = buffer_size;
324
325   while (buffer_free > 0)
326   {
327     status = read (fd, buffer + buffer_used, buffer_free);
328     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
329       continue;
330
331     if (status < 0)
332       return (-1);
333
334     if (status == 0)
335       return (0);
336
337     assert ((0 > status) || (buffer_free >= (size_t) status));
338
339     buffer_free = buffer_free - status;
340     buffer_used = buffer_used + status;
341
342     if (buffer[buffer_used - 1] == '\n')
343       break;
344   }
345
346   assert (buffer_used > 0);
347
348   if (buffer[buffer_used - 1] != '\n')
349   {
350     errno = ENOBUFS;
351     return (-1);
352   }
353
354   buffer[buffer_used - 1] = 0;
355
356   /* Fix network line endings. */
357   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
358   {
359     buffer_used--;
360     buffer[buffer_used - 1] = 0;
361   }
362
363   return (buffer_used);
364 } /* }}} ssize_t sread */
365
366 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
367 {
368   const char *ptr;
369   size_t      nleft;
370   ssize_t     status;
371
372   /* special case for journal replay */
373   if (fd < 0) return 0;
374
375   ptr   = (const char *) buf;
376   nleft = count;
377
378   while (nleft > 0)
379   {
380     status = write (fd, (const void *) ptr, nleft);
381
382     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
383       continue;
384
385     if (status < 0)
386       return (status);
387
388     nleft -= status;
389     ptr   += status;
390   }
391
392   return (0);
393 } /* }}} ssize_t swrite */
394
395 static void _wipe_ci_values(cache_item_t *ci, time_t when)
396 {
397   ci->values = NULL;
398   ci->values_num = 0;
399
400   ci->last_flush_time = when;
401   if (config_write_jitter > 0)
402     ci->last_flush_time += (random() % config_write_jitter);
403
404   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
405 }
406
407 /*
408  * enqueue_cache_item:
409  * `cache_lock' must be acquired before calling this function!
410  */
411 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
412     queue_side_t side)
413 {
414   int did_insert = 0;
415
416   if (ci == NULL)
417     return (-1);
418
419   if (ci->values_num == 0)
420     return (0);
421
422   if (side == HEAD)
423   {
424     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
425     {
426       assert (ci->next == NULL);
427       ci->next = cache_queue_head;
428       cache_queue_head = ci;
429
430       if (cache_queue_tail == NULL)
431         cache_queue_tail = cache_queue_head;
432
433       did_insert = 1;
434     }
435     else if (cache_queue_head == ci)
436     {
437       /* do nothing */
438     }
439     else /* enqueued, but not first entry */
440     {
441       cache_item_t *prev;
442
443       /* find previous entry */
444       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
445         if (prev->next == ci)
446           break;
447       assert (prev != NULL);
448
449       /* move to the front */
450       prev->next = ci->next;
451       ci->next = cache_queue_head;
452       cache_queue_head = ci;
453
454       /* check if we need to adapt the tail */
455       if (cache_queue_tail == ci)
456         cache_queue_tail = prev;
457     }
458   }
459   else /* (side == TAIL) */
460   {
461     /* We don't move values back in the list.. */
462     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
463       return (0);
464
465     assert (ci->next == NULL);
466
467     if (cache_queue_tail == NULL)
468       cache_queue_head = ci;
469     else
470       cache_queue_tail->next = ci;
471     cache_queue_tail = ci;
472
473     did_insert = 1;
474   }
475
476   ci->flags |= CI_FLAGS_IN_QUEUE;
477
478   if (did_insert)
479   {
480     pthread_cond_broadcast(&cache_cond);
481     pthread_mutex_lock (&stats_lock);
482     stats_queue_length++;
483     pthread_mutex_unlock (&stats_lock);
484   }
485
486   return (0);
487 } /* }}} int enqueue_cache_item */
488
489 /*
490  * tree_callback_flush:
491  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
492  * while this is in progress.
493  */
494 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
495     gpointer data)
496 {
497   cache_item_t *ci;
498   callback_flush_data_t *cfd;
499
500   ci = (cache_item_t *) value;
501   cfd = (callback_flush_data_t *) data;
502
503   if ((ci->last_flush_time <= cfd->abs_timeout)
504       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
505       && (ci->values_num > 0))
506   {
507     enqueue_cache_item (ci, TAIL);
508   }
509   else if ((do_shutdown != 0)
510       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
511       && (ci->values_num > 0))
512   {
513     enqueue_cache_item (ci, TAIL);
514   }
515   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
516       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
517       && (ci->values_num <= 0))
518   {
519     char **temp;
520
521     temp = (char **) realloc (cfd->keys,
522         sizeof (char *) * (cfd->keys_num + 1));
523     if (temp == NULL)
524     {
525       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
526       return (FALSE);
527     }
528     cfd->keys = temp;
529     /* Make really sure this points to the _same_ place */
530     assert ((char *) key == ci->file);
531     cfd->keys[cfd->keys_num] = (char *) key;
532     cfd->keys_num++;
533   }
534
535   return (FALSE);
536 } /* }}} gboolean tree_callback_flush */
537
538 static int flush_old_values (int max_age)
539 {
540   callback_flush_data_t cfd;
541   size_t k;
542
543   memset (&cfd, 0, sizeof (cfd));
544   /* Pass the current time as user data so that we don't need to call
545    * `time' for each node. */
546   cfd.now = time (NULL);
547   cfd.keys = NULL;
548   cfd.keys_num = 0;
549
550   if (max_age > 0)
551     cfd.abs_timeout = cfd.now - max_age;
552   else
553     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
554
555   /* `tree_callback_flush' will return the keys of all values that haven't
556    * been touched in the last `config_flush_interval' seconds in `cfd'.
557    * The char*'s in this array point to the same memory as ci->file, so we
558    * don't need to free them separately. */
559   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
560
561   for (k = 0; k < cfd.keys_num; k++)
562   {
563     cache_item_t *ci;
564
565     /* This must not fail. */
566     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
567     assert (ci != NULL);
568
569     /* If we end up here with values available, something's seriously
570      * messed up. */
571     assert (ci->values_num == 0);
572
573     /* Remove the node from the tree */
574     g_tree_remove (cache_tree, cfd.keys[k]);
575     cfd.keys[k] = NULL;
576
577     /* Now free and clean up `ci'. */
578     free (ci->file);
579     ci->file = NULL;
580     free (ci);
581     ci = NULL;
582   } /* for (k = 0; k < cfd.keys_num; k++) */
583
584   if (cfd.keys != NULL)
585   {
586     free (cfd.keys);
587     cfd.keys = NULL;
588   }
589
590   return (0);
591 } /* int flush_old_values */
592
593 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
594 {
595   struct timeval now;
596   struct timespec next_flush;
597   int final_flush = 0; /* make sure we only flush once on shutdown */
598
599   gettimeofday (&now, NULL);
600   next_flush.tv_sec = now.tv_sec + config_flush_interval;
601   next_flush.tv_nsec = 1000 * now.tv_usec;
602
603   pthread_mutex_lock (&cache_lock);
604   while ((do_shutdown == 0) || (cache_queue_head != NULL))
605   {
606     cache_item_t *ci;
607     char *file;
608     char **values;
609     int values_num;
610     int status;
611     int i;
612
613     /* First, check if it's time to do the cache flush. */
614     gettimeofday (&now, NULL);
615     if ((now.tv_sec > next_flush.tv_sec)
616         || ((now.tv_sec == next_flush.tv_sec)
617           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
618     {
619       /* Flush all values that haven't been written in the last
620        * `config_write_interval' seconds. */
621       flush_old_values (config_write_interval);
622
623       /* Determine the time of the next cache flush. */
624       while (next_flush.tv_sec <= now.tv_sec)
625         next_flush.tv_sec += config_flush_interval;
626
627       /* unlock the cache while we rotate so we don't block incoming
628        * updates if the fsync() blocks on disk I/O */
629       pthread_mutex_unlock(&cache_lock);
630       journal_rotate();
631       pthread_mutex_lock(&cache_lock);
632     }
633
634     /* Now, check if there's something to store away. If not, wait until
635      * something comes in or it's time to do the cache flush.  if we are
636      * shutting down, do not wait around.  */
637     if (cache_queue_head == NULL && !do_shutdown)
638     {
639       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
640       if ((status != 0) && (status != ETIMEDOUT))
641       {
642         RRDD_LOG (LOG_ERR, "queue_thread_main: "
643             "pthread_cond_timedwait returned %i.", status);
644       }
645     }
646
647     /* We're about to shut down */
648     if (do_shutdown != 0 && !final_flush++)
649     {
650       if (config_flush_at_shutdown)
651         flush_old_values (-1); /* flush everything */
652       else
653         break;
654     }
655
656     /* Check if a value has arrived. This may be NULL if we timed out or there
657      * was an interrupt such as a signal. */
658     if (cache_queue_head == NULL)
659       continue;
660
661     ci = cache_queue_head;
662
663     /* copy the relevant parts */
664     file = strdup (ci->file);
665     if (file == NULL)
666     {
667       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
668       continue;
669     }
670
671     assert(ci->values != NULL);
672     assert(ci->values_num > 0);
673
674     values = ci->values;
675     values_num = ci->values_num;
676
677     _wipe_ci_values(ci, time(NULL));
678
679     cache_queue_head = ci->next;
680     if (cache_queue_head == NULL)
681       cache_queue_tail = NULL;
682     ci->next = NULL;
683
684     pthread_mutex_lock (&stats_lock);
685     assert (stats_queue_length > 0);
686     stats_queue_length--;
687     pthread_mutex_unlock (&stats_lock);
688
689     pthread_mutex_unlock (&cache_lock);
690
691     rrd_clear_error ();
692     status = rrd_update_r (file, NULL, values_num, (void *) values);
693     if (status != 0)
694     {
695       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
696           "rrd_update_r (%s) failed with status %i. (%s)",
697           file, status, rrd_get_error());
698     }
699
700     journal_write("wrote", file);
701     pthread_cond_broadcast(&ci->flushed);
702
703     for (i = 0; i < values_num; i++)
704       free (values[i]);
705
706     free(values);
707     free(file);
708
709     if (status == 0)
710     {
711       pthread_mutex_lock (&stats_lock);
712       stats_updates_written++;
713       stats_data_sets_written += values_num;
714       pthread_mutex_unlock (&stats_lock);
715     }
716
717     pthread_mutex_lock (&cache_lock);
718
719     /* We're about to shut down */
720     if (do_shutdown != 0 && !final_flush++)
721     {
722       if (config_flush_at_shutdown)
723           flush_old_values (-1); /* flush everything */
724       else
725         break;
726     }
727   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
728   pthread_mutex_unlock (&cache_lock);
729
730   if (config_flush_at_shutdown)
731   {
732     assert(cache_queue_head == NULL);
733     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
734   }
735
736   journal_done();
737
738   return (NULL);
739 } /* }}} void *queue_thread_main */
740
741 static int buffer_get_field (char **buffer_ret, /* {{{ */
742     size_t *buffer_size_ret, char **field_ret)
743 {
744   char *buffer;
745   size_t buffer_pos;
746   size_t buffer_size;
747   char *field;
748   size_t field_size;
749   int status;
750
751   buffer = *buffer_ret;
752   buffer_pos = 0;
753   buffer_size = *buffer_size_ret;
754   field = *buffer_ret;
755   field_size = 0;
756
757   if (buffer_size <= 0)
758     return (-1);
759
760   /* This is ensured by `handle_request'. */
761   assert (buffer[buffer_size - 1] == '\0');
762
763   status = -1;
764   while (buffer_pos < buffer_size)
765   {
766     /* Check for end-of-field or end-of-buffer */
767     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
768     {
769       field[field_size] = 0;
770       field_size++;
771       buffer_pos++;
772       status = 0;
773       break;
774     }
775     /* Handle escaped characters. */
776     else if (buffer[buffer_pos] == '\\')
777     {
778       if (buffer_pos >= (buffer_size - 1))
779         break;
780       buffer_pos++;
781       field[field_size] = buffer[buffer_pos];
782       field_size++;
783       buffer_pos++;
784     }
785     /* Normal operation */ 
786     else
787     {
788       field[field_size] = buffer[buffer_pos];
789       field_size++;
790       buffer_pos++;
791     }
792   } /* while (buffer_pos < buffer_size) */
793
794   if (status != 0)
795     return (status);
796
797   *buffer_ret = buffer + buffer_pos;
798   *buffer_size_ret = buffer_size - buffer_pos;
799   *field_ret = field;
800
801   return (0);
802 } /* }}} int buffer_get_field */
803
804 static int flush_file (const char *filename) /* {{{ */
805 {
806   cache_item_t *ci;
807
808   pthread_mutex_lock (&cache_lock);
809
810   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
811   if (ci == NULL)
812   {
813     pthread_mutex_unlock (&cache_lock);
814     return (ENOENT);
815   }
816
817   if (ci->values_num > 0)
818   {
819     /* Enqueue at head */
820     enqueue_cache_item (ci, HEAD);
821     pthread_cond_wait(&ci->flushed, &cache_lock);
822   }
823
824   pthread_mutex_unlock(&cache_lock);
825
826   return (0);
827 } /* }}} int flush_file */
828
829 static int handle_request_help (int fd, /* {{{ */
830     char *buffer, size_t buffer_size)
831 {
832   int status;
833   char **help_text;
834   size_t help_text_len;
835   char *command;
836   size_t i;
837
838   char *help_help[] =
839   {
840     "5 Command overview\n",
841     "FLUSH <filename>\n",
842     "FLUSHALL\n",
843     "HELP [<command>]\n",
844     "UPDATE <filename> <values> [<values> ...]\n",
845     "STATS\n"
846   };
847   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
848
849   char *help_flush[] =
850   {
851     "4 Help for FLUSH\n",
852     "Usage: FLUSH <filename>\n",
853     "\n",
854     "Adds the given filename to the head of the update queue and returns\n",
855     "after is has been dequeued.\n"
856   };
857   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
858
859   char *help_flushall[] =
860   {
861     "3 Help for FLUSHALL\n",
862     "Usage: FLUSHALL\n",
863     "\n",
864     "Triggers writing of all pending updates.  Returns immediately.\n"
865   };
866   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
867
868   char *help_update[] =
869   {
870     "9 Help for UPDATE\n",
871     "Usage: UPDATE <filename> <values> [<values> ...]\n"
872     "\n",
873     "Adds the given file to the internal cache if it is not yet known and\n",
874     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
875     "for details.\n",
876     "\n",
877     "Each <values> has the following form:\n",
878     "  <values> = <time>:<value>[:<value>[...]]\n",
879     "See the rrdupdate(1) manpage for details.\n"
880   };
881   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
882
883   char *help_stats[] =
884   {
885     "4 Help for STATS\n",
886     "Usage: STATS\n",
887     "\n",
888     "Returns some performance counters, see the rrdcached(1) manpage for\n",
889     "a description of the values.\n"
890   };
891   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
892
893   status = buffer_get_field (&buffer, &buffer_size, &command);
894   if (status != 0)
895   {
896     help_text = help_help;
897     help_text_len = help_help_len;
898   }
899   else
900   {
901     if (strcasecmp (command, "update") == 0)
902     {
903       help_text = help_update;
904       help_text_len = help_update_len;
905     }
906     else if (strcasecmp (command, "flush") == 0)
907     {
908       help_text = help_flush;
909       help_text_len = help_flush_len;
910     }
911     else if (strcasecmp (command, "flushall") == 0)
912     {
913       help_text = help_flushall;
914       help_text_len = help_flushall_len;
915     }
916     else if (strcasecmp (command, "stats") == 0)
917     {
918       help_text = help_stats;
919       help_text_len = help_stats_len;
920     }
921     else
922     {
923       help_text = help_help;
924       help_text_len = help_help_len;
925     }
926   }
927
928   for (i = 0; i < help_text_len; i++)
929   {
930     status = swrite (fd, help_text[i], strlen (help_text[i]));
931     if (status < 0)
932     {
933       status = errno;
934       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
935       return (status);
936     }
937   }
938
939   return (0);
940 } /* }}} int handle_request_help */
941
942 static int handle_request_stats (int fd, /* {{{ */
943     char *buffer __attribute__((unused)),
944     size_t buffer_size __attribute__((unused)))
945 {
946   int status;
947   char outbuf[CMD_MAX];
948
949   uint64_t copy_queue_length;
950   uint64_t copy_updates_received;
951   uint64_t copy_flush_received;
952   uint64_t copy_updates_written;
953   uint64_t copy_data_sets_written;
954   uint64_t copy_journal_bytes;
955   uint64_t copy_journal_rotate;
956
957   uint64_t tree_nodes_number;
958   uint64_t tree_depth;
959
960   pthread_mutex_lock (&stats_lock);
961   copy_queue_length       = stats_queue_length;
962   copy_updates_received   = stats_updates_received;
963   copy_flush_received     = stats_flush_received;
964   copy_updates_written    = stats_updates_written;
965   copy_data_sets_written  = stats_data_sets_written;
966   copy_journal_bytes      = stats_journal_bytes;
967   copy_journal_rotate     = stats_journal_rotate;
968   pthread_mutex_unlock (&stats_lock);
969
970   pthread_mutex_lock (&cache_lock);
971   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
972   tree_depth        = (uint64_t) g_tree_height (cache_tree);
973   pthread_mutex_unlock (&cache_lock);
974
975 #define RRDD_STATS_SEND \
976   outbuf[sizeof (outbuf) - 1] = 0; \
977   status = swrite (fd, outbuf, strlen (outbuf)); \
978   if (status < 0) \
979   { \
980     status = errno; \
981     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
982     return (status); \
983   }
984
985   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
986   RRDD_STATS_SEND;
987
988   snprintf (outbuf, sizeof (outbuf),
989       "QueueLength: %"PRIu64"\n", copy_queue_length);
990   RRDD_STATS_SEND;
991
992   snprintf (outbuf, sizeof (outbuf),
993       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
994   RRDD_STATS_SEND;
995
996   snprintf (outbuf, sizeof (outbuf),
997       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
998   RRDD_STATS_SEND;
999
1000   snprintf (outbuf, sizeof (outbuf),
1001       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1002   RRDD_STATS_SEND;
1003
1004   snprintf (outbuf, sizeof (outbuf),
1005       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1006   RRDD_STATS_SEND;
1007
1008   snprintf (outbuf, sizeof (outbuf),
1009       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1010   RRDD_STATS_SEND;
1011
1012   snprintf (outbuf, sizeof (outbuf),
1013       "TreeDepth: %"PRIu64"\n", tree_depth);
1014   RRDD_STATS_SEND;
1015
1016   snprintf (outbuf, sizeof(outbuf),
1017       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1018   RRDD_STATS_SEND;
1019
1020   snprintf (outbuf, sizeof(outbuf),
1021       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1022   RRDD_STATS_SEND;
1023
1024   return (0);
1025 #undef RRDD_STATS_SEND
1026 } /* }}} int handle_request_stats */
1027
1028 static int handle_request_flush (int fd, /* {{{ */
1029     char *buffer, size_t buffer_size)
1030 {
1031   char *file;
1032   int status;
1033   char result[CMD_MAX];
1034
1035   status = buffer_get_field (&buffer, &buffer_size, &file);
1036   if (status != 0)
1037   {
1038     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1039   }
1040   else
1041   {
1042     pthread_mutex_lock(&stats_lock);
1043     stats_flush_received++;
1044     pthread_mutex_unlock(&stats_lock);
1045
1046     status = flush_file (file);
1047     if (status == 0)
1048       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1049     else if (status == ENOENT)
1050     {
1051       /* no file in our tree; see whether it exists at all */
1052       struct stat statbuf;
1053
1054       memset(&statbuf, 0, sizeof(statbuf));
1055       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1056         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1057       else
1058         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1059     }
1060     else if (status < 0)
1061       strncpy (result, "-1 Internal error.\n", sizeof (result));
1062     else
1063       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1064   }
1065   result[sizeof (result) - 1] = 0;
1066
1067   status = swrite (fd, result, strlen (result));
1068   if (status < 0)
1069   {
1070     status = errno;
1071     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1072     return (status);
1073   }
1074
1075   return (0);
1076 } /* }}} int handle_request_flush */
1077
1078 static int handle_request_flushall(int fd) /* {{{ */
1079 {
1080   int status;
1081   char answer[] ="0 Started flush.\n";
1082
1083   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1084
1085   pthread_mutex_lock(&cache_lock);
1086   flush_old_values(-1);
1087   pthread_mutex_unlock(&cache_lock);
1088
1089   status = swrite(fd, answer, strlen(answer));
1090   if (status < 0)
1091   {
1092     status = errno;
1093     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1094   }
1095
1096   return (status);
1097 } /* }}} static int handle_request_flushall */
1098
1099 static int handle_request_update (int fd, /* {{{ */
1100     char *buffer, size_t buffer_size)
1101 {
1102   char *file;
1103   int values_num = 0;
1104   int status;
1105
1106   time_t now;
1107
1108   cache_item_t *ci;
1109   char answer[CMD_MAX];
1110
1111 #define RRDD_UPDATE_SEND \
1112   answer[sizeof (answer) - 1] = 0; \
1113   status = swrite (fd, answer, strlen (answer)); \
1114   if (status < 0) \
1115   { \
1116     status = errno; \
1117     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1118     return (status); \
1119   }
1120
1121   now = time (NULL);
1122
1123   status = buffer_get_field (&buffer, &buffer_size, &file);
1124   if (status != 0)
1125   {
1126     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1127         sizeof (answer));
1128     RRDD_UPDATE_SEND;
1129     return (0);
1130   }
1131
1132   pthread_mutex_lock(&stats_lock);
1133   stats_updates_received++;
1134   pthread_mutex_unlock(&stats_lock);
1135
1136   pthread_mutex_lock (&cache_lock);
1137   ci = g_tree_lookup (cache_tree, file);
1138
1139   if (ci == NULL) /* {{{ */
1140   {
1141     struct stat statbuf;
1142
1143     /* don't hold the lock while we setup; stat(2) might block */
1144     pthread_mutex_unlock(&cache_lock);
1145
1146     memset (&statbuf, 0, sizeof (statbuf));
1147     status = stat (file, &statbuf);
1148     if (status != 0)
1149     {
1150       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1151
1152       status = errno;
1153       if (status == ENOENT)
1154         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1155       else
1156         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1157             status);
1158       RRDD_UPDATE_SEND;
1159       return (0);
1160     }
1161     if (!S_ISREG (statbuf.st_mode))
1162     {
1163       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1164       RRDD_UPDATE_SEND;
1165       return (0);
1166     }
1167     if (access(file, R_OK|W_OK) != 0)
1168     {
1169       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1170                 file, rrd_strerror(errno));
1171       RRDD_UPDATE_SEND;
1172       return (0);
1173     }
1174
1175     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1176     if (ci == NULL)
1177     {
1178       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1179
1180       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1181       RRDD_UPDATE_SEND;
1182       return (0);
1183     }
1184     memset (ci, 0, sizeof (cache_item_t));
1185
1186     ci->file = strdup (file);
1187     if (ci->file == NULL)
1188     {
1189       free (ci);
1190       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1191
1192       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1193       RRDD_UPDATE_SEND;
1194       return (0);
1195     }
1196
1197     _wipe_ci_values(ci, now);
1198     ci->flags = CI_FLAGS_IN_TREE;
1199
1200     pthread_mutex_lock(&cache_lock);
1201     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1202   } /* }}} */
1203   assert (ci != NULL);
1204
1205   while (buffer_size > 0)
1206   {
1207     char **temp;
1208     char *value;
1209
1210     status = buffer_get_field (&buffer, &buffer_size, &value);
1211     if (status != 0)
1212     {
1213       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1214       break;
1215     }
1216
1217     temp = (char **) realloc (ci->values,
1218         sizeof (char *) * (ci->values_num + 1));
1219     if (temp == NULL)
1220     {
1221       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1222       continue;
1223     }
1224     ci->values = temp;
1225
1226     ci->values[ci->values_num] = strdup (value);
1227     if (ci->values[ci->values_num] == NULL)
1228     {
1229       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1230       continue;
1231     }
1232     ci->values_num++;
1233
1234     values_num++;
1235   }
1236
1237   if (((now - ci->last_flush_time) >= config_write_interval)
1238       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1239       && (ci->values_num > 0))
1240   {
1241     enqueue_cache_item (ci, TAIL);
1242   }
1243
1244   pthread_mutex_unlock (&cache_lock);
1245
1246   if (values_num < 1)
1247   {
1248     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1249   }
1250   else
1251   {
1252     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1253         (values_num == 1) ? "" : "s");
1254   }
1255   RRDD_UPDATE_SEND;
1256   return (0);
1257 #undef RRDD_UPDATE_SEND
1258 } /* }}} int handle_request_update */
1259
1260 /* we came across a "WROTE" entry during journal replay.
1261  * throw away any values that we have accumulated for this file
1262  */
1263 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1264                                  const char *buffer,
1265                                  size_t buffer_size __attribute__((unused)))
1266 {
1267   int i;
1268   cache_item_t *ci;
1269   const char *file = buffer;
1270
1271   pthread_mutex_lock(&cache_lock);
1272
1273   ci = g_tree_lookup(cache_tree, file);
1274   if (ci == NULL)
1275   {
1276     pthread_mutex_unlock(&cache_lock);
1277     return (0);
1278   }
1279
1280   if (ci->values)
1281   {
1282     for (i=0; i < ci->values_num; i++)
1283       free(ci->values[i]);
1284
1285     free(ci->values);
1286   }
1287
1288   _wipe_ci_values(ci, time(NULL));
1289
1290   pthread_mutex_unlock(&cache_lock);
1291   return (0);
1292 } /* }}} int handle_request_wrote */
1293
1294 /* if fd < 0, we are in journal replay mode */
1295 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1296 {
1297   char *buffer_ptr;
1298   char *command;
1299   int status;
1300
1301   assert (buffer[buffer_size - 1] == '\0');
1302
1303   buffer_ptr = buffer;
1304   command = NULL;
1305   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1306   if (status != 0)
1307   {
1308     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1309     return (-1);
1310   }
1311
1312   if (strcasecmp (command, "update") == 0)
1313   {
1314     /* don't re-write updates in replay mode */
1315     if (fd >= 0)
1316       journal_write(command, buffer_ptr);
1317
1318     return (handle_request_update (fd, buffer_ptr, buffer_size));
1319   }
1320   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1321   {
1322     /* this is only valid in replay mode */
1323     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1324   }
1325   else if (strcasecmp (command, "flush") == 0)
1326   {
1327     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1328   }
1329   else if (strcasecmp (command, "flushall") == 0)
1330   {
1331     return (handle_request_flushall(fd));
1332   }
1333   else if (strcasecmp (command, "stats") == 0)
1334   {
1335     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1336   }
1337   else if (strcasecmp (command, "help") == 0)
1338   {
1339     return (handle_request_help (fd, buffer_ptr, buffer_size));
1340   }
1341   else
1342   {
1343     char result[CMD_MAX];
1344
1345     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1346     result[sizeof (result) - 1] = 0;
1347
1348     status = swrite (fd, result, strlen (result));
1349     if (status < 0)
1350     {
1351       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1352       return (-1);
1353     }
1354   }
1355
1356   return (0);
1357 } /* }}} int handle_request */
1358
1359 /* MUST NOT hold journal_lock before calling this */
1360 static void journal_rotate(void) /* {{{ */
1361 {
1362   FILE *old_fh = NULL;
1363
1364   if (journal_cur == NULL || journal_old == NULL)
1365     return;
1366
1367   pthread_mutex_lock(&journal_lock);
1368
1369   /* we rotate this way (rename before close) so that the we can release
1370    * the journal lock as fast as possible.  Journal writes to the new
1371    * journal can proceed immediately after the new file is opened.  The
1372    * fclose can then block without affecting new updates.
1373    */
1374   if (journal_fh != NULL)
1375   {
1376     old_fh = journal_fh;
1377     rename(journal_cur, journal_old);
1378     ++stats_journal_rotate;
1379   }
1380
1381   journal_fh = fopen(journal_cur, "a");
1382   pthread_mutex_unlock(&journal_lock);
1383
1384   if (old_fh != NULL)
1385     fclose(old_fh);
1386
1387   if (journal_fh == NULL)
1388   {
1389     RRDD_LOG(LOG_CRIT,
1390              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1391              journal_cur, rrd_strerror(errno));
1392
1393     RRDD_LOG(LOG_ERR,
1394              "JOURNALING DISABLED: All values will be flushed at shutdown");
1395     config_flush_at_shutdown = 1;
1396   }
1397
1398 } /* }}} static void journal_rotate */
1399
1400 static void journal_done(void) /* {{{ */
1401 {
1402   if (journal_cur == NULL)
1403     return;
1404
1405   pthread_mutex_lock(&journal_lock);
1406   if (journal_fh != NULL)
1407   {
1408     fclose(journal_fh);
1409     journal_fh = NULL;
1410   }
1411
1412   if (config_flush_at_shutdown)
1413   {
1414     RRDD_LOG(LOG_INFO, "removing journals");
1415     unlink(journal_old);
1416     unlink(journal_cur);
1417   }
1418   else
1419   {
1420     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1421              "journals will be used at next startup");
1422   }
1423
1424   pthread_mutex_unlock(&journal_lock);
1425
1426 } /* }}} static void journal_done */
1427
1428 static int journal_write(char *cmd, char *args) /* {{{ */
1429 {
1430   int chars;
1431
1432   if (journal_fh == NULL)
1433     return 0;
1434
1435   pthread_mutex_lock(&journal_lock);
1436   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1437   pthread_mutex_unlock(&journal_lock);
1438
1439   if (chars > 0)
1440   {
1441     pthread_mutex_lock(&stats_lock);
1442     stats_journal_bytes += chars;
1443     pthread_mutex_unlock(&stats_lock);
1444   }
1445
1446   return chars;
1447 } /* }}} static int journal_write */
1448
1449 static int journal_replay (const char *file) /* {{{ */
1450 {
1451   FILE *fh;
1452   int entry_cnt = 0;
1453   int fail_cnt = 0;
1454   uint64_t line = 0;
1455   char entry[CMD_MAX];
1456
1457   if (file == NULL) return 0;
1458
1459   fh = fopen(file, "r");
1460   if (fh == NULL)
1461   {
1462     if (errno != ENOENT)
1463       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1464                file, rrd_strerror(errno));
1465     return 0;
1466   }
1467   else
1468     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1469
1470   while(!feof(fh))
1471   {
1472     size_t entry_len;
1473
1474     ++line;
1475     fgets(entry, sizeof(entry), fh);
1476     entry_len = strlen(entry);
1477
1478     /* check \n termination in case journal writing crashed mid-line */
1479     if (entry_len == 0)
1480       continue;
1481     else if (entry[entry_len - 1] != '\n')
1482     {
1483       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1484       ++fail_cnt;
1485       continue;
1486     }
1487
1488     entry[entry_len - 1] = '\0';
1489
1490     if (handle_request(-1, entry, entry_len) == 0)
1491       ++entry_cnt;
1492     else
1493       ++fail_cnt;
1494   }
1495
1496   fclose(fh);
1497
1498   if (entry_cnt > 0)
1499   {
1500     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1501              entry_cnt, fail_cnt);
1502     return 1;
1503   }
1504   else
1505     return 0;
1506
1507 } /* }}} static int journal_replay */
1508
1509 static void *connection_thread_main (void *args) /* {{{ */
1510 {
1511   pthread_t self;
1512   int i;
1513   int fd;
1514   
1515   fd = *((int *) args);
1516   free (args);
1517
1518   pthread_mutex_lock (&connection_threads_lock);
1519   {
1520     pthread_t *temp;
1521
1522     temp = (pthread_t *) realloc (connection_threads,
1523         sizeof (pthread_t) * (connection_threads_num + 1));
1524     if (temp == NULL)
1525     {
1526       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1527     }
1528     else
1529     {
1530       connection_threads = temp;
1531       connection_threads[connection_threads_num] = pthread_self ();
1532       connection_threads_num++;
1533     }
1534   }
1535   pthread_mutex_unlock (&connection_threads_lock);
1536
1537   while (do_shutdown == 0)
1538   {
1539     char buffer[CMD_MAX];
1540
1541     struct pollfd pollfd;
1542     int status;
1543
1544     pollfd.fd = fd;
1545     pollfd.events = POLLIN | POLLPRI;
1546     pollfd.revents = 0;
1547
1548     status = poll (&pollfd, 1, /* timeout = */ 500);
1549     if (do_shutdown)
1550       break;
1551     else if (status == 0) /* timeout */
1552       continue;
1553     else if (status < 0) /* error */
1554     {
1555       status = errno;
1556       if (status == EINTR)
1557         continue;
1558       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1559       continue;
1560     }
1561
1562     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1563     {
1564       close (fd);
1565       break;
1566     }
1567     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1568     {
1569       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1570           "poll(2) returned something unexpected: %#04hx",
1571           pollfd.revents);
1572       close (fd);
1573       break;
1574     }
1575
1576     status = (int) sread (fd, buffer, sizeof (buffer));
1577     if (status <= 0)
1578     {
1579       close (fd);
1580
1581       if (status < 0)
1582         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1583
1584       break;
1585     }
1586
1587     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1588     if (status != 0)
1589       break;
1590   }
1591
1592   close(fd);
1593
1594   self = pthread_self ();
1595   /* Remove this thread from the connection threads list */
1596   pthread_mutex_lock (&connection_threads_lock);
1597   /* Find out own index in the array */
1598   for (i = 0; i < connection_threads_num; i++)
1599     if (pthread_equal (connection_threads[i], self) != 0)
1600       break;
1601   assert (i < connection_threads_num);
1602
1603   /* Move the trailing threads forward. */
1604   if (i < (connection_threads_num - 1))
1605   {
1606     memmove (connection_threads + i,
1607         connection_threads + i + 1,
1608         sizeof (pthread_t) * (connection_threads_num - i - 1));
1609   }
1610
1611   connection_threads_num--;
1612   pthread_mutex_unlock (&connection_threads_lock);
1613
1614   return (NULL);
1615 } /* }}} void *connection_thread_main */
1616
1617 static int open_listen_socket_unix (const char *path) /* {{{ */
1618 {
1619   int fd;
1620   struct sockaddr_un sa;
1621   listen_socket_t *temp;
1622   int status;
1623
1624   temp = (listen_socket_t *) realloc (listen_fds,
1625       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1626   if (temp == NULL)
1627   {
1628     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1629     return (-1);
1630   }
1631   listen_fds = temp;
1632   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1633
1634   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1635   if (fd < 0)
1636   {
1637     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1638     return (-1);
1639   }
1640
1641   memset (&sa, 0, sizeof (sa));
1642   sa.sun_family = AF_UNIX;
1643   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1644
1645   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1646   if (status != 0)
1647   {
1648     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1649     close (fd);
1650     unlink (path);
1651     return (-1);
1652   }
1653
1654   status = listen (fd, /* backlog = */ 10);
1655   if (status != 0)
1656   {
1657     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1658     close (fd);
1659     unlink (path);
1660     return (-1);
1661   }
1662   
1663   listen_fds[listen_fds_num].fd = fd;
1664   snprintf (listen_fds[listen_fds_num].path,
1665       sizeof (listen_fds[listen_fds_num].path) - 1,
1666       "unix:%s", path);
1667   listen_fds_num++;
1668
1669   return (0);
1670 } /* }}} int open_listen_socket_unix */
1671
1672 static int open_listen_socket (const char *addr_orig) /* {{{ */
1673 {
1674   struct addrinfo ai_hints;
1675   struct addrinfo *ai_res;
1676   struct addrinfo *ai_ptr;
1677   char addr_copy[NI_MAXHOST];
1678   char *addr;
1679   char *port;
1680   int status;
1681
1682   assert (addr_orig != NULL);
1683
1684   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1685   addr_copy[sizeof (addr_copy) - 1] = 0;
1686   addr = addr_copy;
1687
1688   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1689     return (open_listen_socket_unix (addr + strlen ("unix:")));
1690   else if (addr[0] == '/')
1691     return (open_listen_socket_unix (addr));
1692
1693   memset (&ai_hints, 0, sizeof (ai_hints));
1694   ai_hints.ai_flags = 0;
1695 #ifdef AI_ADDRCONFIG
1696   ai_hints.ai_flags |= AI_ADDRCONFIG;
1697 #endif
1698   ai_hints.ai_family = AF_UNSPEC;
1699   ai_hints.ai_socktype = SOCK_STREAM;
1700
1701   port = NULL;
1702  if (*addr == '[') /* IPv6+port format */
1703   {
1704     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1705     addr++;
1706
1707     port = strchr (addr, ']');
1708     if (port == NULL)
1709     {
1710       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1711           addr_orig);
1712       return (-1);
1713     }
1714     *port = 0;
1715     port++;
1716
1717     if (*port == ':')
1718       port++;
1719     else if (*port == 0)
1720       port = NULL;
1721     else
1722     {
1723       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1724           port);
1725       return (-1);
1726     }
1727   } /* if (*addr = ']') */
1728   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1729   {
1730     port = rindex(addr, ':');
1731     if (port != NULL)
1732     {
1733       *port = 0;
1734       port++;
1735     }
1736   }
1737   ai_res = NULL;
1738   status = getaddrinfo (addr,
1739                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1740                         &ai_hints, &ai_res);
1741   if (status != 0)
1742   {
1743     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1744         "%s", addr, gai_strerror (status));
1745     return (-1);
1746   }
1747
1748   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1749   {
1750     int fd;
1751     listen_socket_t *temp;
1752     int one = 1;
1753
1754     temp = (listen_socket_t *) realloc (listen_fds,
1755         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1756     if (temp == NULL)
1757     {
1758       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1759       continue;
1760     }
1761     listen_fds = temp;
1762     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1763
1764     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1765     if (fd < 0)
1766     {
1767       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1768       continue;
1769     }
1770
1771     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1772
1773     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1774     if (status != 0)
1775     {
1776       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1777       close (fd);
1778       continue;
1779     }
1780
1781     status = listen (fd, /* backlog = */ 10);
1782     if (status != 0)
1783     {
1784       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1785       close (fd);
1786       return (-1);
1787     }
1788
1789     listen_fds[listen_fds_num].fd = fd;
1790     strncpy (listen_fds[listen_fds_num].path, addr,
1791         sizeof (listen_fds[listen_fds_num].path) - 1);
1792     listen_fds_num++;
1793   } /* for (ai_ptr) */
1794
1795   return (0);
1796 } /* }}} int open_listen_socket */
1797
1798 static int close_listen_sockets (void) /* {{{ */
1799 {
1800   size_t i;
1801
1802   for (i = 0; i < listen_fds_num; i++)
1803   {
1804     close (listen_fds[i].fd);
1805     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1806       unlink (listen_fds[i].path + strlen ("unix:"));
1807   }
1808
1809   free (listen_fds);
1810   listen_fds = NULL;
1811   listen_fds_num = 0;
1812
1813   return (0);
1814 } /* }}} int close_listen_sockets */
1815
1816 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1817 {
1818   struct pollfd *pollfds;
1819   int pollfds_num;
1820   int status;
1821   int i;
1822
1823   for (i = 0; i < config_listen_address_list_len; i++)
1824     open_listen_socket (config_listen_address_list[i]);
1825
1826   if (config_listen_address_list_len < 1)
1827     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1828
1829   if (listen_fds_num < 1)
1830   {
1831     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1832         "could be opened. Sorry.");
1833     return (NULL);
1834   }
1835
1836   pollfds_num = listen_fds_num;
1837   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1838   if (pollfds == NULL)
1839   {
1840     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1841     return (NULL);
1842   }
1843   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1844
1845   RRDD_LOG(LOG_INFO, "listening for connections");
1846
1847   while (do_shutdown == 0)
1848   {
1849     assert (pollfds_num == ((int) listen_fds_num));
1850     for (i = 0; i < pollfds_num; i++)
1851     {
1852       pollfds[i].fd = listen_fds[i].fd;
1853       pollfds[i].events = POLLIN | POLLPRI;
1854       pollfds[i].revents = 0;
1855     }
1856
1857     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1858     if (do_shutdown)
1859       break;
1860     else if (status == 0) /* timeout */
1861       continue;
1862     else if (status < 0) /* error */
1863     {
1864       status = errno;
1865       if (status != EINTR)
1866       {
1867         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1868       }
1869       continue;
1870     }
1871
1872     for (i = 0; i < pollfds_num; i++)
1873     {
1874       int *client_sd;
1875       struct sockaddr_storage client_sa;
1876       socklen_t client_sa_size;
1877       pthread_t tid;
1878       pthread_attr_t attr;
1879
1880       if (pollfds[i].revents == 0)
1881         continue;
1882
1883       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1884       {
1885         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1886             "poll(2) returned something unexpected for listen FD #%i.",
1887             pollfds[i].fd);
1888         continue;
1889       }
1890
1891       client_sd = (int *) malloc (sizeof (int));
1892       if (client_sd == NULL)
1893       {
1894         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1895         continue;
1896       }
1897
1898       client_sa_size = sizeof (client_sa);
1899       *client_sd = accept (pollfds[i].fd,
1900           (struct sockaddr *) &client_sa, &client_sa_size);
1901       if (*client_sd < 0)
1902       {
1903         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1904         continue;
1905       }
1906
1907       pthread_attr_init (&attr);
1908       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1909
1910       status = pthread_create (&tid, &attr, connection_thread_main,
1911           /* args = */ (void *) client_sd);
1912       if (status != 0)
1913       {
1914         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1915         close (*client_sd);
1916         free (client_sd);
1917         continue;
1918       }
1919     } /* for (pollfds_num) */
1920   } /* while (do_shutdown == 0) */
1921
1922   RRDD_LOG(LOG_INFO, "starting shutdown");
1923
1924   close_listen_sockets ();
1925
1926   pthread_mutex_lock (&connection_threads_lock);
1927   while (connection_threads_num > 0)
1928   {
1929     pthread_t wait_for;
1930
1931     wait_for = connection_threads[0];
1932
1933     pthread_mutex_unlock (&connection_threads_lock);
1934     pthread_join (wait_for, /* retval = */ NULL);
1935     pthread_mutex_lock (&connection_threads_lock);
1936   }
1937   pthread_mutex_unlock (&connection_threads_lock);
1938
1939   return (NULL);
1940 } /* }}} void *listen_thread_main */
1941
1942 static int daemonize (void) /* {{{ */
1943 {
1944   int status;
1945   int fd;
1946
1947   fd = open_pidfile();
1948   if (fd < 0) return fd;
1949
1950   if (!stay_foreground)
1951   {
1952     pid_t child;
1953     char *base_dir;
1954
1955     child = fork ();
1956     if (child < 0)
1957     {
1958       fprintf (stderr, "daemonize: fork(2) failed.\n");
1959       return (-1);
1960     }
1961     else if (child > 0)
1962     {
1963       return (1);
1964     }
1965
1966     /* Change into the /tmp directory. */
1967     base_dir = (config_base_dir != NULL)
1968       ? config_base_dir
1969       : "/tmp";
1970     status = chdir (base_dir);
1971     if (status != 0)
1972     {
1973       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1974       return (-1);
1975     }
1976
1977     /* Become session leader */
1978     setsid ();
1979
1980     /* Open the first three file descriptors to /dev/null */
1981     close (2);
1982     close (1);
1983     close (0);
1984
1985     open ("/dev/null", O_RDWR);
1986     dup (0);
1987     dup (0);
1988   } /* if (!stay_foreground) */
1989
1990   install_signal_handlers();
1991
1992   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1993   RRDD_LOG(LOG_INFO, "starting up");
1994
1995   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1996   if (cache_tree == NULL)
1997   {
1998     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1999     return (-1);
2000   }
2001
2002   status = write_pidfile (fd);
2003   return status;
2004 } /* }}} int daemonize */
2005
2006 static int cleanup (void) /* {{{ */
2007 {
2008   do_shutdown++;
2009
2010   pthread_cond_signal (&cache_cond);
2011   pthread_join (queue_thread, /* return = */ NULL);
2012
2013   remove_pidfile ();
2014
2015   RRDD_LOG(LOG_INFO, "goodbye");
2016   closelog ();
2017
2018   return (0);
2019 } /* }}} int cleanup */
2020
2021 static int read_options (int argc, char **argv) /* {{{ */
2022 {
2023   int option;
2024   int status = 0;
2025
2026   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?F")) != -1)
2027   {
2028     switch (option)
2029     {
2030       case 'g':
2031         stay_foreground=1;
2032         break;
2033
2034       case 'l':
2035       {
2036         char **temp;
2037
2038         temp = (char **) realloc (config_listen_address_list,
2039             sizeof (char *) * (config_listen_address_list_len + 1));
2040         if (temp == NULL)
2041         {
2042           fprintf (stderr, "read_options: realloc failed.\n");
2043           return (2);
2044         }
2045         config_listen_address_list = temp;
2046
2047         temp[config_listen_address_list_len] = strdup (optarg);
2048         if (temp[config_listen_address_list_len] == NULL)
2049         {
2050           fprintf (stderr, "read_options: strdup failed.\n");
2051           return (2);
2052         }
2053         config_listen_address_list_len++;
2054       }
2055       break;
2056
2057       case 'f':
2058       {
2059         int temp;
2060
2061         temp = atoi (optarg);
2062         if (temp > 0)
2063           config_flush_interval = temp;
2064         else
2065         {
2066           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2067           status = 3;
2068         }
2069       }
2070       break;
2071
2072       case 'w':
2073       {
2074         int temp;
2075
2076         temp = atoi (optarg);
2077         if (temp > 0)
2078           config_write_interval = temp;
2079         else
2080         {
2081           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2082           status = 2;
2083         }
2084       }
2085       break;
2086
2087       case 'z':
2088       {
2089         int temp;
2090
2091         temp = atoi(optarg);
2092         if (temp > 0)
2093           config_write_jitter = temp;
2094         else
2095         {
2096           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2097           status = 2;
2098         }
2099
2100         break;
2101       }
2102
2103       case 'b':
2104       {
2105         size_t len;
2106
2107         if (config_base_dir != NULL)
2108           free (config_base_dir);
2109         config_base_dir = strdup (optarg);
2110         if (config_base_dir == NULL)
2111         {
2112           fprintf (stderr, "read_options: strdup failed.\n");
2113           return (3);
2114         }
2115
2116         len = strlen (config_base_dir);
2117         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2118         {
2119           config_base_dir[len - 1] = 0;
2120           len--;
2121         }
2122
2123         if (len < 1)
2124         {
2125           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2126           return (4);
2127         }
2128       }
2129       break;
2130
2131       case 'p':
2132       {
2133         if (config_pid_file != NULL)
2134           free (config_pid_file);
2135         config_pid_file = strdup (optarg);
2136         if (config_pid_file == NULL)
2137         {
2138           fprintf (stderr, "read_options: strdup failed.\n");
2139           return (3);
2140         }
2141       }
2142       break;
2143
2144       case 'F':
2145         config_flush_at_shutdown = 1;
2146         break;
2147
2148       case 'j':
2149       {
2150         struct stat statbuf;
2151         const char *dir = optarg;
2152
2153         status = stat(dir, &statbuf);
2154         if (status != 0)
2155         {
2156           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2157           return 6;
2158         }
2159
2160         if (!S_ISDIR(statbuf.st_mode)
2161             || access(dir, R_OK|W_OK|X_OK) != 0)
2162         {
2163           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2164                   errno ? rrd_strerror(errno) : "");
2165           return 6;
2166         }
2167
2168         journal_cur = malloc(PATH_MAX + 1);
2169         journal_old = malloc(PATH_MAX + 1);
2170         if (journal_cur == NULL || journal_old == NULL)
2171         {
2172           fprintf(stderr, "malloc failure for journal files\n");
2173           return 6;
2174         }
2175         else 
2176         {
2177           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2178           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2179         }
2180       }
2181       break;
2182
2183       case 'h':
2184       case '?':
2185         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2186             "\n"
2187             "Usage: rrdcached [options]\n"
2188             "\n"
2189             "Valid options are:\n"
2190             "  -l <address>  Socket address to listen to.\n"
2191             "  -w <seconds>  Interval in which to write data.\n"
2192             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2193             "  -f <seconds>  Interval in which to flush dead data.\n"
2194             "  -p <file>     Location of the PID-file.\n"
2195             "  -b <dir>      Base directory to change to.\n"
2196             "  -g            Do not fork and run in the foreground.\n"
2197             "  -j <dir>      Directory in which to create the journal files.\n"
2198             "  -F            Always flush all updates at shutdown\n"
2199             "\n"
2200             "For more information and a detailed description of all options "
2201             "please refer\n"
2202             "to the rrdcached(1) manual page.\n",
2203             VERSION);
2204         status = -1;
2205         break;
2206     } /* switch (option) */
2207   } /* while (getopt) */
2208
2209   /* advise the user when values are not sane */
2210   if (config_flush_interval < 2 * config_write_interval)
2211     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2212             " 2x write interval (-w) !\n");
2213   if (config_write_jitter > config_write_interval)
2214     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2215             " write interval (-w) !\n");
2216
2217   if (journal_cur == NULL)
2218     config_flush_at_shutdown = 1;
2219
2220   return (status);
2221 } /* }}} int read_options */
2222
2223 int main (int argc, char **argv)
2224 {
2225   int status;
2226
2227   status = read_options (argc, argv);
2228   if (status != 0)
2229   {
2230     if (status < 0)
2231       status = 0;
2232     return (status);
2233   }
2234
2235   status = daemonize ();
2236   if (status == 1)
2237   {
2238     struct sigaction sigchld;
2239
2240     memset (&sigchld, 0, sizeof (sigchld));
2241     sigchld.sa_handler = SIG_IGN;
2242     sigaction (SIGCHLD, &sigchld, NULL);
2243
2244     return (0);
2245   }
2246   else if (status != 0)
2247   {
2248     fprintf (stderr, "daemonize failed, exiting.\n");
2249     return (1);
2250   }
2251
2252   if (journal_cur != NULL)
2253   {
2254     int had_journal = 0;
2255
2256     pthread_mutex_lock(&journal_lock);
2257
2258     RRDD_LOG(LOG_INFO, "checking for journal files");
2259
2260     had_journal += journal_replay(journal_old);
2261     had_journal += journal_replay(journal_cur);
2262
2263     if (had_journal)
2264       flush_old_values(-1);
2265
2266     pthread_mutex_unlock(&journal_lock);
2267     journal_rotate();
2268
2269     RRDD_LOG(LOG_INFO, "journal processing complete");
2270   }
2271
2272   /* start the queue thread */
2273   memset (&queue_thread, 0, sizeof (queue_thread));
2274   status = pthread_create (&queue_thread,
2275                            NULL, /* attr */
2276                            queue_thread_main,
2277                            NULL); /* args */
2278   if (status != 0)
2279   {
2280     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2281     cleanup();
2282     return (1);
2283   }
2284
2285   listen_thread_main (NULL);
2286   cleanup ();
2287
2288   return (0);
2289 } /* int main */
2290
2291 /*
2292  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2293  */