a few missing {{{ folding }}} markers added
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 struct listen_socket_s
105 {
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
110
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
125
126 struct callback_flush_data_s
127 {
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
134
135 enum queue_side_e
136 {
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
141
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
144
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
149
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
152
153 static int do_shutdown = 0;
154
155 static pthread_t queue_thread;
156
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
160
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
167
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static int config_flush_at_shutdown = 0;
172 static char *config_pid_file = NULL;
173 static char *config_base_dir = NULL;
174
175 static char **config_listen_address_list = NULL;
176 static int config_listen_address_list_len = 0;
177
178 static uint64_t stats_queue_length = 0;
179 static uint64_t stats_updates_received = 0;
180 static uint64_t stats_flush_received = 0;
181 static uint64_t stats_updates_written = 0;
182 static uint64_t stats_data_sets_written = 0;
183 static uint64_t stats_journal_bytes = 0;
184 static uint64_t stats_journal_rotate = 0;
185 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
186
187 /* Journaled updates */
188 static char *journal_cur = NULL;
189 static char *journal_old = NULL;
190 static FILE *journal_fh = NULL;
191 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
192 static int journal_write(char *cmd, char *args);
193 static void journal_done(void);
194 static void journal_rotate(void);
195
196 /* 
197  * Functions
198  */
199 static void sig_common (const char *sig) /* {{{ */
200 {
201   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
202   do_shutdown++;
203   pthread_cond_broadcast(&cache_cond);
204 } /* }}} void sig_common */
205
206 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
207 {
208   sig_common("INT");
209 } /* }}} void sig_int_handler */
210
211 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
212 {
213   sig_common("TERM");
214 } /* }}} void sig_term_handler */
215
216 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
217 {
218   config_flush_at_shutdown = 1;
219   sig_common("USR1");
220 } /* }}} void sig_usr1_handler */
221
222 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
223 {
224   config_flush_at_shutdown = 0;
225   sig_common("USR2");
226 } /* }}} void sig_usr2_handler */
227
228 static void install_signal_handlers(void) /* {{{ */
229 {
230   /* These structures are static, because `sigaction' behaves weird if the are
231    * overwritten.. */
232   static struct sigaction sa_int;
233   static struct sigaction sa_term;
234   static struct sigaction sa_pipe;
235   static struct sigaction sa_usr1;
236   static struct sigaction sa_usr2;
237
238   /* Install signal handlers */
239   memset (&sa_int, 0, sizeof (sa_int));
240   sa_int.sa_handler = sig_int_handler;
241   sigaction (SIGINT, &sa_int, NULL);
242
243   memset (&sa_term, 0, sizeof (sa_term));
244   sa_term.sa_handler = sig_term_handler;
245   sigaction (SIGTERM, &sa_term, NULL);
246
247   memset (&sa_pipe, 0, sizeof (sa_pipe));
248   sa_pipe.sa_handler = SIG_IGN;
249   sigaction (SIGPIPE, &sa_pipe, NULL);
250
251   memset (&sa_pipe, 0, sizeof (sa_usr1));
252   sa_usr1.sa_handler = sig_usr1_handler;
253   sigaction (SIGUSR1, &sa_usr1, NULL);
254
255   memset (&sa_usr2, 0, sizeof (sa_usr2));
256   sa_usr2.sa_handler = sig_usr2_handler;
257   sigaction (SIGUSR2, &sa_usr2, NULL);
258
259 } /* }}} void install_signal_handlers */
260
261 static int open_pidfile(void) /* {{{ */
262 {
263   int fd;
264   char *file;
265
266   file = (config_pid_file != NULL)
267     ? config_pid_file
268     : LOCALSTATEDIR "/run/rrdcached.pid";
269
270   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
271   if (fd < 0)
272     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
273             file, rrd_strerror(errno));
274
275   return(fd);
276 } /* }}} static int open_pidfile */
277
278 static int write_pidfile (int fd) /* {{{ */
279 {
280   pid_t pid;
281   FILE *fh;
282
283   pid = getpid ();
284
285   fh = fdopen (fd, "w");
286   if (fh == NULL)
287   {
288     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
289     close(fd);
290     return (-1);
291   }
292
293   fprintf (fh, "%i\n", (int) pid);
294   fclose (fh);
295
296   return (0);
297 } /* }}} int write_pidfile */
298
299 static int remove_pidfile (void) /* {{{ */
300 {
301   char *file;
302   int status;
303
304   file = (config_pid_file != NULL)
305     ? config_pid_file
306     : LOCALSTATEDIR "/run/rrdcached.pid";
307
308   status = unlink (file);
309   if (status == 0)
310     return (0);
311   return (errno);
312 } /* }}} int remove_pidfile */
313
314 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
315 {
316   char    *buffer;
317   size_t   buffer_used;
318   size_t   buffer_free;
319   ssize_t  status;
320
321   buffer       = (char *) buffer_void;
322   buffer_used  = 0;
323   buffer_free  = buffer_size;
324
325   while (buffer_free > 0)
326   {
327     status = read (fd, buffer + buffer_used, buffer_free);
328     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
329       continue;
330
331     if (status < 0)
332       return (-1);
333
334     if (status == 0)
335       return (0);
336
337     assert ((0 > status) || (buffer_free >= (size_t) status));
338
339     buffer_free = buffer_free - status;
340     buffer_used = buffer_used + status;
341
342     if (buffer[buffer_used - 1] == '\n')
343       break;
344   }
345
346   assert (buffer_used > 0);
347
348   if (buffer[buffer_used - 1] != '\n')
349   {
350     errno = ENOBUFS;
351     return (-1);
352   }
353
354   buffer[buffer_used - 1] = 0;
355
356   /* Fix network line endings. */
357   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
358   {
359     buffer_used--;
360     buffer[buffer_used - 1] = 0;
361   }
362
363   return (buffer_used);
364 } /* }}} ssize_t sread */
365
366 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
367 {
368   const char *ptr;
369   size_t      nleft;
370   ssize_t     status;
371
372   /* special case for journal replay */
373   if (fd < 0) return 0;
374
375   ptr   = (const char *) buf;
376   nleft = count;
377
378   while (nleft > 0)
379   {
380     status = write (fd, (const void *) ptr, nleft);
381
382     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
383       continue;
384
385     if (status < 0)
386       return (status);
387
388     nleft -= status;
389     ptr   += status;
390   }
391
392   return (0);
393 } /* }}} ssize_t swrite */
394
395 static void _wipe_ci_values(cache_item_t *ci, time_t when)
396 {
397   ci->values = NULL;
398   ci->values_num = 0;
399
400   ci->last_flush_time = when;
401   if (config_write_jitter > 0)
402     ci->last_flush_time += (random() % config_write_jitter);
403
404   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
405 }
406
407 /*
408  * enqueue_cache_item:
409  * `cache_lock' must be acquired before calling this function!
410  */
411 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
412     queue_side_t side)
413 {
414   int did_insert = 0;
415
416   if (ci == NULL)
417     return (-1);
418
419   if (ci->values_num == 0)
420     return (0);
421
422   if (side == HEAD)
423   {
424     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
425     {
426       assert (ci->next == NULL);
427       ci->next = cache_queue_head;
428       cache_queue_head = ci;
429
430       if (cache_queue_tail == NULL)
431         cache_queue_tail = cache_queue_head;
432
433       did_insert = 1;
434     }
435     else if (cache_queue_head == ci)
436     {
437       /* do nothing */
438     }
439     else /* enqueued, but not first entry */
440     {
441       cache_item_t *prev;
442
443       /* find previous entry */
444       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
445         if (prev->next == ci)
446           break;
447       assert (prev != NULL);
448
449       /* move to the front */
450       prev->next = ci->next;
451       ci->next = cache_queue_head;
452       cache_queue_head = ci;
453
454       /* check if we need to adapt the tail */
455       if (cache_queue_tail == ci)
456         cache_queue_tail = prev;
457     }
458   }
459   else /* (side == TAIL) */
460   {
461     /* We don't move values back in the list.. */
462     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
463       return (0);
464
465     assert (ci->next == NULL);
466
467     if (cache_queue_tail == NULL)
468       cache_queue_head = ci;
469     else
470       cache_queue_tail->next = ci;
471     cache_queue_tail = ci;
472
473     did_insert = 1;
474   }
475
476   ci->flags |= CI_FLAGS_IN_QUEUE;
477
478   if (did_insert)
479   {
480     pthread_cond_broadcast(&cache_cond);
481     pthread_mutex_lock (&stats_lock);
482     stats_queue_length++;
483     pthread_mutex_unlock (&stats_lock);
484   }
485
486   return (0);
487 } /* }}} int enqueue_cache_item */
488
489 /*
490  * tree_callback_flush:
491  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
492  * while this is in progress.
493  */
494 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
495     gpointer data)
496 {
497   cache_item_t *ci;
498   callback_flush_data_t *cfd;
499
500   ci = (cache_item_t *) value;
501   cfd = (callback_flush_data_t *) data;
502
503   if ((ci->last_flush_time <= cfd->abs_timeout)
504       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
505       && (ci->values_num > 0))
506   {
507     enqueue_cache_item (ci, TAIL);
508   }
509   else if ((do_shutdown != 0)
510       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
511       && (ci->values_num > 0))
512   {
513     enqueue_cache_item (ci, TAIL);
514   }
515   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
516       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
517       && (ci->values_num <= 0))
518   {
519     char **temp;
520
521     temp = (char **) realloc (cfd->keys,
522         sizeof (char *) * (cfd->keys_num + 1));
523     if (temp == NULL)
524     {
525       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
526       return (FALSE);
527     }
528     cfd->keys = temp;
529     /* Make really sure this points to the _same_ place */
530     assert ((char *) key == ci->file);
531     cfd->keys[cfd->keys_num] = (char *) key;
532     cfd->keys_num++;
533   }
534
535   return (FALSE);
536 } /* }}} gboolean tree_callback_flush */
537
538 static int flush_old_values (int max_age)
539 {
540   callback_flush_data_t cfd;
541   size_t k;
542
543   memset (&cfd, 0, sizeof (cfd));
544   /* Pass the current time as user data so that we don't need to call
545    * `time' for each node. */
546   cfd.now = time (NULL);
547   cfd.keys = NULL;
548   cfd.keys_num = 0;
549
550   if (max_age > 0)
551     cfd.abs_timeout = cfd.now - max_age;
552   else
553     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
554
555   /* `tree_callback_flush' will return the keys of all values that haven't
556    * been touched in the last `config_flush_interval' seconds in `cfd'.
557    * The char*'s in this array point to the same memory as ci->file, so we
558    * don't need to free them separately. */
559   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
560
561   for (k = 0; k < cfd.keys_num; k++)
562   {
563     cache_item_t *ci;
564
565     /* This must not fail. */
566     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
567     assert (ci != NULL);
568
569     /* If we end up here with values available, something's seriously
570      * messed up. */
571     assert (ci->values_num == 0);
572
573     /* Remove the node from the tree */
574     g_tree_remove (cache_tree, cfd.keys[k]);
575     cfd.keys[k] = NULL;
576
577     /* Now free and clean up `ci'. */
578     free (ci->file);
579     ci->file = NULL;
580     free (ci);
581     ci = NULL;
582   } /* for (k = 0; k < cfd.keys_num; k++) */
583
584   if (cfd.keys != NULL)
585   {
586     free (cfd.keys);
587     cfd.keys = NULL;
588   }
589
590   return (0);
591 } /* int flush_old_values */
592
593 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
594 {
595   struct timeval now;
596   struct timespec next_flush;
597   int final_flush = 0; /* make sure we only flush once on shutdown */
598
599   gettimeofday (&now, NULL);
600   next_flush.tv_sec = now.tv_sec + config_flush_interval;
601   next_flush.tv_nsec = 1000 * now.tv_usec;
602
603   pthread_mutex_lock (&cache_lock);
604   while ((do_shutdown == 0) || (cache_queue_head != NULL))
605   {
606     cache_item_t *ci;
607     char *file;
608     char **values;
609     int values_num;
610     int status;
611     int i;
612
613     /* First, check if it's time to do the cache flush. */
614     gettimeofday (&now, NULL);
615     if ((now.tv_sec > next_flush.tv_sec)
616         || ((now.tv_sec == next_flush.tv_sec)
617           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
618     {
619       /* Flush all values that haven't been written in the last
620        * `config_write_interval' seconds. */
621       flush_old_values (config_write_interval);
622
623       /* Determine the time of the next cache flush. */
624       while (next_flush.tv_sec <= now.tv_sec)
625         next_flush.tv_sec += config_flush_interval;
626
627       /* unlock the cache while we rotate so we don't block incoming
628        * updates if the fsync() blocks on disk I/O */
629       pthread_mutex_unlock(&cache_lock);
630       journal_rotate();
631       pthread_mutex_lock(&cache_lock);
632     }
633
634     /* Now, check if there's something to store away. If not, wait until
635      * something comes in or it's time to do the cache flush.  if we are
636      * shutting down, do not wait around.  */
637     if (cache_queue_head == NULL && !do_shutdown)
638     {
639       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
640       if ((status != 0) && (status != ETIMEDOUT))
641       {
642         RRDD_LOG (LOG_ERR, "queue_thread_main: "
643             "pthread_cond_timedwait returned %i.", status);
644       }
645     }
646
647     /* We're about to shut down */
648     if (do_shutdown != 0 && !final_flush++)
649     {
650       if (config_flush_at_shutdown)
651         flush_old_values (-1); /* flush everything */
652       else
653         break;
654     }
655
656     /* Check if a value has arrived. This may be NULL if we timed out or there
657      * was an interrupt such as a signal. */
658     if (cache_queue_head == NULL)
659       continue;
660
661     ci = cache_queue_head;
662
663     /* copy the relevant parts */
664     file = strdup (ci->file);
665     if (file == NULL)
666     {
667       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
668       continue;
669     }
670
671     assert(ci->values != NULL);
672     assert(ci->values_num > 0);
673
674     values = ci->values;
675     values_num = ci->values_num;
676
677     _wipe_ci_values(ci, time(NULL));
678
679     cache_queue_head = ci->next;
680     if (cache_queue_head == NULL)
681       cache_queue_tail = NULL;
682     ci->next = NULL;
683
684     pthread_mutex_lock (&stats_lock);
685     assert (stats_queue_length > 0);
686     stats_queue_length--;
687     pthread_mutex_unlock (&stats_lock);
688
689     pthread_mutex_unlock (&cache_lock);
690
691     rrd_clear_error ();
692     status = rrd_update_r (file, NULL, values_num, (void *) values);
693     if (status != 0)
694     {
695       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
696           "rrd_update_r (%s) failed with status %i. (%s)",
697           file, status, rrd_get_error());
698     }
699
700     journal_write("wrote", file);
701     pthread_cond_broadcast(&ci->flushed);
702
703     for (i = 0; i < values_num; i++)
704       free (values[i]);
705
706     free(values);
707     free(file);
708
709     if (status == 0)
710     {
711       pthread_mutex_lock (&stats_lock);
712       stats_updates_written++;
713       stats_data_sets_written += values_num;
714       pthread_mutex_unlock (&stats_lock);
715     }
716
717     pthread_mutex_lock (&cache_lock);
718
719     /* We're about to shut down */
720     if (do_shutdown != 0 && !final_flush++)
721     {
722       if (config_flush_at_shutdown)
723           flush_old_values (-1); /* flush everything */
724       else
725         break;
726     }
727   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
728   pthread_mutex_unlock (&cache_lock);
729
730   if (config_flush_at_shutdown)
731   {
732     assert(cache_queue_head == NULL);
733     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
734   }
735
736   journal_done();
737
738   return (NULL);
739 } /* }}} void *queue_thread_main */
740
741 static int buffer_get_field (char **buffer_ret, /* {{{ */
742     size_t *buffer_size_ret, char **field_ret)
743 {
744   char *buffer;
745   size_t buffer_pos;
746   size_t buffer_size;
747   char *field;
748   size_t field_size;
749   int status;
750
751   buffer = *buffer_ret;
752   buffer_pos = 0;
753   buffer_size = *buffer_size_ret;
754   field = *buffer_ret;
755   field_size = 0;
756
757   if (buffer_size <= 0)
758     return (-1);
759
760   /* This is ensured by `handle_request'. */
761   assert (buffer[buffer_size - 1] == '\0');
762
763   status = -1;
764   while (buffer_pos < buffer_size)
765   {
766     /* Check for end-of-field or end-of-buffer */
767     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
768     {
769       field[field_size] = 0;
770       field_size++;
771       buffer_pos++;
772       status = 0;
773       break;
774     }
775     /* Handle escaped characters. */
776     else if (buffer[buffer_pos] == '\\')
777     {
778       if (buffer_pos >= (buffer_size - 1))
779         break;
780       buffer_pos++;
781       field[field_size] = buffer[buffer_pos];
782       field_size++;
783       buffer_pos++;
784     }
785     /* Normal operation */ 
786     else
787     {
788       field[field_size] = buffer[buffer_pos];
789       field_size++;
790       buffer_pos++;
791     }
792   } /* while (buffer_pos < buffer_size) */
793
794   if (status != 0)
795     return (status);
796
797   *buffer_ret = buffer + buffer_pos;
798   *buffer_size_ret = buffer_size - buffer_pos;
799   *field_ret = field;
800
801   return (0);
802 } /* }}} int buffer_get_field */
803
804 static int flush_file (const char *filename) /* {{{ */
805 {
806   cache_item_t *ci;
807
808   pthread_mutex_lock (&cache_lock);
809
810   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
811   if (ci == NULL)
812   {
813     pthread_mutex_unlock (&cache_lock);
814     return (ENOENT);
815   }
816
817   /* Enqueue at head */
818   enqueue_cache_item (ci, HEAD);
819
820   pthread_cond_wait(&ci->flushed, &cache_lock);
821   pthread_mutex_unlock(&cache_lock);
822
823   return (0);
824 } /* }}} int flush_file */
825
826 static int handle_request_help (int fd, /* {{{ */
827     char *buffer, size_t buffer_size)
828 {
829   int status;
830   char **help_text;
831   size_t help_text_len;
832   char *command;
833   size_t i;
834
835   char *help_help[] =
836   {
837     "5 Command overview\n",
838     "FLUSH <filename>\n",
839     "FLUSHALL\n",
840     "HELP [<command>]\n",
841     "UPDATE <filename> <values> [<values> ...]\n",
842     "STATS\n"
843   };
844   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
845
846   char *help_flush[] =
847   {
848     "4 Help for FLUSH\n",
849     "Usage: FLUSH <filename>\n",
850     "\n",
851     "Adds the given filename to the head of the update queue and returns\n",
852     "after is has been dequeued.\n"
853   };
854   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
855
856   char *help_flushall[] =
857   {
858     "3 Help for FLUSHALL\n",
859     "Usage: FLUSHALL\n",
860     "\n",
861     "Triggers writing of all pending updates.  Returns immediately.\n"
862   };
863   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
864
865   char *help_update[] =
866   {
867     "9 Help for UPDATE\n",
868     "Usage: UPDATE <filename> <values> [<values> ...]\n"
869     "\n",
870     "Adds the given file to the internal cache if it is not yet known and\n",
871     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
872     "for details.\n",
873     "\n",
874     "Each <values> has the following form:\n",
875     "  <values> = <time>:<value>[:<value>[...]]\n",
876     "See the rrdupdate(1) manpage for details.\n"
877   };
878   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
879
880   char *help_stats[] =
881   {
882     "4 Help for STATS\n",
883     "Usage: STATS\n",
884     "\n",
885     "Returns some performance counters, see the rrdcached(1) manpage for\n",
886     "a description of the values.\n"
887   };
888   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
889
890   status = buffer_get_field (&buffer, &buffer_size, &command);
891   if (status != 0)
892   {
893     help_text = help_help;
894     help_text_len = help_help_len;
895   }
896   else
897   {
898     if (strcasecmp (command, "update") == 0)
899     {
900       help_text = help_update;
901       help_text_len = help_update_len;
902     }
903     else if (strcasecmp (command, "flush") == 0)
904     {
905       help_text = help_flush;
906       help_text_len = help_flush_len;
907     }
908     else if (strcasecmp (command, "flushall") == 0)
909     {
910       help_text = help_flushall;
911       help_text_len = help_flushall_len;
912     }
913     else if (strcasecmp (command, "stats") == 0)
914     {
915       help_text = help_stats;
916       help_text_len = help_stats_len;
917     }
918     else
919     {
920       help_text = help_help;
921       help_text_len = help_help_len;
922     }
923   }
924
925   for (i = 0; i < help_text_len; i++)
926   {
927     status = swrite (fd, help_text[i], strlen (help_text[i]));
928     if (status < 0)
929     {
930       status = errno;
931       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
932       return (status);
933     }
934   }
935
936   return (0);
937 } /* }}} int handle_request_help */
938
939 static int handle_request_stats (int fd, /* {{{ */
940     char *buffer __attribute__((unused)),
941     size_t buffer_size __attribute__((unused)))
942 {
943   int status;
944   char outbuf[CMD_MAX];
945
946   uint64_t copy_queue_length;
947   uint64_t copy_updates_received;
948   uint64_t copy_flush_received;
949   uint64_t copy_updates_written;
950   uint64_t copy_data_sets_written;
951   uint64_t copy_journal_bytes;
952   uint64_t copy_journal_rotate;
953
954   uint64_t tree_nodes_number;
955   uint64_t tree_depth;
956
957   pthread_mutex_lock (&stats_lock);
958   copy_queue_length       = stats_queue_length;
959   copy_updates_received   = stats_updates_received;
960   copy_flush_received     = stats_flush_received;
961   copy_updates_written    = stats_updates_written;
962   copy_data_sets_written  = stats_data_sets_written;
963   copy_journal_bytes      = stats_journal_bytes;
964   copy_journal_rotate     = stats_journal_rotate;
965   pthread_mutex_unlock (&stats_lock);
966
967   pthread_mutex_lock (&cache_lock);
968   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
969   tree_depth        = (uint64_t) g_tree_height (cache_tree);
970   pthread_mutex_unlock (&cache_lock);
971
972 #define RRDD_STATS_SEND \
973   outbuf[sizeof (outbuf) - 1] = 0; \
974   status = swrite (fd, outbuf, strlen (outbuf)); \
975   if (status < 0) \
976   { \
977     status = errno; \
978     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
979     return (status); \
980   }
981
982   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
983   RRDD_STATS_SEND;
984
985   snprintf (outbuf, sizeof (outbuf),
986       "QueueLength: %"PRIu64"\n", copy_queue_length);
987   RRDD_STATS_SEND;
988
989   snprintf (outbuf, sizeof (outbuf),
990       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
991   RRDD_STATS_SEND;
992
993   snprintf (outbuf, sizeof (outbuf),
994       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
995   RRDD_STATS_SEND;
996
997   snprintf (outbuf, sizeof (outbuf),
998       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
999   RRDD_STATS_SEND;
1000
1001   snprintf (outbuf, sizeof (outbuf),
1002       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1003   RRDD_STATS_SEND;
1004
1005   snprintf (outbuf, sizeof (outbuf),
1006       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1007   RRDD_STATS_SEND;
1008
1009   snprintf (outbuf, sizeof (outbuf),
1010       "TreeDepth: %"PRIu64"\n", tree_depth);
1011   RRDD_STATS_SEND;
1012
1013   snprintf (outbuf, sizeof(outbuf),
1014       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1015   RRDD_STATS_SEND;
1016
1017   snprintf (outbuf, sizeof(outbuf),
1018       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1019   RRDD_STATS_SEND;
1020
1021   return (0);
1022 #undef RRDD_STATS_SEND
1023 } /* }}} int handle_request_stats */
1024
1025 static int handle_request_flush (int fd, /* {{{ */
1026     char *buffer, size_t buffer_size)
1027 {
1028   char *file;
1029   int status;
1030   char result[CMD_MAX];
1031
1032   status = buffer_get_field (&buffer, &buffer_size, &file);
1033   if (status != 0)
1034   {
1035     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1036   }
1037   else
1038   {
1039     pthread_mutex_lock(&stats_lock);
1040     stats_flush_received++;
1041     pthread_mutex_unlock(&stats_lock);
1042
1043     status = flush_file (file);
1044     if (status == 0)
1045       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1046     else if (status == ENOENT)
1047     {
1048       /* no file in our tree; see whether it exists at all */
1049       struct stat statbuf;
1050
1051       memset(&statbuf, 0, sizeof(statbuf));
1052       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1053         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1054       else
1055         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1056     }
1057     else if (status < 0)
1058       strncpy (result, "-1 Internal error.\n", sizeof (result));
1059     else
1060       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1061   }
1062   result[sizeof (result) - 1] = 0;
1063
1064   status = swrite (fd, result, strlen (result));
1065   if (status < 0)
1066   {
1067     status = errno;
1068     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1069     return (status);
1070   }
1071
1072   return (0);
1073 } /* }}} int handle_request_flush */
1074
1075 static int handle_request_flushall(int fd) /* {{{ */
1076 {
1077   int status;
1078   char answer[] ="0 Started flush.\n";
1079
1080   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1081
1082   pthread_mutex_lock(&cache_lock);
1083   flush_old_values(-1);
1084   pthread_mutex_unlock(&cache_lock);
1085
1086   status = swrite(fd, answer, strlen(answer));
1087   if (status < 0)
1088   {
1089     status = errno;
1090     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1091   }
1092
1093   return (status);
1094 } /* }}} static int handle_request_flushall */
1095
1096 static int handle_request_update (int fd, /* {{{ */
1097     char *buffer, size_t buffer_size)
1098 {
1099   char *file;
1100   int values_num = 0;
1101   int status;
1102
1103   time_t now;
1104
1105   cache_item_t *ci;
1106   char answer[CMD_MAX];
1107
1108 #define RRDD_UPDATE_SEND \
1109   answer[sizeof (answer) - 1] = 0; \
1110   status = swrite (fd, answer, strlen (answer)); \
1111   if (status < 0) \
1112   { \
1113     status = errno; \
1114     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1115     return (status); \
1116   }
1117
1118   now = time (NULL);
1119
1120   status = buffer_get_field (&buffer, &buffer_size, &file);
1121   if (status != 0)
1122   {
1123     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1124         sizeof (answer));
1125     RRDD_UPDATE_SEND;
1126     return (0);
1127   }
1128
1129   pthread_mutex_lock(&stats_lock);
1130   stats_updates_received++;
1131   pthread_mutex_unlock(&stats_lock);
1132
1133   pthread_mutex_lock (&cache_lock);
1134   ci = g_tree_lookup (cache_tree, file);
1135
1136   if (ci == NULL) /* {{{ */
1137   {
1138     struct stat statbuf;
1139
1140     /* don't hold the lock while we setup; stat(2) might block */
1141     pthread_mutex_unlock(&cache_lock);
1142
1143     memset (&statbuf, 0, sizeof (statbuf));
1144     status = stat (file, &statbuf);
1145     if (status != 0)
1146     {
1147       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1148
1149       status = errno;
1150       if (status == ENOENT)
1151         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1152       else
1153         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1154             status);
1155       RRDD_UPDATE_SEND;
1156       return (0);
1157     }
1158     if (!S_ISREG (statbuf.st_mode))
1159     {
1160       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1161       RRDD_UPDATE_SEND;
1162       return (0);
1163     }
1164     if (access(file, R_OK|W_OK) != 0)
1165     {
1166       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1167                 file, rrd_strerror(errno));
1168       RRDD_UPDATE_SEND;
1169       return (0);
1170     }
1171
1172     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1173     if (ci == NULL)
1174     {
1175       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1176
1177       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1178       RRDD_UPDATE_SEND;
1179       return (0);
1180     }
1181     memset (ci, 0, sizeof (cache_item_t));
1182
1183     ci->file = strdup (file);
1184     if (ci->file == NULL)
1185     {
1186       free (ci);
1187       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1188
1189       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1190       RRDD_UPDATE_SEND;
1191       return (0);
1192     }
1193
1194     _wipe_ci_values(ci, now);
1195     ci->flags = CI_FLAGS_IN_TREE;
1196
1197     pthread_mutex_lock(&cache_lock);
1198     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1199   } /* }}} */
1200   assert (ci != NULL);
1201
1202   while (buffer_size > 0)
1203   {
1204     char **temp;
1205     char *value;
1206
1207     status = buffer_get_field (&buffer, &buffer_size, &value);
1208     if (status != 0)
1209     {
1210       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1211       break;
1212     }
1213
1214     temp = (char **) realloc (ci->values,
1215         sizeof (char *) * (ci->values_num + 1));
1216     if (temp == NULL)
1217     {
1218       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1219       continue;
1220     }
1221     ci->values = temp;
1222
1223     ci->values[ci->values_num] = strdup (value);
1224     if (ci->values[ci->values_num] == NULL)
1225     {
1226       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1227       continue;
1228     }
1229     ci->values_num++;
1230
1231     values_num++;
1232   }
1233
1234   if (((now - ci->last_flush_time) >= config_write_interval)
1235       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1236       && (ci->values_num > 0))
1237   {
1238     enqueue_cache_item (ci, TAIL);
1239   }
1240
1241   pthread_mutex_unlock (&cache_lock);
1242
1243   if (values_num < 1)
1244   {
1245     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1246   }
1247   else
1248   {
1249     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1250         (values_num == 1) ? "" : "s");
1251   }
1252   RRDD_UPDATE_SEND;
1253   return (0);
1254 #undef RRDD_UPDATE_SEND
1255 } /* }}} int handle_request_update */
1256
1257 /* we came across a "WROTE" entry during journal replay.
1258  * throw away any values that we have accumulated for this file
1259  */
1260 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1261                                  const char *buffer,
1262                                  size_t buffer_size __attribute__((unused)))
1263 {
1264   int i;
1265   cache_item_t *ci;
1266   const char *file = buffer;
1267
1268   pthread_mutex_lock(&cache_lock);
1269
1270   ci = g_tree_lookup(cache_tree, file);
1271   if (ci == NULL)
1272   {
1273     pthread_mutex_unlock(&cache_lock);
1274     return (0);
1275   }
1276
1277   if (ci->values)
1278   {
1279     for (i=0; i < ci->values_num; i++)
1280       free(ci->values[i]);
1281
1282     free(ci->values);
1283   }
1284
1285   _wipe_ci_values(ci, time(NULL));
1286
1287   pthread_mutex_unlock(&cache_lock);
1288   return (0);
1289 } /* }}} int handle_request_wrote */
1290
1291 /* if fd < 0, we are in journal replay mode */
1292 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1293 {
1294   char *buffer_ptr;
1295   char *command;
1296   int status;
1297
1298   assert (buffer[buffer_size - 1] == '\0');
1299
1300   buffer_ptr = buffer;
1301   command = NULL;
1302   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1303   if (status != 0)
1304   {
1305     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1306     return (-1);
1307   }
1308
1309   if (strcasecmp (command, "update") == 0)
1310   {
1311     /* don't re-write updates in replay mode */
1312     if (fd >= 0)
1313       journal_write(command, buffer_ptr);
1314
1315     return (handle_request_update (fd, buffer_ptr, buffer_size));
1316   }
1317   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1318   {
1319     /* this is only valid in replay mode */
1320     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1321   }
1322   else if (strcasecmp (command, "flush") == 0)
1323   {
1324     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1325   }
1326   else if (strcasecmp (command, "flushall") == 0)
1327   {
1328     return (handle_request_flushall(fd));
1329   }
1330   else if (strcasecmp (command, "stats") == 0)
1331   {
1332     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1333   }
1334   else if (strcasecmp (command, "help") == 0)
1335   {
1336     return (handle_request_help (fd, buffer_ptr, buffer_size));
1337   }
1338   else
1339   {
1340     char result[CMD_MAX];
1341
1342     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1343     result[sizeof (result) - 1] = 0;
1344
1345     status = swrite (fd, result, strlen (result));
1346     if (status < 0)
1347     {
1348       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1349       return (-1);
1350     }
1351   }
1352
1353   return (0);
1354 } /* }}} int handle_request */
1355
1356 /* MUST NOT hold journal_lock before calling this */
1357 static void journal_rotate(void) /* {{{ */
1358 {
1359   FILE *old_fh = NULL;
1360
1361   if (journal_cur == NULL || journal_old == NULL)
1362     return;
1363
1364   pthread_mutex_lock(&journal_lock);
1365
1366   /* we rotate this way (rename before close) so that the we can release
1367    * the journal lock as fast as possible.  Journal writes to the new
1368    * journal can proceed immediately after the new file is opened.  The
1369    * fclose can then block without affecting new updates.
1370    */
1371   if (journal_fh != NULL)
1372   {
1373     old_fh = journal_fh;
1374     rename(journal_cur, journal_old);
1375     ++stats_journal_rotate;
1376   }
1377
1378   journal_fh = fopen(journal_cur, "a");
1379   pthread_mutex_unlock(&journal_lock);
1380
1381   if (old_fh != NULL)
1382     fclose(old_fh);
1383
1384   if (journal_fh == NULL)
1385   {
1386     RRDD_LOG(LOG_CRIT,
1387              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1388              journal_cur, rrd_strerror(errno));
1389
1390     RRDD_LOG(LOG_ERR,
1391              "JOURNALING DISABLED: All values will be flushed at shutdown");
1392     config_flush_at_shutdown = 1;
1393   }
1394
1395 } /* }}} static void journal_rotate */
1396
1397 static void journal_done(void) /* {{{ */
1398 {
1399   if (journal_cur == NULL)
1400     return;
1401
1402   pthread_mutex_lock(&journal_lock);
1403   if (journal_fh != NULL)
1404   {
1405     fclose(journal_fh);
1406     journal_fh = NULL;
1407   }
1408
1409   if (config_flush_at_shutdown)
1410   {
1411     RRDD_LOG(LOG_INFO, "removing journals");
1412     unlink(journal_old);
1413     unlink(journal_cur);
1414   }
1415   else
1416   {
1417     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1418              "journals will be used at next startup");
1419   }
1420
1421   pthread_mutex_unlock(&journal_lock);
1422
1423 } /* }}} static void journal_done */
1424
1425 static int journal_write(char *cmd, char *args) /* {{{ */
1426 {
1427   int chars;
1428
1429   if (journal_fh == NULL)
1430     return 0;
1431
1432   pthread_mutex_lock(&journal_lock);
1433   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1434   pthread_mutex_unlock(&journal_lock);
1435
1436   if (chars > 0)
1437   {
1438     pthread_mutex_lock(&stats_lock);
1439     stats_journal_bytes += chars;
1440     pthread_mutex_unlock(&stats_lock);
1441   }
1442
1443   return chars;
1444 } /* }}} static int journal_write */
1445
1446 static int journal_replay (const char *file) /* {{{ */
1447 {
1448   FILE *fh;
1449   int entry_cnt = 0;
1450   int fail_cnt = 0;
1451   uint64_t line = 0;
1452   char entry[CMD_MAX];
1453
1454   if (file == NULL) return 0;
1455
1456   fh = fopen(file, "r");
1457   if (fh == NULL)
1458   {
1459     if (errno != ENOENT)
1460       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1461                file, rrd_strerror(errno));
1462     return 0;
1463   }
1464   else
1465     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1466
1467   while(!feof(fh))
1468   {
1469     size_t entry_len;
1470
1471     ++line;
1472     fgets(entry, sizeof(entry), fh);
1473     entry_len = strlen(entry);
1474
1475     /* check \n termination in case journal writing crashed mid-line */
1476     if (entry_len == 0)
1477       continue;
1478     else if (entry[entry_len - 1] != '\n')
1479     {
1480       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1481       ++fail_cnt;
1482       continue;
1483     }
1484
1485     entry[entry_len - 1] = '\0';
1486
1487     if (handle_request(-1, entry, entry_len) == 0)
1488       ++entry_cnt;
1489     else
1490       ++fail_cnt;
1491   }
1492
1493   fclose(fh);
1494
1495   if (entry_cnt > 0)
1496   {
1497     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1498              entry_cnt, fail_cnt);
1499     return 1;
1500   }
1501   else
1502     return 0;
1503
1504 } /* }}} static int journal_replay */
1505
1506 static void *connection_thread_main (void *args) /* {{{ */
1507 {
1508   pthread_t self;
1509   int i;
1510   int fd;
1511   
1512   fd = *((int *) args);
1513   free (args);
1514
1515   pthread_mutex_lock (&connection_threads_lock);
1516   {
1517     pthread_t *temp;
1518
1519     temp = (pthread_t *) realloc (connection_threads,
1520         sizeof (pthread_t) * (connection_threads_num + 1));
1521     if (temp == NULL)
1522     {
1523       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1524     }
1525     else
1526     {
1527       connection_threads = temp;
1528       connection_threads[connection_threads_num] = pthread_self ();
1529       connection_threads_num++;
1530     }
1531   }
1532   pthread_mutex_unlock (&connection_threads_lock);
1533
1534   while (do_shutdown == 0)
1535   {
1536     char buffer[CMD_MAX];
1537
1538     struct pollfd pollfd;
1539     int status;
1540
1541     pollfd.fd = fd;
1542     pollfd.events = POLLIN | POLLPRI;
1543     pollfd.revents = 0;
1544
1545     status = poll (&pollfd, 1, /* timeout = */ 500);
1546     if (do_shutdown)
1547       break;
1548     else if (status == 0) /* timeout */
1549       continue;
1550     else if (status < 0) /* error */
1551     {
1552       status = errno;
1553       if (status == EINTR)
1554         continue;
1555       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1556       continue;
1557     }
1558
1559     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1560     {
1561       close (fd);
1562       break;
1563     }
1564     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1565     {
1566       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1567           "poll(2) returned something unexpected: %#04hx",
1568           pollfd.revents);
1569       close (fd);
1570       break;
1571     }
1572
1573     status = (int) sread (fd, buffer, sizeof (buffer));
1574     if (status <= 0)
1575     {
1576       close (fd);
1577
1578       if (status < 0)
1579         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1580
1581       break;
1582     }
1583
1584     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1585     if (status != 0)
1586       break;
1587   }
1588
1589   close(fd);
1590
1591   self = pthread_self ();
1592   /* Remove this thread from the connection threads list */
1593   pthread_mutex_lock (&connection_threads_lock);
1594   /* Find out own index in the array */
1595   for (i = 0; i < connection_threads_num; i++)
1596     if (pthread_equal (connection_threads[i], self) != 0)
1597       break;
1598   assert (i < connection_threads_num);
1599
1600   /* Move the trailing threads forward. */
1601   if (i < (connection_threads_num - 1))
1602   {
1603     memmove (connection_threads + i,
1604         connection_threads + i + 1,
1605         sizeof (pthread_t) * (connection_threads_num - i - 1));
1606   }
1607
1608   connection_threads_num--;
1609   pthread_mutex_unlock (&connection_threads_lock);
1610
1611   return (NULL);
1612 } /* }}} void *connection_thread_main */
1613
1614 static int open_listen_socket_unix (const char *path) /* {{{ */
1615 {
1616   int fd;
1617   struct sockaddr_un sa;
1618   listen_socket_t *temp;
1619   int status;
1620
1621   temp = (listen_socket_t *) realloc (listen_fds,
1622       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1623   if (temp == NULL)
1624   {
1625     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1626     return (-1);
1627   }
1628   listen_fds = temp;
1629   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1630
1631   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1632   if (fd < 0)
1633   {
1634     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1635     return (-1);
1636   }
1637
1638   memset (&sa, 0, sizeof (sa));
1639   sa.sun_family = AF_UNIX;
1640   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1641
1642   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1643   if (status != 0)
1644   {
1645     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1646     close (fd);
1647     unlink (path);
1648     return (-1);
1649   }
1650
1651   status = listen (fd, /* backlog = */ 10);
1652   if (status != 0)
1653   {
1654     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1655     close (fd);
1656     unlink (path);
1657     return (-1);
1658   }
1659   
1660   listen_fds[listen_fds_num].fd = fd;
1661   snprintf (listen_fds[listen_fds_num].path,
1662       sizeof (listen_fds[listen_fds_num].path) - 1,
1663       "unix:%s", path);
1664   listen_fds_num++;
1665
1666   return (0);
1667 } /* }}} int open_listen_socket_unix */
1668
1669 static int open_listen_socket (const char *addr_orig) /* {{{ */
1670 {
1671   struct addrinfo ai_hints;
1672   struct addrinfo *ai_res;
1673   struct addrinfo *ai_ptr;
1674   char addr_copy[NI_MAXHOST];
1675   char *addr;
1676   char *port;
1677   int status;
1678
1679   assert (addr_orig != NULL);
1680
1681   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1682   addr_copy[sizeof (addr_copy) - 1] = 0;
1683   addr = addr_copy;
1684
1685   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1686     return (open_listen_socket_unix (addr + strlen ("unix:")));
1687   else if (addr[0] == '/')
1688     return (open_listen_socket_unix (addr));
1689
1690   memset (&ai_hints, 0, sizeof (ai_hints));
1691   ai_hints.ai_flags = 0;
1692 #ifdef AI_ADDRCONFIG
1693   ai_hints.ai_flags |= AI_ADDRCONFIG;
1694 #endif
1695   ai_hints.ai_family = AF_UNSPEC;
1696   ai_hints.ai_socktype = SOCK_STREAM;
1697
1698   port = NULL;
1699  if (*addr == '[') /* IPv6+port format */
1700   {
1701     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1702     addr++;
1703
1704     port = strchr (addr, ']');
1705     if (port == NULL)
1706     {
1707       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1708           addr_orig);
1709       return (-1);
1710     }
1711     *port = 0;
1712     port++;
1713
1714     if (*port == ':')
1715       port++;
1716     else if (*port == 0)
1717       port = NULL;
1718     else
1719     {
1720       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1721           port);
1722       return (-1);
1723     }
1724   } /* if (*addr = ']') */
1725   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1726   {
1727     port = rindex(addr, ':');
1728     if (port != NULL)
1729     {
1730       *port = 0;
1731       port++;
1732     }
1733   }
1734   ai_res = NULL;
1735   status = getaddrinfo (addr,
1736                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1737                         &ai_hints, &ai_res);
1738   if (status != 0)
1739   {
1740     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1741         "%s", addr, gai_strerror (status));
1742     return (-1);
1743   }
1744
1745   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1746   {
1747     int fd;
1748     listen_socket_t *temp;
1749     int one = 1;
1750
1751     temp = (listen_socket_t *) realloc (listen_fds,
1752         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1753     if (temp == NULL)
1754     {
1755       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1756       continue;
1757     }
1758     listen_fds = temp;
1759     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1760
1761     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1762     if (fd < 0)
1763     {
1764       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1765       continue;
1766     }
1767
1768     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1769
1770     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1771     if (status != 0)
1772     {
1773       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1774       close (fd);
1775       continue;
1776     }
1777
1778     status = listen (fd, /* backlog = */ 10);
1779     if (status != 0)
1780     {
1781       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1782       close (fd);
1783       return (-1);
1784     }
1785
1786     listen_fds[listen_fds_num].fd = fd;
1787     strncpy (listen_fds[listen_fds_num].path, addr,
1788         sizeof (listen_fds[listen_fds_num].path) - 1);
1789     listen_fds_num++;
1790   } /* for (ai_ptr) */
1791
1792   return (0);
1793 } /* }}} int open_listen_socket */
1794
1795 static int close_listen_sockets (void) /* {{{ */
1796 {
1797   size_t i;
1798
1799   for (i = 0; i < listen_fds_num; i++)
1800   {
1801     close (listen_fds[i].fd);
1802     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1803       unlink (listen_fds[i].path + strlen ("unix:"));
1804   }
1805
1806   free (listen_fds);
1807   listen_fds = NULL;
1808   listen_fds_num = 0;
1809
1810   return (0);
1811 } /* }}} int close_listen_sockets */
1812
1813 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1814 {
1815   struct pollfd *pollfds;
1816   int pollfds_num;
1817   int status;
1818   int i;
1819
1820   for (i = 0; i < config_listen_address_list_len; i++)
1821     open_listen_socket (config_listen_address_list[i]);
1822
1823   if (config_listen_address_list_len < 1)
1824     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1825
1826   if (listen_fds_num < 1)
1827   {
1828     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1829         "could be opened. Sorry.");
1830     return (NULL);
1831   }
1832
1833   pollfds_num = listen_fds_num;
1834   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1835   if (pollfds == NULL)
1836   {
1837     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1838     return (NULL);
1839   }
1840   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1841
1842   RRDD_LOG(LOG_INFO, "listening for connections");
1843
1844   while (do_shutdown == 0)
1845   {
1846     assert (pollfds_num == ((int) listen_fds_num));
1847     for (i = 0; i < pollfds_num; i++)
1848     {
1849       pollfds[i].fd = listen_fds[i].fd;
1850       pollfds[i].events = POLLIN | POLLPRI;
1851       pollfds[i].revents = 0;
1852     }
1853
1854     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1855     if (do_shutdown)
1856       break;
1857     else if (status == 0) /* timeout */
1858       continue;
1859     else if (status < 0) /* error */
1860     {
1861       status = errno;
1862       if (status != EINTR)
1863       {
1864         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1865       }
1866       continue;
1867     }
1868
1869     for (i = 0; i < pollfds_num; i++)
1870     {
1871       int *client_sd;
1872       struct sockaddr_storage client_sa;
1873       socklen_t client_sa_size;
1874       pthread_t tid;
1875       pthread_attr_t attr;
1876
1877       if (pollfds[i].revents == 0)
1878         continue;
1879
1880       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1881       {
1882         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1883             "poll(2) returned something unexpected for listen FD #%i.",
1884             pollfds[i].fd);
1885         continue;
1886       }
1887
1888       client_sd = (int *) malloc (sizeof (int));
1889       if (client_sd == NULL)
1890       {
1891         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1892         continue;
1893       }
1894
1895       client_sa_size = sizeof (client_sa);
1896       *client_sd = accept (pollfds[i].fd,
1897           (struct sockaddr *) &client_sa, &client_sa_size);
1898       if (*client_sd < 0)
1899       {
1900         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1901         continue;
1902       }
1903
1904       pthread_attr_init (&attr);
1905       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1906
1907       status = pthread_create (&tid, &attr, connection_thread_main,
1908           /* args = */ (void *) client_sd);
1909       if (status != 0)
1910       {
1911         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1912         close (*client_sd);
1913         free (client_sd);
1914         continue;
1915       }
1916     } /* for (pollfds_num) */
1917   } /* while (do_shutdown == 0) */
1918
1919   RRDD_LOG(LOG_INFO, "starting shutdown");
1920
1921   close_listen_sockets ();
1922
1923   pthread_mutex_lock (&connection_threads_lock);
1924   while (connection_threads_num > 0)
1925   {
1926     pthread_t wait_for;
1927
1928     wait_for = connection_threads[0];
1929
1930     pthread_mutex_unlock (&connection_threads_lock);
1931     pthread_join (wait_for, /* retval = */ NULL);
1932     pthread_mutex_lock (&connection_threads_lock);
1933   }
1934   pthread_mutex_unlock (&connection_threads_lock);
1935
1936   return (NULL);
1937 } /* }}} void *listen_thread_main */
1938
1939 static int daemonize (void) /* {{{ */
1940 {
1941   int status;
1942   int fd;
1943
1944   fd = open_pidfile();
1945   if (fd < 0) return fd;
1946
1947   if (!stay_foreground)
1948   {
1949     pid_t child;
1950     char *base_dir;
1951
1952     child = fork ();
1953     if (child < 0)
1954     {
1955       fprintf (stderr, "daemonize: fork(2) failed.\n");
1956       return (-1);
1957     }
1958     else if (child > 0)
1959     {
1960       return (1);
1961     }
1962
1963     /* Change into the /tmp directory. */
1964     base_dir = (config_base_dir != NULL)
1965       ? config_base_dir
1966       : "/tmp";
1967     status = chdir (base_dir);
1968     if (status != 0)
1969     {
1970       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1971       return (-1);
1972     }
1973
1974     /* Become session leader */
1975     setsid ();
1976
1977     /* Open the first three file descriptors to /dev/null */
1978     close (2);
1979     close (1);
1980     close (0);
1981
1982     open ("/dev/null", O_RDWR);
1983     dup (0);
1984     dup (0);
1985   } /* if (!stay_foreground) */
1986
1987   install_signal_handlers();
1988
1989   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1990   RRDD_LOG(LOG_INFO, "starting up");
1991
1992   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1993   if (cache_tree == NULL)
1994   {
1995     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1996     return (-1);
1997   }
1998
1999   status = write_pidfile (fd);
2000   return status;
2001 } /* }}} int daemonize */
2002
2003 static int cleanup (void) /* {{{ */
2004 {
2005   do_shutdown++;
2006
2007   pthread_cond_signal (&cache_cond);
2008   pthread_join (queue_thread, /* return = */ NULL);
2009
2010   remove_pidfile ();
2011
2012   RRDD_LOG(LOG_INFO, "goodbye");
2013   closelog ();
2014
2015   return (0);
2016 } /* }}} int cleanup */
2017
2018 static int read_options (int argc, char **argv) /* {{{ */
2019 {
2020   int option;
2021   int status = 0;
2022
2023   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?F")) != -1)
2024   {
2025     switch (option)
2026     {
2027       case 'g':
2028         stay_foreground=1;
2029         break;
2030
2031       case 'l':
2032       {
2033         char **temp;
2034
2035         temp = (char **) realloc (config_listen_address_list,
2036             sizeof (char *) * (config_listen_address_list_len + 1));
2037         if (temp == NULL)
2038         {
2039           fprintf (stderr, "read_options: realloc failed.\n");
2040           return (2);
2041         }
2042         config_listen_address_list = temp;
2043
2044         temp[config_listen_address_list_len] = strdup (optarg);
2045         if (temp[config_listen_address_list_len] == NULL)
2046         {
2047           fprintf (stderr, "read_options: strdup failed.\n");
2048           return (2);
2049         }
2050         config_listen_address_list_len++;
2051       }
2052       break;
2053
2054       case 'f':
2055       {
2056         int temp;
2057
2058         temp = atoi (optarg);
2059         if (temp > 0)
2060           config_flush_interval = temp;
2061         else
2062         {
2063           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2064           status = 3;
2065         }
2066       }
2067       break;
2068
2069       case 'w':
2070       {
2071         int temp;
2072
2073         temp = atoi (optarg);
2074         if (temp > 0)
2075           config_write_interval = temp;
2076         else
2077         {
2078           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2079           status = 2;
2080         }
2081       }
2082       break;
2083
2084       case 'z':
2085       {
2086         int temp;
2087
2088         temp = atoi(optarg);
2089         if (temp > 0)
2090           config_write_jitter = temp;
2091         else
2092         {
2093           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2094           status = 2;
2095         }
2096
2097         break;
2098       }
2099
2100       case 'b':
2101       {
2102         size_t len;
2103
2104         if (config_base_dir != NULL)
2105           free (config_base_dir);
2106         config_base_dir = strdup (optarg);
2107         if (config_base_dir == NULL)
2108         {
2109           fprintf (stderr, "read_options: strdup failed.\n");
2110           return (3);
2111         }
2112
2113         len = strlen (config_base_dir);
2114         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2115         {
2116           config_base_dir[len - 1] = 0;
2117           len--;
2118         }
2119
2120         if (len < 1)
2121         {
2122           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2123           return (4);
2124         }
2125       }
2126       break;
2127
2128       case 'p':
2129       {
2130         if (config_pid_file != NULL)
2131           free (config_pid_file);
2132         config_pid_file = strdup (optarg);
2133         if (config_pid_file == NULL)
2134         {
2135           fprintf (stderr, "read_options: strdup failed.\n");
2136           return (3);
2137         }
2138       }
2139       break;
2140
2141       case 'F':
2142         config_flush_at_shutdown = 1;
2143         break;
2144
2145       case 'j':
2146       {
2147         struct stat statbuf;
2148         const char *dir = optarg;
2149
2150         status = stat(dir, &statbuf);
2151         if (status != 0)
2152         {
2153           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2154           return 6;
2155         }
2156
2157         if (!S_ISDIR(statbuf.st_mode)
2158             || access(dir, R_OK|W_OK|X_OK) != 0)
2159         {
2160           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2161                   errno ? rrd_strerror(errno) : "");
2162           return 6;
2163         }
2164
2165         journal_cur = malloc(PATH_MAX + 1);
2166         journal_old = malloc(PATH_MAX + 1);
2167         if (journal_cur == NULL || journal_old == NULL)
2168         {
2169           fprintf(stderr, "malloc failure for journal files\n");
2170           return 6;
2171         }
2172         else 
2173         {
2174           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2175           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2176         }
2177       }
2178       break;
2179
2180       case 'h':
2181       case '?':
2182         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2183             "\n"
2184             "Usage: rrdcached [options]\n"
2185             "\n"
2186             "Valid options are:\n"
2187             "  -l <address>  Socket address to listen to.\n"
2188             "  -w <seconds>  Interval in which to write data.\n"
2189             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2190             "  -f <seconds>  Interval in which to flush dead data.\n"
2191             "  -p <file>     Location of the PID-file.\n"
2192             "  -b <dir>      Base directory to change to.\n"
2193             "  -g            Do not fork and run in the foreground.\n"
2194             "  -j <dir>      Directory in which to create the journal files.\n"
2195             "  -F            Always flush all updates at shutdown\n"
2196             "\n"
2197             "For more information and a detailed description of all options "
2198             "please refer\n"
2199             "to the rrdcached(1) manual page.\n",
2200             VERSION);
2201         status = -1;
2202         break;
2203     } /* switch (option) */
2204   } /* while (getopt) */
2205
2206   /* advise the user when values are not sane */
2207   if (config_flush_interval < 2 * config_write_interval)
2208     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2209             " 2x write interval (-w) !\n");
2210   if (config_write_jitter > config_write_interval)
2211     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2212             " write interval (-w) !\n");
2213
2214   if (journal_cur == NULL)
2215     config_flush_at_shutdown = 1;
2216
2217   return (status);
2218 } /* }}} int read_options */
2219
2220 int main (int argc, char **argv)
2221 {
2222   int status;
2223
2224   status = read_options (argc, argv);
2225   if (status != 0)
2226   {
2227     if (status < 0)
2228       status = 0;
2229     return (status);
2230   }
2231
2232   status = daemonize ();
2233   if (status == 1)
2234   {
2235     struct sigaction sigchld;
2236
2237     memset (&sigchld, 0, sizeof (sigchld));
2238     sigchld.sa_handler = SIG_IGN;
2239     sigaction (SIGCHLD, &sigchld, NULL);
2240
2241     return (0);
2242   }
2243   else if (status != 0)
2244   {
2245     fprintf (stderr, "daemonize failed, exiting.\n");
2246     return (1);
2247   }
2248
2249   if (journal_cur != NULL)
2250   {
2251     int had_journal = 0;
2252
2253     pthread_mutex_lock(&journal_lock);
2254
2255     RRDD_LOG(LOG_INFO, "checking for journal files");
2256
2257     had_journal += journal_replay(journal_old);
2258     had_journal += journal_replay(journal_cur);
2259
2260     if (had_journal)
2261       flush_old_values(-1);
2262
2263     pthread_mutex_unlock(&journal_lock);
2264     journal_rotate();
2265
2266     RRDD_LOG(LOG_INFO, "journal processing complete");
2267   }
2268
2269   /* start the queue thread */
2270   memset (&queue_thread, 0, sizeof (queue_thread));
2271   status = pthread_create (&queue_thread,
2272                            NULL, /* attr */
2273                            queue_thread_main,
2274                            NULL); /* args */
2275   if (status != 0)
2276   {
2277     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2278     cleanup();
2279     return (1);
2280   }
2281
2282   listen_thread_main (NULL);
2283   cleanup ();
2284
2285   return (0);
2286 } /* int main */
2287
2288 /*
2289  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2290  */