This patch introduces the concept of socket privilege levels. "UPDATE"
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 struct listen_socket_s
111 {
112   int fd;
113   char addr[PATH_MAX + 1];
114   int family;
115   socket_privilege privilege;
116 };
117 typedef struct listen_socket_s listen_socket_t;
118
119 struct cache_item_s;
120 typedef struct cache_item_s cache_item_t;
121 struct cache_item_s
122 {
123   char *file;
124   char **values;
125   int values_num;
126   time_t last_flush_time;
127 #define CI_FLAGS_IN_TREE  (1<<0)
128 #define CI_FLAGS_IN_QUEUE (1<<1)
129   int flags;
130   pthread_cond_t  flushed;
131   cache_item_t *next;
132 };
133
134 struct callback_flush_data_s
135 {
136   time_t now;
137   time_t abs_timeout;
138   char **keys;
139   size_t keys_num;
140 };
141 typedef struct callback_flush_data_s callback_flush_data_t;
142
143 enum queue_side_e
144 {
145   HEAD,
146   TAIL
147 };
148 typedef enum queue_side_e queue_side_t;
149
150 /* max length of socket command or response */
151 #define CMD_MAX 4096
152
153 /*
154  * Variables
155  */
156 static int stay_foreground = 0;
157
158 static listen_socket_t *listen_fds = NULL;
159 static size_t listen_fds_num = 0;
160
161 static int do_shutdown = 0;
162
163 static pthread_t queue_thread;
164
165 static pthread_t *connection_threads = NULL;
166 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
167 static int connection_threads_num = 0;
168
169 /* Cache stuff */
170 static GTree          *cache_tree = NULL;
171 static cache_item_t   *cache_queue_head = NULL;
172 static cache_item_t   *cache_queue_tail = NULL;
173 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
174 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
175
176 static int config_write_interval = 300;
177 static int config_write_jitter   = 0;
178 static int config_flush_interval = 3600;
179 static int config_flush_at_shutdown = 0;
180 static char *config_pid_file = NULL;
181 static char *config_base_dir = NULL;
182
183 static listen_socket_t **config_listen_address_list = NULL;
184 static int config_listen_address_list_len = 0;
185
186 static uint64_t stats_queue_length = 0;
187 static uint64_t stats_updates_received = 0;
188 static uint64_t stats_flush_received = 0;
189 static uint64_t stats_updates_written = 0;
190 static uint64_t stats_data_sets_written = 0;
191 static uint64_t stats_journal_bytes = 0;
192 static uint64_t stats_journal_rotate = 0;
193 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
194
195 /* Journaled updates */
196 static char *journal_cur = NULL;
197 static char *journal_old = NULL;
198 static FILE *journal_fh = NULL;
199 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
200 static int journal_write(char *cmd, char *args);
201 static void journal_done(void);
202 static void journal_rotate(void);
203
204 /* 
205  * Functions
206  */
207 static void sig_common (const char *sig) /* {{{ */
208 {
209   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
210   do_shutdown++;
211   pthread_cond_broadcast(&cache_cond);
212 } /* }}} void sig_common */
213
214 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
215 {
216   sig_common("INT");
217 } /* }}} void sig_int_handler */
218
219 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
220 {
221   sig_common("TERM");
222 } /* }}} void sig_term_handler */
223
224 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
225 {
226   config_flush_at_shutdown = 1;
227   sig_common("USR1");
228 } /* }}} void sig_usr1_handler */
229
230 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
231 {
232   config_flush_at_shutdown = 0;
233   sig_common("USR2");
234 } /* }}} void sig_usr2_handler */
235
236 static void install_signal_handlers(void) /* {{{ */
237 {
238   /* These structures are static, because `sigaction' behaves weird if the are
239    * overwritten.. */
240   static struct sigaction sa_int;
241   static struct sigaction sa_term;
242   static struct sigaction sa_pipe;
243   static struct sigaction sa_usr1;
244   static struct sigaction sa_usr2;
245
246   /* Install signal handlers */
247   memset (&sa_int, 0, sizeof (sa_int));
248   sa_int.sa_handler = sig_int_handler;
249   sigaction (SIGINT, &sa_int, NULL);
250
251   memset (&sa_term, 0, sizeof (sa_term));
252   sa_term.sa_handler = sig_term_handler;
253   sigaction (SIGTERM, &sa_term, NULL);
254
255   memset (&sa_pipe, 0, sizeof (sa_pipe));
256   sa_pipe.sa_handler = SIG_IGN;
257   sigaction (SIGPIPE, &sa_pipe, NULL);
258
259   memset (&sa_pipe, 0, sizeof (sa_usr1));
260   sa_usr1.sa_handler = sig_usr1_handler;
261   sigaction (SIGUSR1, &sa_usr1, NULL);
262
263   memset (&sa_usr2, 0, sizeof (sa_usr2));
264   sa_usr2.sa_handler = sig_usr2_handler;
265   sigaction (SIGUSR2, &sa_usr2, NULL);
266
267 } /* }}} void install_signal_handlers */
268
269 static int open_pidfile(void) /* {{{ */
270 {
271   int fd;
272   char *file;
273
274   file = (config_pid_file != NULL)
275     ? config_pid_file
276     : LOCALSTATEDIR "/run/rrdcached.pid";
277
278   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
279   if (fd < 0)
280     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
281             file, rrd_strerror(errno));
282
283   return(fd);
284 } /* }}} static int open_pidfile */
285
286 static int write_pidfile (int fd) /* {{{ */
287 {
288   pid_t pid;
289   FILE *fh;
290
291   pid = getpid ();
292
293   fh = fdopen (fd, "w");
294   if (fh == NULL)
295   {
296     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
297     close(fd);
298     return (-1);
299   }
300
301   fprintf (fh, "%i\n", (int) pid);
302   fclose (fh);
303
304   return (0);
305 } /* }}} int write_pidfile */
306
307 static int remove_pidfile (void) /* {{{ */
308 {
309   char *file;
310   int status;
311
312   file = (config_pid_file != NULL)
313     ? config_pid_file
314     : LOCALSTATEDIR "/run/rrdcached.pid";
315
316   status = unlink (file);
317   if (status == 0)
318     return (0);
319   return (errno);
320 } /* }}} int remove_pidfile */
321
322 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
323 {
324   char    *buffer;
325   size_t   buffer_used;
326   size_t   buffer_free;
327   ssize_t  status;
328
329   buffer       = (char *) buffer_void;
330   buffer_used  = 0;
331   buffer_free  = buffer_size;
332
333   while (buffer_free > 0)
334   {
335     status = read (fd, buffer + buffer_used, buffer_free);
336     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
337       continue;
338
339     if (status < 0)
340       return (-1);
341
342     if (status == 0)
343       return (0);
344
345     assert ((0 > status) || (buffer_free >= (size_t) status));
346
347     buffer_free = buffer_free - status;
348     buffer_used = buffer_used + status;
349
350     if (buffer[buffer_used - 1] == '\n')
351       break;
352   }
353
354   assert (buffer_used > 0);
355
356   if (buffer[buffer_used - 1] != '\n')
357   {
358     errno = ENOBUFS;
359     return (-1);
360   }
361
362   buffer[buffer_used - 1] = 0;
363
364   /* Fix network line endings. */
365   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
366   {
367     buffer_used--;
368     buffer[buffer_used - 1] = 0;
369   }
370
371   return (buffer_used);
372 } /* }}} ssize_t sread */
373
374 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
375 {
376   const char *ptr;
377   size_t      nleft;
378   ssize_t     status;
379
380   /* special case for journal replay */
381   if (fd < 0) return 0;
382
383   ptr   = (const char *) buf;
384   nleft = count;
385
386   while (nleft > 0)
387   {
388     status = write (fd, (const void *) ptr, nleft);
389
390     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
391       continue;
392
393     if (status < 0)
394       return (status);
395
396     nleft -= status;
397     ptr   += status;
398   }
399
400   return (0);
401 } /* }}} ssize_t swrite */
402
403 static void _wipe_ci_values(cache_item_t *ci, time_t when)
404 {
405   ci->values = NULL;
406   ci->values_num = 0;
407
408   ci->last_flush_time = when;
409   if (config_write_jitter > 0)
410     ci->last_flush_time += (random() % config_write_jitter);
411
412   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
413 }
414
415 /*
416  * enqueue_cache_item:
417  * `cache_lock' must be acquired before calling this function!
418  */
419 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
420     queue_side_t side)
421 {
422   int did_insert = 0;
423
424   if (ci == NULL)
425     return (-1);
426
427   if (ci->values_num == 0)
428     return (0);
429
430   if (side == HEAD)
431   {
432     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
433     {
434       assert (ci->next == NULL);
435       ci->next = cache_queue_head;
436       cache_queue_head = ci;
437
438       if (cache_queue_tail == NULL)
439         cache_queue_tail = cache_queue_head;
440
441       did_insert = 1;
442     }
443     else if (cache_queue_head == ci)
444     {
445       /* do nothing */
446     }
447     else /* enqueued, but not first entry */
448     {
449       cache_item_t *prev;
450
451       /* find previous entry */
452       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
453         if (prev->next == ci)
454           break;
455       assert (prev != NULL);
456
457       /* move to the front */
458       prev->next = ci->next;
459       ci->next = cache_queue_head;
460       cache_queue_head = ci;
461
462       /* check if we need to adapt the tail */
463       if (cache_queue_tail == ci)
464         cache_queue_tail = prev;
465     }
466   }
467   else /* (side == TAIL) */
468   {
469     /* We don't move values back in the list.. */
470     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
471       return (0);
472
473     assert (ci->next == NULL);
474
475     if (cache_queue_tail == NULL)
476       cache_queue_head = ci;
477     else
478       cache_queue_tail->next = ci;
479     cache_queue_tail = ci;
480
481     did_insert = 1;
482   }
483
484   ci->flags |= CI_FLAGS_IN_QUEUE;
485
486   if (did_insert)
487   {
488     pthread_cond_broadcast(&cache_cond);
489     pthread_mutex_lock (&stats_lock);
490     stats_queue_length++;
491     pthread_mutex_unlock (&stats_lock);
492   }
493
494   return (0);
495 } /* }}} int enqueue_cache_item */
496
497 /*
498  * tree_callback_flush:
499  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
500  * while this is in progress.
501  */
502 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
503     gpointer data)
504 {
505   cache_item_t *ci;
506   callback_flush_data_t *cfd;
507
508   ci = (cache_item_t *) value;
509   cfd = (callback_flush_data_t *) data;
510
511   if ((ci->last_flush_time <= cfd->abs_timeout)
512       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
513       && (ci->values_num > 0))
514   {
515     enqueue_cache_item (ci, TAIL);
516   }
517   else if ((do_shutdown != 0)
518       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
519       && (ci->values_num > 0))
520   {
521     enqueue_cache_item (ci, TAIL);
522   }
523   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
524       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
525       && (ci->values_num <= 0))
526   {
527     char **temp;
528
529     temp = (char **) realloc (cfd->keys,
530         sizeof (char *) * (cfd->keys_num + 1));
531     if (temp == NULL)
532     {
533       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
534       return (FALSE);
535     }
536     cfd->keys = temp;
537     /* Make really sure this points to the _same_ place */
538     assert ((char *) key == ci->file);
539     cfd->keys[cfd->keys_num] = (char *) key;
540     cfd->keys_num++;
541   }
542
543   return (FALSE);
544 } /* }}} gboolean tree_callback_flush */
545
546 static int flush_old_values (int max_age)
547 {
548   callback_flush_data_t cfd;
549   size_t k;
550
551   memset (&cfd, 0, sizeof (cfd));
552   /* Pass the current time as user data so that we don't need to call
553    * `time' for each node. */
554   cfd.now = time (NULL);
555   cfd.keys = NULL;
556   cfd.keys_num = 0;
557
558   if (max_age > 0)
559     cfd.abs_timeout = cfd.now - max_age;
560   else
561     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
562
563   /* `tree_callback_flush' will return the keys of all values that haven't
564    * been touched in the last `config_flush_interval' seconds in `cfd'.
565    * The char*'s in this array point to the same memory as ci->file, so we
566    * don't need to free them separately. */
567   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
568
569   for (k = 0; k < cfd.keys_num; k++)
570   {
571     cache_item_t *ci;
572
573     /* This must not fail. */
574     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
575     assert (ci != NULL);
576
577     /* If we end up here with values available, something's seriously
578      * messed up. */
579     assert (ci->values_num == 0);
580
581     /* Remove the node from the tree */
582     g_tree_remove (cache_tree, cfd.keys[k]);
583     cfd.keys[k] = NULL;
584
585     /* Now free and clean up `ci'. */
586     free (ci->file);
587     ci->file = NULL;
588     free (ci);
589     ci = NULL;
590   } /* for (k = 0; k < cfd.keys_num; k++) */
591
592   if (cfd.keys != NULL)
593   {
594     free (cfd.keys);
595     cfd.keys = NULL;
596   }
597
598   return (0);
599 } /* int flush_old_values */
600
601 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
602 {
603   struct timeval now;
604   struct timespec next_flush;
605   int final_flush = 0; /* make sure we only flush once on shutdown */
606
607   gettimeofday (&now, NULL);
608   next_flush.tv_sec = now.tv_sec + config_flush_interval;
609   next_flush.tv_nsec = 1000 * now.tv_usec;
610
611   pthread_mutex_lock (&cache_lock);
612   while ((do_shutdown == 0) || (cache_queue_head != NULL))
613   {
614     cache_item_t *ci;
615     char *file;
616     char **values;
617     int values_num;
618     int status;
619     int i;
620
621     /* First, check if it's time to do the cache flush. */
622     gettimeofday (&now, NULL);
623     if ((now.tv_sec > next_flush.tv_sec)
624         || ((now.tv_sec == next_flush.tv_sec)
625           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
626     {
627       /* Flush all values that haven't been written in the last
628        * `config_write_interval' seconds. */
629       flush_old_values (config_write_interval);
630
631       /* Determine the time of the next cache flush. */
632       while (next_flush.tv_sec <= now.tv_sec)
633         next_flush.tv_sec += config_flush_interval;
634
635       /* unlock the cache while we rotate so we don't block incoming
636        * updates if the fsync() blocks on disk I/O */
637       pthread_mutex_unlock(&cache_lock);
638       journal_rotate();
639       pthread_mutex_lock(&cache_lock);
640     }
641
642     /* Now, check if there's something to store away. If not, wait until
643      * something comes in or it's time to do the cache flush.  if we are
644      * shutting down, do not wait around.  */
645     if (cache_queue_head == NULL && !do_shutdown)
646     {
647       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
648       if ((status != 0) && (status != ETIMEDOUT))
649       {
650         RRDD_LOG (LOG_ERR, "queue_thread_main: "
651             "pthread_cond_timedwait returned %i.", status);
652       }
653     }
654
655     /* We're about to shut down */
656     if (do_shutdown != 0 && !final_flush++)
657     {
658       if (config_flush_at_shutdown)
659         flush_old_values (-1); /* flush everything */
660       else
661         break;
662     }
663
664     /* Check if a value has arrived. This may be NULL if we timed out or there
665      * was an interrupt such as a signal. */
666     if (cache_queue_head == NULL)
667       continue;
668
669     ci = cache_queue_head;
670
671     /* copy the relevant parts */
672     file = strdup (ci->file);
673     if (file == NULL)
674     {
675       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
676       continue;
677     }
678
679     assert(ci->values != NULL);
680     assert(ci->values_num > 0);
681
682     values = ci->values;
683     values_num = ci->values_num;
684
685     _wipe_ci_values(ci, time(NULL));
686
687     cache_queue_head = ci->next;
688     if (cache_queue_head == NULL)
689       cache_queue_tail = NULL;
690     ci->next = NULL;
691
692     pthread_mutex_lock (&stats_lock);
693     assert (stats_queue_length > 0);
694     stats_queue_length--;
695     pthread_mutex_unlock (&stats_lock);
696
697     pthread_mutex_unlock (&cache_lock);
698
699     rrd_clear_error ();
700     status = rrd_update_r (file, NULL, values_num, (void *) values);
701     if (status != 0)
702     {
703       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
704           "rrd_update_r (%s) failed with status %i. (%s)",
705           file, status, rrd_get_error());
706     }
707
708     journal_write("wrote", file);
709     pthread_cond_broadcast(&ci->flushed);
710
711     for (i = 0; i < values_num; i++)
712       free (values[i]);
713
714     free(values);
715     free(file);
716
717     if (status == 0)
718     {
719       pthread_mutex_lock (&stats_lock);
720       stats_updates_written++;
721       stats_data_sets_written += values_num;
722       pthread_mutex_unlock (&stats_lock);
723     }
724
725     pthread_mutex_lock (&cache_lock);
726
727     /* We're about to shut down */
728     if (do_shutdown != 0 && !final_flush++)
729     {
730       if (config_flush_at_shutdown)
731           flush_old_values (-1); /* flush everything */
732       else
733         break;
734     }
735   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
736   pthread_mutex_unlock (&cache_lock);
737
738   if (config_flush_at_shutdown)
739   {
740     assert(cache_queue_head == NULL);
741     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
742   }
743
744   journal_done();
745
746   return (NULL);
747 } /* }}} void *queue_thread_main */
748
749 static int buffer_get_field (char **buffer_ret, /* {{{ */
750     size_t *buffer_size_ret, char **field_ret)
751 {
752   char *buffer;
753   size_t buffer_pos;
754   size_t buffer_size;
755   char *field;
756   size_t field_size;
757   int status;
758
759   buffer = *buffer_ret;
760   buffer_pos = 0;
761   buffer_size = *buffer_size_ret;
762   field = *buffer_ret;
763   field_size = 0;
764
765   if (buffer_size <= 0)
766     return (-1);
767
768   /* This is ensured by `handle_request'. */
769   assert (buffer[buffer_size - 1] == '\0');
770
771   status = -1;
772   while (buffer_pos < buffer_size)
773   {
774     /* Check for end-of-field or end-of-buffer */
775     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
776     {
777       field[field_size] = 0;
778       field_size++;
779       buffer_pos++;
780       status = 0;
781       break;
782     }
783     /* Handle escaped characters. */
784     else if (buffer[buffer_pos] == '\\')
785     {
786       if (buffer_pos >= (buffer_size - 1))
787         break;
788       buffer_pos++;
789       field[field_size] = buffer[buffer_pos];
790       field_size++;
791       buffer_pos++;
792     }
793     /* Normal operation */ 
794     else
795     {
796       field[field_size] = buffer[buffer_pos];
797       field_size++;
798       buffer_pos++;
799     }
800   } /* while (buffer_pos < buffer_size) */
801
802   if (status != 0)
803     return (status);
804
805   *buffer_ret = buffer + buffer_pos;
806   *buffer_size_ret = buffer_size - buffer_pos;
807   *field_ret = field;
808
809   return (0);
810 } /* }}} int buffer_get_field */
811
812 static int flush_file (const char *filename) /* {{{ */
813 {
814   cache_item_t *ci;
815
816   pthread_mutex_lock (&cache_lock);
817
818   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
819   if (ci == NULL)
820   {
821     pthread_mutex_unlock (&cache_lock);
822     return (ENOENT);
823   }
824
825   if (ci->values_num > 0)
826   {
827     /* Enqueue at head */
828     enqueue_cache_item (ci, HEAD);
829     pthread_cond_wait(&ci->flushed, &cache_lock);
830   }
831
832   pthread_mutex_unlock(&cache_lock);
833
834   return (0);
835 } /* }}} int flush_file */
836
837 static int handle_request_help (int fd, /* {{{ */
838     char *buffer, size_t buffer_size)
839 {
840   int status;
841   char **help_text;
842   size_t help_text_len;
843   char *command;
844   size_t i;
845
846   char *help_help[] =
847   {
848     "5 Command overview\n",
849     "FLUSH <filename>\n",
850     "FLUSHALL\n",
851     "HELP [<command>]\n",
852     "UPDATE <filename> <values> [<values> ...]\n",
853     "STATS\n"
854   };
855   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
856
857   char *help_flush[] =
858   {
859     "4 Help for FLUSH\n",
860     "Usage: FLUSH <filename>\n",
861     "\n",
862     "Adds the given filename to the head of the update queue and returns\n",
863     "after is has been dequeued.\n"
864   };
865   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
866
867   char *help_flushall[] =
868   {
869     "3 Help for FLUSHALL\n",
870     "Usage: FLUSHALL\n",
871     "\n",
872     "Triggers writing of all pending updates.  Returns immediately.\n"
873   };
874   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
875
876   char *help_update[] =
877   {
878     "9 Help for UPDATE\n",
879     "Usage: UPDATE <filename> <values> [<values> ...]\n"
880     "\n",
881     "Adds the given file to the internal cache if it is not yet known and\n",
882     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
883     "for details.\n",
884     "\n",
885     "Each <values> has the following form:\n",
886     "  <values> = <time>:<value>[:<value>[...]]\n",
887     "See the rrdupdate(1) manpage for details.\n"
888   };
889   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
890
891   char *help_stats[] =
892   {
893     "4 Help for STATS\n",
894     "Usage: STATS\n",
895     "\n",
896     "Returns some performance counters, see the rrdcached(1) manpage for\n",
897     "a description of the values.\n"
898   };
899   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
900
901   status = buffer_get_field (&buffer, &buffer_size, &command);
902   if (status != 0)
903   {
904     help_text = help_help;
905     help_text_len = help_help_len;
906   }
907   else
908   {
909     if (strcasecmp (command, "update") == 0)
910     {
911       help_text = help_update;
912       help_text_len = help_update_len;
913     }
914     else if (strcasecmp (command, "flush") == 0)
915     {
916       help_text = help_flush;
917       help_text_len = help_flush_len;
918     }
919     else if (strcasecmp (command, "flushall") == 0)
920     {
921       help_text = help_flushall;
922       help_text_len = help_flushall_len;
923     }
924     else if (strcasecmp (command, "stats") == 0)
925     {
926       help_text = help_stats;
927       help_text_len = help_stats_len;
928     }
929     else
930     {
931       help_text = help_help;
932       help_text_len = help_help_len;
933     }
934   }
935
936   for (i = 0; i < help_text_len; i++)
937   {
938     status = swrite (fd, help_text[i], strlen (help_text[i]));
939     if (status < 0)
940     {
941       status = errno;
942       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
943       return (status);
944     }
945   }
946
947   return (0);
948 } /* }}} int handle_request_help */
949
950 static int handle_request_stats (int fd, /* {{{ */
951     char *buffer __attribute__((unused)),
952     size_t buffer_size __attribute__((unused)))
953 {
954   int status;
955   char outbuf[CMD_MAX];
956
957   uint64_t copy_queue_length;
958   uint64_t copy_updates_received;
959   uint64_t copy_flush_received;
960   uint64_t copy_updates_written;
961   uint64_t copy_data_sets_written;
962   uint64_t copy_journal_bytes;
963   uint64_t copy_journal_rotate;
964
965   uint64_t tree_nodes_number;
966   uint64_t tree_depth;
967
968   pthread_mutex_lock (&stats_lock);
969   copy_queue_length       = stats_queue_length;
970   copy_updates_received   = stats_updates_received;
971   copy_flush_received     = stats_flush_received;
972   copy_updates_written    = stats_updates_written;
973   copy_data_sets_written  = stats_data_sets_written;
974   copy_journal_bytes      = stats_journal_bytes;
975   copy_journal_rotate     = stats_journal_rotate;
976   pthread_mutex_unlock (&stats_lock);
977
978   pthread_mutex_lock (&cache_lock);
979   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
980   tree_depth        = (uint64_t) g_tree_height (cache_tree);
981   pthread_mutex_unlock (&cache_lock);
982
983 #define RRDD_STATS_SEND \
984   outbuf[sizeof (outbuf) - 1] = 0; \
985   status = swrite (fd, outbuf, strlen (outbuf)); \
986   if (status < 0) \
987   { \
988     status = errno; \
989     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
990     return (status); \
991   }
992
993   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
994   RRDD_STATS_SEND;
995
996   snprintf (outbuf, sizeof (outbuf),
997       "QueueLength: %"PRIu64"\n", copy_queue_length);
998   RRDD_STATS_SEND;
999
1000   snprintf (outbuf, sizeof (outbuf),
1001       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1002   RRDD_STATS_SEND;
1003
1004   snprintf (outbuf, sizeof (outbuf),
1005       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1006   RRDD_STATS_SEND;
1007
1008   snprintf (outbuf, sizeof (outbuf),
1009       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1010   RRDD_STATS_SEND;
1011
1012   snprintf (outbuf, sizeof (outbuf),
1013       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1014   RRDD_STATS_SEND;
1015
1016   snprintf (outbuf, sizeof (outbuf),
1017       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1018   RRDD_STATS_SEND;
1019
1020   snprintf (outbuf, sizeof (outbuf),
1021       "TreeDepth: %"PRIu64"\n", tree_depth);
1022   RRDD_STATS_SEND;
1023
1024   snprintf (outbuf, sizeof(outbuf),
1025       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1026   RRDD_STATS_SEND;
1027
1028   snprintf (outbuf, sizeof(outbuf),
1029       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1030   RRDD_STATS_SEND;
1031
1032   return (0);
1033 #undef RRDD_STATS_SEND
1034 } /* }}} int handle_request_stats */
1035
1036 static int handle_request_flush (int fd, /* {{{ */
1037     char *buffer, size_t buffer_size)
1038 {
1039   char *file;
1040   int status;
1041   char result[CMD_MAX];
1042
1043   status = buffer_get_field (&buffer, &buffer_size, &file);
1044   if (status != 0)
1045   {
1046     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1047   }
1048   else
1049   {
1050     pthread_mutex_lock(&stats_lock);
1051     stats_flush_received++;
1052     pthread_mutex_unlock(&stats_lock);
1053
1054     status = flush_file (file);
1055     if (status == 0)
1056       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1057     else if (status == ENOENT)
1058     {
1059       /* no file in our tree; see whether it exists at all */
1060       struct stat statbuf;
1061
1062       memset(&statbuf, 0, sizeof(statbuf));
1063       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1064         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1065       else
1066         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1067     }
1068     else if (status < 0)
1069       strncpy (result, "-1 Internal error.\n", sizeof (result));
1070     else
1071       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1072   }
1073   result[sizeof (result) - 1] = 0;
1074
1075   status = swrite (fd, result, strlen (result));
1076   if (status < 0)
1077   {
1078     status = errno;
1079     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1080     return (status);
1081   }
1082
1083   return (0);
1084 } /* }}} int handle_request_flush */
1085
1086 static int handle_request_flushall(int fd) /* {{{ */
1087 {
1088   int status;
1089   char answer[] ="0 Started flush.\n";
1090
1091   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1092
1093   pthread_mutex_lock(&cache_lock);
1094   flush_old_values(-1);
1095   pthread_mutex_unlock(&cache_lock);
1096
1097   status = swrite(fd, answer, strlen(answer));
1098   if (status < 0)
1099   {
1100     status = errno;
1101     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1102   }
1103
1104   return (status);
1105 } /* }}} static int handle_request_flushall */
1106
1107 static int handle_request_update (int fd, /* {{{ */
1108     char *buffer, size_t buffer_size)
1109 {
1110   char *file;
1111   int values_num = 0;
1112   int status;
1113
1114   time_t now;
1115
1116   cache_item_t *ci;
1117   char answer[CMD_MAX];
1118
1119 #define RRDD_UPDATE_SEND \
1120   answer[sizeof (answer) - 1] = 0; \
1121   status = swrite (fd, answer, strlen (answer)); \
1122   if (status < 0) \
1123   { \
1124     status = errno; \
1125     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1126     return (status); \
1127   }
1128
1129   now = time (NULL);
1130
1131   status = buffer_get_field (&buffer, &buffer_size, &file);
1132   if (status != 0)
1133   {
1134     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1135         sizeof (answer));
1136     RRDD_UPDATE_SEND;
1137     return (0);
1138   }
1139
1140   pthread_mutex_lock(&stats_lock);
1141   stats_updates_received++;
1142   pthread_mutex_unlock(&stats_lock);
1143
1144   pthread_mutex_lock (&cache_lock);
1145   ci = g_tree_lookup (cache_tree, file);
1146
1147   if (ci == NULL) /* {{{ */
1148   {
1149     struct stat statbuf;
1150
1151     /* don't hold the lock while we setup; stat(2) might block */
1152     pthread_mutex_unlock(&cache_lock);
1153
1154     memset (&statbuf, 0, sizeof (statbuf));
1155     status = stat (file, &statbuf);
1156     if (status != 0)
1157     {
1158       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1159
1160       status = errno;
1161       if (status == ENOENT)
1162         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1163       else
1164         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1165             status);
1166       RRDD_UPDATE_SEND;
1167       return (0);
1168     }
1169     if (!S_ISREG (statbuf.st_mode))
1170     {
1171       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1172       RRDD_UPDATE_SEND;
1173       return (0);
1174     }
1175     if (access(file, R_OK|W_OK) != 0)
1176     {
1177       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1178                 file, rrd_strerror(errno));
1179       RRDD_UPDATE_SEND;
1180       return (0);
1181     }
1182
1183     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1184     if (ci == NULL)
1185     {
1186       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1187
1188       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1189       RRDD_UPDATE_SEND;
1190       return (0);
1191     }
1192     memset (ci, 0, sizeof (cache_item_t));
1193
1194     ci->file = strdup (file);
1195     if (ci->file == NULL)
1196     {
1197       free (ci);
1198       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1199
1200       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1201       RRDD_UPDATE_SEND;
1202       return (0);
1203     }
1204
1205     _wipe_ci_values(ci, now);
1206     ci->flags = CI_FLAGS_IN_TREE;
1207
1208     pthread_mutex_lock(&cache_lock);
1209     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1210   } /* }}} */
1211   assert (ci != NULL);
1212
1213   while (buffer_size > 0)
1214   {
1215     char **temp;
1216     char *value;
1217
1218     status = buffer_get_field (&buffer, &buffer_size, &value);
1219     if (status != 0)
1220     {
1221       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1222       break;
1223     }
1224
1225     temp = (char **) realloc (ci->values,
1226         sizeof (char *) * (ci->values_num + 1));
1227     if (temp == NULL)
1228     {
1229       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1230       continue;
1231     }
1232     ci->values = temp;
1233
1234     ci->values[ci->values_num] = strdup (value);
1235     if (ci->values[ci->values_num] == NULL)
1236     {
1237       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1238       continue;
1239     }
1240     ci->values_num++;
1241
1242     values_num++;
1243   }
1244
1245   if (((now - ci->last_flush_time) >= config_write_interval)
1246       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1247       && (ci->values_num > 0))
1248   {
1249     enqueue_cache_item (ci, TAIL);
1250   }
1251
1252   pthread_mutex_unlock (&cache_lock);
1253
1254   if (values_num < 1)
1255   {
1256     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1257   }
1258   else
1259   {
1260     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1261         (values_num == 1) ? "" : "s");
1262   }
1263   RRDD_UPDATE_SEND;
1264   return (0);
1265 #undef RRDD_UPDATE_SEND
1266 } /* }}} int handle_request_update */
1267
1268 /* we came across a "WROTE" entry during journal replay.
1269  * throw away any values that we have accumulated for this file
1270  */
1271 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1272                                  const char *buffer,
1273                                  size_t buffer_size __attribute__((unused)))
1274 {
1275   int i;
1276   cache_item_t *ci;
1277   const char *file = buffer;
1278
1279   pthread_mutex_lock(&cache_lock);
1280
1281   ci = g_tree_lookup(cache_tree, file);
1282   if (ci == NULL)
1283   {
1284     pthread_mutex_unlock(&cache_lock);
1285     return (0);
1286   }
1287
1288   if (ci->values)
1289   {
1290     for (i=0; i < ci->values_num; i++)
1291       free(ci->values[i]);
1292
1293     free(ci->values);
1294   }
1295
1296   _wipe_ci_values(ci, time(NULL));
1297
1298   pthread_mutex_unlock(&cache_lock);
1299   return (0);
1300 } /* }}} int handle_request_wrote */
1301
1302 /* returns 1 if we have the required privilege level */
1303 static int has_privilege (socket_privilege priv, /* {{{ */
1304                           socket_privilege required, int fd)
1305 {
1306   int status;
1307   char error[CMD_MAX];
1308
1309   if (priv >= required)
1310     return 1;
1311
1312   sprintf(error, "-1 %s\n", rrd_strerror(EACCES));
1313   status = swrite(fd, error, strlen(error));
1314
1315   if (status < 0)
1316     return status;
1317   else
1318     return 0;
1319 } /* }}} static int has_privilege */
1320
1321 /* if fd < 0, we are in journal replay mode */
1322 static int handle_request (int fd, socket_privilege privilege, /* {{{ */
1323                            char *buffer, size_t buffer_size)
1324 {
1325   char *buffer_ptr;
1326   char *command;
1327   int status;
1328
1329   assert (buffer[buffer_size - 1] == '\0');
1330
1331   buffer_ptr = buffer;
1332   command = NULL;
1333   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1334   if (status != 0)
1335   {
1336     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1337     return (-1);
1338   }
1339
1340   if (strcasecmp (command, "update") == 0)
1341   {
1342     /* don't re-write updates in replay mode */
1343     if (fd >= 0)
1344       journal_write(command, buffer_ptr);
1345
1346     status = has_privilege(privilege, PRIV_HIGH, fd);
1347     if (status <= 0)
1348       return status;
1349
1350     return (handle_request_update (fd, buffer_ptr, buffer_size));
1351   }
1352   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1353   {
1354     /* this is only valid in replay mode */
1355     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1356   }
1357   else if (strcasecmp (command, "flush") == 0)
1358   {
1359     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1360   }
1361   else if (strcasecmp (command, "flushall") == 0)
1362   {
1363     status = has_privilege(privilege, PRIV_HIGH, fd);
1364     if (status <= 0)
1365       return status;
1366
1367     return (handle_request_flushall(fd));
1368   }
1369   else if (strcasecmp (command, "stats") == 0)
1370   {
1371     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1372   }
1373   else if (strcasecmp (command, "help") == 0)
1374   {
1375     return (handle_request_help (fd, buffer_ptr, buffer_size));
1376   }
1377   else
1378   {
1379     char result[CMD_MAX];
1380
1381     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1382     result[sizeof (result) - 1] = 0;
1383
1384     status = swrite (fd, result, strlen (result));
1385     if (status < 0)
1386     {
1387       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1388       return (-1);
1389     }
1390   }
1391
1392   return (0);
1393 } /* }}} int handle_request */
1394
1395 /* MUST NOT hold journal_lock before calling this */
1396 static void journal_rotate(void) /* {{{ */
1397 {
1398   FILE *old_fh = NULL;
1399
1400   if (journal_cur == NULL || journal_old == NULL)
1401     return;
1402
1403   pthread_mutex_lock(&journal_lock);
1404
1405   /* we rotate this way (rename before close) so that the we can release
1406    * the journal lock as fast as possible.  Journal writes to the new
1407    * journal can proceed immediately after the new file is opened.  The
1408    * fclose can then block without affecting new updates.
1409    */
1410   if (journal_fh != NULL)
1411   {
1412     old_fh = journal_fh;
1413     rename(journal_cur, journal_old);
1414     ++stats_journal_rotate;
1415   }
1416
1417   journal_fh = fopen(journal_cur, "a");
1418   pthread_mutex_unlock(&journal_lock);
1419
1420   if (old_fh != NULL)
1421     fclose(old_fh);
1422
1423   if (journal_fh == NULL)
1424   {
1425     RRDD_LOG(LOG_CRIT,
1426              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1427              journal_cur, rrd_strerror(errno));
1428
1429     RRDD_LOG(LOG_ERR,
1430              "JOURNALING DISABLED: All values will be flushed at shutdown");
1431     config_flush_at_shutdown = 1;
1432   }
1433
1434 } /* }}} static void journal_rotate */
1435
1436 static void journal_done(void) /* {{{ */
1437 {
1438   if (journal_cur == NULL)
1439     return;
1440
1441   pthread_mutex_lock(&journal_lock);
1442   if (journal_fh != NULL)
1443   {
1444     fclose(journal_fh);
1445     journal_fh = NULL;
1446   }
1447
1448   if (config_flush_at_shutdown)
1449   {
1450     RRDD_LOG(LOG_INFO, "removing journals");
1451     unlink(journal_old);
1452     unlink(journal_cur);
1453   }
1454   else
1455   {
1456     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1457              "journals will be used at next startup");
1458   }
1459
1460   pthread_mutex_unlock(&journal_lock);
1461
1462 } /* }}} static void journal_done */
1463
1464 static int journal_write(char *cmd, char *args) /* {{{ */
1465 {
1466   int chars;
1467
1468   if (journal_fh == NULL)
1469     return 0;
1470
1471   pthread_mutex_lock(&journal_lock);
1472   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1473   pthread_mutex_unlock(&journal_lock);
1474
1475   if (chars > 0)
1476   {
1477     pthread_mutex_lock(&stats_lock);
1478     stats_journal_bytes += chars;
1479     pthread_mutex_unlock(&stats_lock);
1480   }
1481
1482   return chars;
1483 } /* }}} static int journal_write */
1484
1485 static int journal_replay (const char *file) /* {{{ */
1486 {
1487   FILE *fh;
1488   int entry_cnt = 0;
1489   int fail_cnt = 0;
1490   uint64_t line = 0;
1491   char entry[CMD_MAX];
1492
1493   if (file == NULL) return 0;
1494
1495   fh = fopen(file, "r");
1496   if (fh == NULL)
1497   {
1498     if (errno != ENOENT)
1499       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1500                file, rrd_strerror(errno));
1501     return 0;
1502   }
1503   else
1504     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1505
1506   while(!feof(fh))
1507   {
1508     size_t entry_len;
1509
1510     ++line;
1511     fgets(entry, sizeof(entry), fh);
1512     entry_len = strlen(entry);
1513
1514     /* check \n termination in case journal writing crashed mid-line */
1515     if (entry_len == 0)
1516       continue;
1517     else if (entry[entry_len - 1] != '\n')
1518     {
1519       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1520       ++fail_cnt;
1521       continue;
1522     }
1523
1524     entry[entry_len - 1] = '\0';
1525
1526     if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
1527       ++entry_cnt;
1528     else
1529       ++fail_cnt;
1530   }
1531
1532   fclose(fh);
1533
1534   if (entry_cnt > 0)
1535   {
1536     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1537              entry_cnt, fail_cnt);
1538     return 1;
1539   }
1540   else
1541     return 0;
1542
1543 } /* }}} static int journal_replay */
1544
1545 static void *connection_thread_main (void *args) /* {{{ */
1546 {
1547   pthread_t self;
1548   listen_socket_t *sock;
1549   int i;
1550   int fd;
1551
1552   sock = (listen_socket_t *) args;
1553   fd = sock->fd;
1554
1555   pthread_mutex_lock (&connection_threads_lock);
1556   {
1557     pthread_t *temp;
1558
1559     temp = (pthread_t *) realloc (connection_threads,
1560         sizeof (pthread_t) * (connection_threads_num + 1));
1561     if (temp == NULL)
1562     {
1563       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1564     }
1565     else
1566     {
1567       connection_threads = temp;
1568       connection_threads[connection_threads_num] = pthread_self ();
1569       connection_threads_num++;
1570     }
1571   }
1572   pthread_mutex_unlock (&connection_threads_lock);
1573
1574   while (do_shutdown == 0)
1575   {
1576     char buffer[CMD_MAX];
1577
1578     struct pollfd pollfd;
1579     int status;
1580
1581     pollfd.fd = fd;
1582     pollfd.events = POLLIN | POLLPRI;
1583     pollfd.revents = 0;
1584
1585     status = poll (&pollfd, 1, /* timeout = */ 500);
1586     if (do_shutdown)
1587       break;
1588     else if (status == 0) /* timeout */
1589       continue;
1590     else if (status < 0) /* error */
1591     {
1592       status = errno;
1593       if (status == EINTR)
1594         continue;
1595       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1596       continue;
1597     }
1598
1599     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1600     {
1601       close (fd);
1602       break;
1603     }
1604     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1605     {
1606       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1607           "poll(2) returned something unexpected: %#04hx",
1608           pollfd.revents);
1609       close (fd);
1610       break;
1611     }
1612
1613     status = (int) sread (fd, buffer, sizeof (buffer));
1614     if (status <= 0)
1615     {
1616       close (fd);
1617
1618       if (status < 0)
1619         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1620
1621       break;
1622     }
1623
1624     status = handle_request (fd, sock->privilege, buffer, status);
1625     if (status != 0)
1626       break;
1627   }
1628
1629   close(fd);
1630   free(args);
1631
1632   self = pthread_self ();
1633   /* Remove this thread from the connection threads list */
1634   pthread_mutex_lock (&connection_threads_lock);
1635   /* Find out own index in the array */
1636   for (i = 0; i < connection_threads_num; i++)
1637     if (pthread_equal (connection_threads[i], self) != 0)
1638       break;
1639   assert (i < connection_threads_num);
1640
1641   /* Move the trailing threads forward. */
1642   if (i < (connection_threads_num - 1))
1643   {
1644     memmove (connection_threads + i,
1645         connection_threads + i + 1,
1646         sizeof (pthread_t) * (connection_threads_num - i - 1));
1647   }
1648
1649   connection_threads_num--;
1650   pthread_mutex_unlock (&connection_threads_lock);
1651
1652   return (NULL);
1653 } /* }}} void *connection_thread_main */
1654
1655 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1656 {
1657   int fd;
1658   struct sockaddr_un sa;
1659   listen_socket_t *temp;
1660   int status;
1661   const char *path;
1662
1663   path = sock->addr;
1664   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1665     path += strlen("unix:");
1666
1667   temp = (listen_socket_t *) realloc (listen_fds,
1668       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1669   if (temp == NULL)
1670   {
1671     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1672     return (-1);
1673   }
1674   listen_fds = temp;
1675   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1676
1677   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1678   if (fd < 0)
1679   {
1680     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1681     return (-1);
1682   }
1683
1684   memset (&sa, 0, sizeof (sa));
1685   sa.sun_family = AF_UNIX;
1686   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1687
1688   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1689   if (status != 0)
1690   {
1691     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1692     close (fd);
1693     unlink (path);
1694     return (-1);
1695   }
1696
1697   status = listen (fd, /* backlog = */ 10);
1698   if (status != 0)
1699   {
1700     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1701     close (fd);
1702     unlink (path);
1703     return (-1);
1704   }
1705
1706   listen_fds[listen_fds_num].fd = fd;
1707   listen_fds[listen_fds_num].family = PF_UNIX;
1708   strncpy(listen_fds[listen_fds_num].addr, path,
1709           sizeof (listen_fds[listen_fds_num].addr) - 1);
1710   listen_fds_num++;
1711
1712   return (0);
1713 } /* }}} int open_listen_socket_unix */
1714
1715 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1716 {
1717   struct addrinfo ai_hints;
1718   struct addrinfo *ai_res;
1719   struct addrinfo *ai_ptr;
1720   char addr_copy[NI_MAXHOST];
1721   char *addr;
1722   char *port;
1723   int status;
1724
1725   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1726   addr_copy[sizeof (addr_copy) - 1] = 0;
1727   addr = addr_copy;
1728
1729   memset (&ai_hints, 0, sizeof (ai_hints));
1730   ai_hints.ai_flags = 0;
1731 #ifdef AI_ADDRCONFIG
1732   ai_hints.ai_flags |= AI_ADDRCONFIG;
1733 #endif
1734   ai_hints.ai_family = AF_UNSPEC;
1735   ai_hints.ai_socktype = SOCK_STREAM;
1736
1737   port = NULL;
1738   if (*addr == '[') /* IPv6+port format */
1739   {
1740     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1741     addr++;
1742
1743     port = strchr (addr, ']');
1744     if (port == NULL)
1745     {
1746       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1747           sock->addr);
1748       return (-1);
1749     }
1750     *port = 0;
1751     port++;
1752
1753     if (*port == ':')
1754       port++;
1755     else if (*port == 0)
1756       port = NULL;
1757     else
1758     {
1759       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1760           port);
1761       return (-1);
1762     }
1763   } /* if (*addr = ']') */
1764   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1765   {
1766     port = rindex(addr, ':');
1767     if (port != NULL)
1768     {
1769       *port = 0;
1770       port++;
1771     }
1772   }
1773   ai_res = NULL;
1774   status = getaddrinfo (addr,
1775                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1776                         &ai_hints, &ai_res);
1777   if (status != 0)
1778   {
1779     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1780         "%s", addr, gai_strerror (status));
1781     return (-1);
1782   }
1783
1784   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1785   {
1786     int fd;
1787     listen_socket_t *temp;
1788     int one = 1;
1789
1790     temp = (listen_socket_t *) realloc (listen_fds,
1791         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1792     if (temp == NULL)
1793     {
1794       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1795       continue;
1796     }
1797     listen_fds = temp;
1798     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1799
1800     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1801     if (fd < 0)
1802     {
1803       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1804       continue;
1805     }
1806
1807     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1808
1809     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1810     if (status != 0)
1811     {
1812       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1813       close (fd);
1814       continue;
1815     }
1816
1817     status = listen (fd, /* backlog = */ 10);
1818     if (status != 0)
1819     {
1820       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1821       close (fd);
1822       return (-1);
1823     }
1824
1825     listen_fds[listen_fds_num].fd = fd;
1826     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1827     listen_fds_num++;
1828   } /* for (ai_ptr) */
1829
1830   return (0);
1831 } /* }}} static int open_listen_socket_network */
1832
1833 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1834 {
1835   assert(sock != NULL);
1836   assert(sock->addr != NULL);
1837
1838   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1839       || sock->addr[0] == '/')
1840     return (open_listen_socket_unix(sock));
1841   else
1842     return (open_listen_socket_network(sock));
1843 } /* }}} int open_listen_socket */
1844
1845 static int close_listen_sockets (void) /* {{{ */
1846 {
1847   size_t i;
1848
1849   for (i = 0; i < listen_fds_num; i++)
1850   {
1851     close (listen_fds[i].fd);
1852
1853     if (listen_fds[i].family == PF_UNIX)
1854       unlink(listen_fds[i].addr);
1855   }
1856
1857   free (listen_fds);
1858   listen_fds = NULL;
1859   listen_fds_num = 0;
1860
1861   return (0);
1862 } /* }}} int close_listen_sockets */
1863
1864 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1865 {
1866   struct pollfd *pollfds;
1867   int pollfds_num;
1868   int status;
1869   int i;
1870
1871   for (i = 0; i < config_listen_address_list_len; i++)
1872     open_listen_socket (config_listen_address_list[i]);
1873
1874   if (config_listen_address_list_len < 1)
1875   {
1876     listen_socket_t sock;
1877     memset(&sock, 0, sizeof(sock));
1878     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1879     open_listen_socket (&sock);
1880   }
1881
1882   if (listen_fds_num < 1)
1883   {
1884     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1885         "could be opened. Sorry.");
1886     return (NULL);
1887   }
1888
1889   pollfds_num = listen_fds_num;
1890   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1891   if (pollfds == NULL)
1892   {
1893     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1894     return (NULL);
1895   }
1896   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1897
1898   RRDD_LOG(LOG_INFO, "listening for connections");
1899
1900   while (do_shutdown == 0)
1901   {
1902     assert (pollfds_num == ((int) listen_fds_num));
1903     for (i = 0; i < pollfds_num; i++)
1904     {
1905       pollfds[i].fd = listen_fds[i].fd;
1906       pollfds[i].events = POLLIN | POLLPRI;
1907       pollfds[i].revents = 0;
1908     }
1909
1910     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1911     if (do_shutdown)
1912       break;
1913     else if (status == 0) /* timeout */
1914       continue;
1915     else if (status < 0) /* error */
1916     {
1917       status = errno;
1918       if (status != EINTR)
1919       {
1920         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1921       }
1922       continue;
1923     }
1924
1925     for (i = 0; i < pollfds_num; i++)
1926     {
1927       listen_socket_t *client_sock;
1928       struct sockaddr_storage client_sa;
1929       socklen_t client_sa_size;
1930       pthread_t tid;
1931       pthread_attr_t attr;
1932
1933       if (pollfds[i].revents == 0)
1934         continue;
1935
1936       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1937       {
1938         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1939             "poll(2) returned something unexpected for listen FD #%i.",
1940             pollfds[i].fd);
1941         continue;
1942       }
1943
1944       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1945       if (client_sock == NULL)
1946       {
1947         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1948         continue;
1949       }
1950       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1951
1952       client_sa_size = sizeof (client_sa);
1953       client_sock->fd = accept (pollfds[i].fd,
1954           (struct sockaddr *) &client_sa, &client_sa_size);
1955       if (client_sock->fd < 0)
1956       {
1957         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1958         free(client_sock);
1959         continue;
1960       }
1961
1962       pthread_attr_init (&attr);
1963       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1964
1965       status = pthread_create (&tid, &attr, connection_thread_main,
1966                                client_sock);
1967       if (status != 0)
1968       {
1969         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1970         close (client_sock->fd);
1971         free (client_sock);
1972         continue;
1973       }
1974     } /* for (pollfds_num) */
1975   } /* while (do_shutdown == 0) */
1976
1977   RRDD_LOG(LOG_INFO, "starting shutdown");
1978
1979   close_listen_sockets ();
1980
1981   pthread_mutex_lock (&connection_threads_lock);
1982   while (connection_threads_num > 0)
1983   {
1984     pthread_t wait_for;
1985
1986     wait_for = connection_threads[0];
1987
1988     pthread_mutex_unlock (&connection_threads_lock);
1989     pthread_join (wait_for, /* retval = */ NULL);
1990     pthread_mutex_lock (&connection_threads_lock);
1991   }
1992   pthread_mutex_unlock (&connection_threads_lock);
1993
1994   return (NULL);
1995 } /* }}} void *listen_thread_main */
1996
1997 static int daemonize (void) /* {{{ */
1998 {
1999   int status;
2000   int fd;
2001
2002   fd = open_pidfile();
2003   if (fd < 0) return fd;
2004
2005   if (!stay_foreground)
2006   {
2007     pid_t child;
2008     char *base_dir;
2009
2010     child = fork ();
2011     if (child < 0)
2012     {
2013       fprintf (stderr, "daemonize: fork(2) failed.\n");
2014       return (-1);
2015     }
2016     else if (child > 0)
2017     {
2018       return (1);
2019     }
2020
2021     /* Change into the /tmp directory. */
2022     base_dir = (config_base_dir != NULL)
2023       ? config_base_dir
2024       : "/tmp";
2025     status = chdir (base_dir);
2026     if (status != 0)
2027     {
2028       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2029       return (-1);
2030     }
2031
2032     /* Become session leader */
2033     setsid ();
2034
2035     /* Open the first three file descriptors to /dev/null */
2036     close (2);
2037     close (1);
2038     close (0);
2039
2040     open ("/dev/null", O_RDWR);
2041     dup (0);
2042     dup (0);
2043   } /* if (!stay_foreground) */
2044
2045   install_signal_handlers();
2046
2047   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2048   RRDD_LOG(LOG_INFO, "starting up");
2049
2050   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2051   if (cache_tree == NULL)
2052   {
2053     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2054     return (-1);
2055   }
2056
2057   status = write_pidfile (fd);
2058   return status;
2059 } /* }}} int daemonize */
2060
2061 static int cleanup (void) /* {{{ */
2062 {
2063   do_shutdown++;
2064
2065   pthread_cond_signal (&cache_cond);
2066   pthread_join (queue_thread, /* return = */ NULL);
2067
2068   remove_pidfile ();
2069
2070   RRDD_LOG(LOG_INFO, "goodbye");
2071   closelog ();
2072
2073   return (0);
2074 } /* }}} int cleanup */
2075
2076 static int read_options (int argc, char **argv) /* {{{ */
2077 {
2078   int option;
2079   int status = 0;
2080
2081   while ((option = getopt(argc, argv, "gl:L:f:w:b:z:p:j:h?F")) != -1)
2082   {
2083     switch (option)
2084     {
2085       case 'g':
2086         stay_foreground=1;
2087         break;
2088
2089       case 'L':
2090       case 'l':
2091       {
2092         listen_socket_t **temp;
2093         listen_socket_t *new;
2094
2095         new = malloc(sizeof(listen_socket_t));
2096         if (new == NULL)
2097         {
2098           fprintf(stderr, "read_options: malloc failed.\n");
2099           return(2);
2100         }
2101         memset(new, 0, sizeof(listen_socket_t));
2102
2103         temp = (listen_socket_t **) realloc (config_listen_address_list,
2104             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2105         if (temp == NULL)
2106         {
2107           fprintf (stderr, "read_options: realloc failed.\n");
2108           return (2);
2109         }
2110         config_listen_address_list = temp;
2111
2112         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2113         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2114
2115         temp[config_listen_address_list_len] = new;
2116         config_listen_address_list_len++;
2117       }
2118       break;
2119
2120       case 'f':
2121       {
2122         int temp;
2123
2124         temp = atoi (optarg);
2125         if (temp > 0)
2126           config_flush_interval = temp;
2127         else
2128         {
2129           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2130           status = 3;
2131         }
2132       }
2133       break;
2134
2135       case 'w':
2136       {
2137         int temp;
2138
2139         temp = atoi (optarg);
2140         if (temp > 0)
2141           config_write_interval = temp;
2142         else
2143         {
2144           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2145           status = 2;
2146         }
2147       }
2148       break;
2149
2150       case 'z':
2151       {
2152         int temp;
2153
2154         temp = atoi(optarg);
2155         if (temp > 0)
2156           config_write_jitter = temp;
2157         else
2158         {
2159           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2160           status = 2;
2161         }
2162
2163         break;
2164       }
2165
2166       case 'b':
2167       {
2168         size_t len;
2169
2170         if (config_base_dir != NULL)
2171           free (config_base_dir);
2172         config_base_dir = strdup (optarg);
2173         if (config_base_dir == NULL)
2174         {
2175           fprintf (stderr, "read_options: strdup failed.\n");
2176           return (3);
2177         }
2178
2179         len = strlen (config_base_dir);
2180         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2181         {
2182           config_base_dir[len - 1] = 0;
2183           len--;
2184         }
2185
2186         if (len < 1)
2187         {
2188           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2189           return (4);
2190         }
2191       }
2192       break;
2193
2194       case 'p':
2195       {
2196         if (config_pid_file != NULL)
2197           free (config_pid_file);
2198         config_pid_file = strdup (optarg);
2199         if (config_pid_file == NULL)
2200         {
2201           fprintf (stderr, "read_options: strdup failed.\n");
2202           return (3);
2203         }
2204       }
2205       break;
2206
2207       case 'F':
2208         config_flush_at_shutdown = 1;
2209         break;
2210
2211       case 'j':
2212       {
2213         struct stat statbuf;
2214         const char *dir = optarg;
2215
2216         status = stat(dir, &statbuf);
2217         if (status != 0)
2218         {
2219           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2220           return 6;
2221         }
2222
2223         if (!S_ISDIR(statbuf.st_mode)
2224             || access(dir, R_OK|W_OK|X_OK) != 0)
2225         {
2226           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2227                   errno ? rrd_strerror(errno) : "");
2228           return 6;
2229         }
2230
2231         journal_cur = malloc(PATH_MAX + 1);
2232         journal_old = malloc(PATH_MAX + 1);
2233         if (journal_cur == NULL || journal_old == NULL)
2234         {
2235           fprintf(stderr, "malloc failure for journal files\n");
2236           return 6;
2237         }
2238         else 
2239         {
2240           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2241           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2242         }
2243       }
2244       break;
2245
2246       case 'h':
2247       case '?':
2248         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2249             "\n"
2250             "Usage: rrdcached [options]\n"
2251             "\n"
2252             "Valid options are:\n"
2253             "  -l <address>  Socket address to listen to.\n"
2254             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2255             "  -w <seconds>  Interval in which to write data.\n"
2256             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2257             "  -f <seconds>  Interval in which to flush dead data.\n"
2258             "  -p <file>     Location of the PID-file.\n"
2259             "  -b <dir>      Base directory to change to.\n"
2260             "  -g            Do not fork and run in the foreground.\n"
2261             "  -j <dir>      Directory in which to create the journal files.\n"
2262             "  -F            Always flush all updates at shutdown\n"
2263             "\n"
2264             "For more information and a detailed description of all options "
2265             "please refer\n"
2266             "to the rrdcached(1) manual page.\n",
2267             VERSION);
2268         status = -1;
2269         break;
2270     } /* switch (option) */
2271   } /* while (getopt) */
2272
2273   /* advise the user when values are not sane */
2274   if (config_flush_interval < 2 * config_write_interval)
2275     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2276             " 2x write interval (-w) !\n");
2277   if (config_write_jitter > config_write_interval)
2278     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2279             " write interval (-w) !\n");
2280
2281   if (journal_cur == NULL)
2282     config_flush_at_shutdown = 1;
2283
2284   return (status);
2285 } /* }}} int read_options */
2286
2287 int main (int argc, char **argv)
2288 {
2289   int status;
2290
2291   status = read_options (argc, argv);
2292   if (status != 0)
2293   {
2294     if (status < 0)
2295       status = 0;
2296     return (status);
2297   }
2298
2299   status = daemonize ();
2300   if (status == 1)
2301   {
2302     struct sigaction sigchld;
2303
2304     memset (&sigchld, 0, sizeof (sigchld));
2305     sigchld.sa_handler = SIG_IGN;
2306     sigaction (SIGCHLD, &sigchld, NULL);
2307
2308     return (0);
2309   }
2310   else if (status != 0)
2311   {
2312     fprintf (stderr, "daemonize failed, exiting.\n");
2313     return (1);
2314   }
2315
2316   if (journal_cur != NULL)
2317   {
2318     int had_journal = 0;
2319
2320     pthread_mutex_lock(&journal_lock);
2321
2322     RRDD_LOG(LOG_INFO, "checking for journal files");
2323
2324     had_journal += journal_replay(journal_old);
2325     had_journal += journal_replay(journal_cur);
2326
2327     if (had_journal)
2328       flush_old_values(-1);
2329
2330     pthread_mutex_unlock(&journal_lock);
2331     journal_rotate();
2332
2333     RRDD_LOG(LOG_INFO, "journal processing complete");
2334   }
2335
2336   /* start the queue thread */
2337   memset (&queue_thread, 0, sizeof (queue_thread));
2338   status = pthread_create (&queue_thread,
2339                            NULL, /* attr */
2340                            queue_thread_main,
2341                            NULL); /* args */
2342   if (status != 0)
2343   {
2344     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2345     cleanup();
2346     return (1);
2347   }
2348
2349   listen_thread_main (NULL);
2350   cleanup ();
2351
2352   return (0);
2353 } /* int main */
2354
2355 /*
2356  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2357  */