9c8847dd7e4a17ce157853a02ff2440d5354dce0
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   int batch_mode;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE  (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143   int flags;
144   pthread_cond_t  flushed;
145   cache_item_t *prev;
146   cache_item_t *next;
147 };
148
149 struct callback_flush_data_s
150 {
151   time_t now;
152   time_t abs_timeout;
153   char **keys;
154   size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
157
158 enum queue_side_e
159 {
160   HEAD,
161   TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
164
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
168
169 /*
170  * Variables
171  */
172 static int stay_foreground = 0;
173 static uid_t daemon_uid;
174
175 static listen_socket_t *listen_fds = NULL;
176 static size_t listen_fds_num = 0;
177
178 static int do_shutdown = 0;
179
180 static pthread_t queue_thread;
181
182 static pthread_t *connection_threads = NULL;
183 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
184 static int connection_threads_num = 0;
185
186 /* Cache stuff */
187 static GTree          *cache_tree = NULL;
188 static cache_item_t   *cache_queue_head = NULL;
189 static cache_item_t   *cache_queue_tail = NULL;
190 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
192
193 static int config_write_interval = 300;
194 static int config_write_jitter   = 0;
195 static int config_flush_interval = 3600;
196 static int config_flush_at_shutdown = 0;
197 static char *config_pid_file = NULL;
198 static char *config_base_dir = NULL;
199 static size_t _config_base_dir_len = 0;
200 static int config_write_base_only = 0;
201
202 static listen_socket_t **config_listen_address_list = NULL;
203 static int config_listen_address_list_len = 0;
204
205 static uint64_t stats_queue_length = 0;
206 static uint64_t stats_updates_received = 0;
207 static uint64_t stats_flush_received = 0;
208 static uint64_t stats_updates_written = 0;
209 static uint64_t stats_data_sets_written = 0;
210 static uint64_t stats_journal_bytes = 0;
211 static uint64_t stats_journal_rotate = 0;
212 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
213
214 /* Journaled updates */
215 static char *journal_cur = NULL;
216 static char *journal_old = NULL;
217 static FILE *journal_fh = NULL;
218 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
219 static int journal_write(char *cmd, char *args);
220 static void journal_done(void);
221 static void journal_rotate(void);
222
223 /* 
224  * Functions
225  */
226 static void sig_common (const char *sig) /* {{{ */
227 {
228   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
229   do_shutdown++;
230   pthread_cond_broadcast(&cache_cond);
231 } /* }}} void sig_common */
232
233 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
234 {
235   sig_common("INT");
236 } /* }}} void sig_int_handler */
237
238 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
239 {
240   sig_common("TERM");
241 } /* }}} void sig_term_handler */
242
243 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   config_flush_at_shutdown = 1;
246   sig_common("USR1");
247 } /* }}} void sig_usr1_handler */
248
249 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
250 {
251   config_flush_at_shutdown = 0;
252   sig_common("USR2");
253 } /* }}} void sig_usr2_handler */
254
255 static void install_signal_handlers(void) /* {{{ */
256 {
257   /* These structures are static, because `sigaction' behaves weird if the are
258    * overwritten.. */
259   static struct sigaction sa_int;
260   static struct sigaction sa_term;
261   static struct sigaction sa_pipe;
262   static struct sigaction sa_usr1;
263   static struct sigaction sa_usr2;
264
265   /* Install signal handlers */
266   memset (&sa_int, 0, sizeof (sa_int));
267   sa_int.sa_handler = sig_int_handler;
268   sigaction (SIGINT, &sa_int, NULL);
269
270   memset (&sa_term, 0, sizeof (sa_term));
271   sa_term.sa_handler = sig_term_handler;
272   sigaction (SIGTERM, &sa_term, NULL);
273
274   memset (&sa_pipe, 0, sizeof (sa_pipe));
275   sa_pipe.sa_handler = SIG_IGN;
276   sigaction (SIGPIPE, &sa_pipe, NULL);
277
278   memset (&sa_pipe, 0, sizeof (sa_usr1));
279   sa_usr1.sa_handler = sig_usr1_handler;
280   sigaction (SIGUSR1, &sa_usr1, NULL);
281
282   memset (&sa_usr2, 0, sizeof (sa_usr2));
283   sa_usr2.sa_handler = sig_usr2_handler;
284   sigaction (SIGUSR2, &sa_usr2, NULL);
285
286 } /* }}} void install_signal_handlers */
287
288 static int open_pidfile(void) /* {{{ */
289 {
290   int fd;
291   char *file;
292
293   file = (config_pid_file != NULL)
294     ? config_pid_file
295     : LOCALSTATEDIR "/run/rrdcached.pid";
296
297   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
298   if (fd < 0)
299     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
300             file, rrd_strerror(errno));
301
302   return(fd);
303 } /* }}} static int open_pidfile */
304
305 static int write_pidfile (int fd) /* {{{ */
306 {
307   pid_t pid;
308   FILE *fh;
309
310   pid = getpid ();
311
312   fh = fdopen (fd, "w");
313   if (fh == NULL)
314   {
315     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
316     close(fd);
317     return (-1);
318   }
319
320   fprintf (fh, "%i\n", (int) pid);
321   fclose (fh);
322
323   return (0);
324 } /* }}} int write_pidfile */
325
326 static int remove_pidfile (void) /* {{{ */
327 {
328   char *file;
329   int status;
330
331   file = (config_pid_file != NULL)
332     ? config_pid_file
333     : LOCALSTATEDIR "/run/rrdcached.pid";
334
335   status = unlink (file);
336   if (status == 0)
337     return (0);
338   return (errno);
339 } /* }}} int remove_pidfile */
340
341 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
342 {
343   char *eol;
344
345   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
346                sock->next_read - sock->next_cmd);
347
348   if (eol == NULL)
349   {
350     /* no commands left, move remainder back to front of rbuf */
351     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
352             sock->next_read - sock->next_cmd);
353     sock->next_read -= sock->next_cmd;
354     sock->next_cmd = 0;
355     *len = 0;
356     return NULL;
357   }
358   else
359   {
360     char *cmd = sock->rbuf + sock->next_cmd;
361     *eol = '\0';
362
363     sock->next_cmd = eol - sock->rbuf + 1;
364
365     if (eol > sock->rbuf && *(eol-1) == '\r')
366       *(--eol) = '\0'; /* handle "\r\n" EOL */
367
368     *len = eol - cmd;
369
370     return cmd;
371   }
372
373   /* NOTREACHED */
374   assert(1==0);
375 }
376
377 /* add the characters directly to the write buffer */
378 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
379 {
380   char *new_buf;
381
382   assert(sock != NULL);
383
384   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
385   if (new_buf == NULL)
386   {
387     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
388     return -1;
389   }
390
391   strncpy(new_buf + sock->wbuf_len, str, len + 1);
392
393   sock->wbuf = new_buf;
394   sock->wbuf_len += len;
395
396   return 0;
397 } /* }}} static int add_to_wbuf */
398
399 /* add the text to the "extra" info that's sent after the status line */
400 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
401 {
402   va_list argp;
403   char buffer[CMD_MAX];
404   int len;
405
406   if (sock == NULL) return 0; /* journal replay mode */
407   if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
408
409   va_start(argp, fmt);
410 #ifdef HAVE_VSNPRINTF
411   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
412 #else
413   len = vsprintf(buffer, fmt, argp);
414 #endif
415   va_end(argp);
416   if (len < 0)
417   {
418     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
419     return -1;
420   }
421
422   return add_to_wbuf(sock, buffer, len);
423 } /* }}} static int add_response_info */
424
425 static int count_lines(char *str) /* {{{ */
426 {
427   int lines = 0;
428
429   if (str != NULL)
430   {
431     while ((str = strchr(str, '\n')) != NULL)
432     {
433       ++lines;
434       ++str;
435     }
436   }
437
438   return lines;
439 } /* }}} static int count_lines */
440
441 /* send the response back to the user.
442  * returns 0 on success, -1 on error
443  * write buffer is always zeroed after this call */
444 static int send_response (listen_socket_t *sock, response_code rc,
445                           char *fmt, ...) /* {{{ */
446 {
447   va_list argp;
448   char buffer[CMD_MAX];
449   int lines;
450   ssize_t wrote;
451   int rclen, len;
452
453   if (sock == NULL) return rc;  /* journal replay mode */
454
455   if (sock->batch_mode)
456   {
457     if (rc == RESP_OK)
458       return rc; /* no response on success during BATCH */
459     lines = sock->batch_cmd;
460   }
461   else if (rc == RESP_OK)
462     lines = count_lines(sock->wbuf);
463   else
464     lines = -1;
465
466   rclen = sprintf(buffer, "%d ", lines);
467   va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
470 #else
471   len = vsprintf(buffer+rclen, fmt, argp);
472 #endif
473   va_end(argp);
474   if (len < 0)
475     return -1;
476
477   len += rclen;
478
479   /* append the result to the wbuf, don't write to the user */
480   if (sock->batch_mode)
481     return add_to_wbuf(sock, buffer, len);
482
483   /* first write must be complete */
484   if (len != write(sock->fd, buffer, len))
485   {
486     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
487     return -1;
488   }
489
490   if (sock->wbuf != NULL)
491   {
492     wrote = 0;
493     while (wrote < sock->wbuf_len)
494     {
495       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
496       if (wb <= 0)
497       {
498         RRDD_LOG(LOG_INFO, "send_response: could not write results");
499         return -1;
500       }
501       wrote += wb;
502     }
503   }
504
505   free(sock->wbuf); sock->wbuf = NULL;
506   sock->wbuf_len = 0;
507
508   return 0;
509 } /* }}} */
510
511 static void wipe_ci_values(cache_item_t *ci, time_t when)
512 {
513   ci->values = NULL;
514   ci->values_num = 0;
515
516   ci->last_flush_time = when;
517   if (config_write_jitter > 0)
518     ci->last_flush_time += (random() % config_write_jitter);
519 }
520
521 /* remove_from_queue
522  * remove a "cache_item_t" item from the queue.
523  * must hold 'cache_lock' when calling this
524  */
525 static void remove_from_queue(cache_item_t *ci) /* {{{ */
526 {
527   if (ci == NULL) return;
528
529   if (ci->prev == NULL)
530     cache_queue_head = ci->next; /* reset head */
531   else
532     ci->prev->next = ci->next;
533
534   if (ci->next == NULL)
535     cache_queue_tail = ci->prev; /* reset the tail */
536   else
537     ci->next->prev = ci->prev;
538
539   ci->next = ci->prev = NULL;
540   ci->flags &= ~CI_FLAGS_IN_QUEUE;
541 } /* }}} static void remove_from_queue */
542
543 /* remove an entry from the tree and free all its resources.
544  * must hold 'cache lock' while calling this.
545  * returns 0 on success, otherwise errno */
546 static int forget_file(const char *file)
547 {
548   cache_item_t *ci;
549
550   ci = g_tree_lookup(cache_tree, file);
551   if (ci == NULL)
552     return ENOENT;
553
554   g_tree_remove (cache_tree, file);
555   remove_from_queue(ci);
556
557   for (int i=0; i < ci->values_num; i++)
558     free(ci->values[i]);
559
560   free (ci->values);
561   free (ci->file);
562
563   /* in case anyone is waiting */
564   pthread_cond_broadcast(&ci->flushed);
565
566   free (ci);
567
568   return 0;
569 } /* }}} static int forget_file */
570
571 /*
572  * enqueue_cache_item:
573  * `cache_lock' must be acquired before calling this function!
574  */
575 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
576     queue_side_t side)
577 {
578   if (ci == NULL)
579     return (-1);
580
581   if (ci->values_num == 0)
582     return (0);
583
584   if (side == HEAD)
585   {
586     if (cache_queue_head == ci)
587       return 0;
588
589     /* remove from the double linked list */
590     if (ci->flags & CI_FLAGS_IN_QUEUE)
591       remove_from_queue(ci);
592
593     ci->prev = NULL;
594     ci->next = cache_queue_head;
595     if (ci->next != NULL)
596       ci->next->prev = ci;
597     cache_queue_head = ci;
598
599     if (cache_queue_tail == NULL)
600       cache_queue_tail = cache_queue_head;
601   }
602   else /* (side == TAIL) */
603   {
604     /* We don't move values back in the list.. */
605     if (ci->flags & CI_FLAGS_IN_QUEUE)
606       return (0);
607
608     assert (ci->next == NULL);
609     assert (ci->prev == NULL);
610
611     ci->prev = cache_queue_tail;
612
613     if (cache_queue_tail == NULL)
614       cache_queue_head = ci;
615     else
616       cache_queue_tail->next = ci;
617
618     cache_queue_tail = ci;
619   }
620
621   ci->flags |= CI_FLAGS_IN_QUEUE;
622
623   pthread_cond_broadcast(&cache_cond);
624   pthread_mutex_lock (&stats_lock);
625   stats_queue_length++;
626   pthread_mutex_unlock (&stats_lock);
627
628   return (0);
629 } /* }}} int enqueue_cache_item */
630
631 /*
632  * tree_callback_flush:
633  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
634  * while this is in progress.
635  */
636 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
637     gpointer data)
638 {
639   cache_item_t *ci;
640   callback_flush_data_t *cfd;
641
642   ci = (cache_item_t *) value;
643   cfd = (callback_flush_data_t *) data;
644
645   if ((ci->last_flush_time <= cfd->abs_timeout)
646       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
647       && (ci->values_num > 0))
648   {
649     enqueue_cache_item (ci, TAIL);
650   }
651   else if ((do_shutdown != 0)
652       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
653       && (ci->values_num > 0))
654   {
655     enqueue_cache_item (ci, TAIL);
656   }
657   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
658       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
659       && (ci->values_num <= 0))
660   {
661     char **temp;
662
663     temp = (char **) realloc (cfd->keys,
664         sizeof (char *) * (cfd->keys_num + 1));
665     if (temp == NULL)
666     {
667       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
668       return (FALSE);
669     }
670     cfd->keys = temp;
671     /* Make really sure this points to the _same_ place */
672     assert ((char *) key == ci->file);
673     cfd->keys[cfd->keys_num] = (char *) key;
674     cfd->keys_num++;
675   }
676
677   return (FALSE);
678 } /* }}} gboolean tree_callback_flush */
679
680 static int flush_old_values (int max_age)
681 {
682   callback_flush_data_t cfd;
683   size_t k;
684
685   memset (&cfd, 0, sizeof (cfd));
686   /* Pass the current time as user data so that we don't need to call
687    * `time' for each node. */
688   cfd.now = time (NULL);
689   cfd.keys = NULL;
690   cfd.keys_num = 0;
691
692   if (max_age > 0)
693     cfd.abs_timeout = cfd.now - max_age;
694   else
695     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
696
697   /* `tree_callback_flush' will return the keys of all values that haven't
698    * been touched in the last `config_flush_interval' seconds in `cfd'.
699    * The char*'s in this array point to the same memory as ci->file, so we
700    * don't need to free them separately. */
701   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
702
703   for (k = 0; k < cfd.keys_num; k++)
704   {
705     /* should never fail, since we have held the cache_lock
706      * the entire time */
707     assert( forget_file(cfd.keys[k]) == 0 );
708   }
709
710   if (cfd.keys != NULL)
711   {
712     free (cfd.keys);
713     cfd.keys = NULL;
714   }
715
716   return (0);
717 } /* int flush_old_values */
718
719 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
720 {
721   struct timeval now;
722   struct timespec next_flush;
723   int final_flush = 0; /* make sure we only flush once on shutdown */
724
725   gettimeofday (&now, NULL);
726   next_flush.tv_sec = now.tv_sec + config_flush_interval;
727   next_flush.tv_nsec = 1000 * now.tv_usec;
728
729   pthread_mutex_lock (&cache_lock);
730   while ((do_shutdown == 0) || (cache_queue_head != NULL))
731   {
732     cache_item_t *ci;
733     char *file;
734     char **values;
735     int values_num;
736     int status;
737     int i;
738
739     /* First, check if it's time to do the cache flush. */
740     gettimeofday (&now, NULL);
741     if ((now.tv_sec > next_flush.tv_sec)
742         || ((now.tv_sec == next_flush.tv_sec)
743           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
744     {
745       /* Flush all values that haven't been written in the last
746        * `config_write_interval' seconds. */
747       flush_old_values (config_write_interval);
748
749       /* Determine the time of the next cache flush. */
750       while (next_flush.tv_sec <= now.tv_sec)
751         next_flush.tv_sec += config_flush_interval;
752
753       /* unlock the cache while we rotate so we don't block incoming
754        * updates if the fsync() blocks on disk I/O */
755       pthread_mutex_unlock(&cache_lock);
756       journal_rotate();
757       pthread_mutex_lock(&cache_lock);
758     }
759
760     /* Now, check if there's something to store away. If not, wait until
761      * something comes in or it's time to do the cache flush.  if we are
762      * shutting down, do not wait around.  */
763     if (cache_queue_head == NULL && !do_shutdown)
764     {
765       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
766       if ((status != 0) && (status != ETIMEDOUT))
767       {
768         RRDD_LOG (LOG_ERR, "queue_thread_main: "
769             "pthread_cond_timedwait returned %i.", status);
770       }
771     }
772
773     /* We're about to shut down */
774     if (do_shutdown != 0 && !final_flush++)
775     {
776       if (config_flush_at_shutdown)
777         flush_old_values (-1); /* flush everything */
778       else
779         break;
780     }
781
782     /* Check if a value has arrived. This may be NULL if we timed out or there
783      * was an interrupt such as a signal. */
784     if (cache_queue_head == NULL)
785       continue;
786
787     ci = cache_queue_head;
788
789     /* copy the relevant parts */
790     file = strdup (ci->file);
791     if (file == NULL)
792     {
793       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
794       continue;
795     }
796
797     assert(ci->values != NULL);
798     assert(ci->values_num > 0);
799
800     values = ci->values;
801     values_num = ci->values_num;
802
803     wipe_ci_values(ci, time(NULL));
804     remove_from_queue(ci);
805
806     pthread_mutex_lock (&stats_lock);
807     assert (stats_queue_length > 0);
808     stats_queue_length--;
809     pthread_mutex_unlock (&stats_lock);
810
811     pthread_mutex_unlock (&cache_lock);
812
813     rrd_clear_error ();
814     status = rrd_update_r (file, NULL, values_num, (void *) values);
815     if (status != 0)
816     {
817       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
818           "rrd_update_r (%s) failed with status %i. (%s)",
819           file, status, rrd_get_error());
820     }
821
822     journal_write("wrote", file);
823     pthread_cond_broadcast(&ci->flushed);
824
825     for (i = 0; i < values_num; i++)
826       free (values[i]);
827
828     free(values);
829     free(file);
830
831     if (status == 0)
832     {
833       pthread_mutex_lock (&stats_lock);
834       stats_updates_written++;
835       stats_data_sets_written += values_num;
836       pthread_mutex_unlock (&stats_lock);
837     }
838
839     pthread_mutex_lock (&cache_lock);
840
841     /* We're about to shut down */
842     if (do_shutdown != 0 && !final_flush++)
843     {
844       if (config_flush_at_shutdown)
845           flush_old_values (-1); /* flush everything */
846       else
847         break;
848     }
849   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
850   pthread_mutex_unlock (&cache_lock);
851
852   if (config_flush_at_shutdown)
853   {
854     assert(cache_queue_head == NULL);
855     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
856   }
857
858   journal_done();
859
860   return (NULL);
861 } /* }}} void *queue_thread_main */
862
863 static int buffer_get_field (char **buffer_ret, /* {{{ */
864     size_t *buffer_size_ret, char **field_ret)
865 {
866   char *buffer;
867   size_t buffer_pos;
868   size_t buffer_size;
869   char *field;
870   size_t field_size;
871   int status;
872
873   buffer = *buffer_ret;
874   buffer_pos = 0;
875   buffer_size = *buffer_size_ret;
876   field = *buffer_ret;
877   field_size = 0;
878
879   if (buffer_size <= 0)
880     return (-1);
881
882   /* This is ensured by `handle_request'. */
883   assert (buffer[buffer_size - 1] == '\0');
884
885   status = -1;
886   while (buffer_pos < buffer_size)
887   {
888     /* Check for end-of-field or end-of-buffer */
889     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
890     {
891       field[field_size] = 0;
892       field_size++;
893       buffer_pos++;
894       status = 0;
895       break;
896     }
897     /* Handle escaped characters. */
898     else if (buffer[buffer_pos] == '\\')
899     {
900       if (buffer_pos >= (buffer_size - 1))
901         break;
902       buffer_pos++;
903       field[field_size] = buffer[buffer_pos];
904       field_size++;
905       buffer_pos++;
906     }
907     /* Normal operation */ 
908     else
909     {
910       field[field_size] = buffer[buffer_pos];
911       field_size++;
912       buffer_pos++;
913     }
914   } /* while (buffer_pos < buffer_size) */
915
916   if (status != 0)
917     return (status);
918
919   *buffer_ret = buffer + buffer_pos;
920   *buffer_size_ret = buffer_size - buffer_pos;
921   *field_ret = field;
922
923   return (0);
924 } /* }}} int buffer_get_field */
925
926 /* if we're restricting writes to the base directory,
927  * check whether the file falls within the dir
928  * returns 1 if OK, otherwise 0
929  */
930 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
931 {
932   assert(file != NULL);
933
934   if (!config_write_base_only
935       || sock == NULL /* journal replay */
936       || config_base_dir == NULL)
937     return 1;
938
939   if (strstr(file, "../") != NULL) goto err;
940
941   /* relative paths without "../" are ok */
942   if (*file != '/') return 1;
943
944   /* file must be of the format base + "/" + <1+ char filename> */
945   if (strlen(file) < _config_base_dir_len + 2) goto err;
946   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
947   if (*(file + _config_base_dir_len) != '/') goto err;
948
949   return 1;
950
951 err:
952   if (sock != NULL && sock->fd >= 0)
953     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
954
955   return 0;
956 } /* }}} static int check_file_access */
957
958 /* returns 1 if we have the required privilege level,
959  * otherwise issue an error to the user on sock */
960 static int has_privilege (listen_socket_t *sock, /* {{{ */
961                           socket_privilege priv)
962 {
963   if (sock == NULL) /* journal replay */
964     return 1;
965
966   if (sock->privilege >= priv)
967     return 1;
968
969   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
970 } /* }}} static int has_privilege */
971
972 static int flush_file (const char *filename) /* {{{ */
973 {
974   cache_item_t *ci;
975
976   pthread_mutex_lock (&cache_lock);
977
978   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
979   if (ci == NULL)
980   {
981     pthread_mutex_unlock (&cache_lock);
982     return (ENOENT);
983   }
984
985   if (ci->values_num > 0)
986   {
987     /* Enqueue at head */
988     enqueue_cache_item (ci, HEAD);
989     pthread_cond_wait(&ci->flushed, &cache_lock);
990   }
991
992   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
993    * may have been purged during our cond_wait() */
994
995   pthread_mutex_unlock(&cache_lock);
996
997   return (0);
998 } /* }}} int flush_file */
999
1000 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1001     char *buffer, size_t buffer_size)
1002 {
1003   int status;
1004   char **help_text;
1005   char *command;
1006
1007   char *help_help[2] =
1008   {
1009     "Command overview\n"
1010     ,
1011     "HELP [<command>]\n"
1012     "FLUSH <filename>\n"
1013     "FLUSHALL\n"
1014     "PENDING <filename>\n"
1015     "FORGET <filename>\n"
1016     "UPDATE <filename> <values> [<values> ...]\n"
1017     "BATCH\n"
1018     "STATS\n"
1019   };
1020
1021   char *help_flush[2] =
1022   {
1023     "Help for FLUSH\n"
1024     ,
1025     "Usage: FLUSH <filename>\n"
1026     "\n"
1027     "Adds the given filename to the head of the update queue and returns\n"
1028     "after is has been dequeued.\n"
1029   };
1030
1031   char *help_flushall[2] =
1032   {
1033     "Help for FLUSHALL\n"
1034     ,
1035     "Usage: FLUSHALL\n"
1036     "\n"
1037     "Triggers writing of all pending updates.  Returns immediately.\n"
1038   };
1039
1040   char *help_pending[2] =
1041   {
1042     "Help for PENDING\n"
1043     ,
1044     "Usage: PENDING <filename>\n"
1045     "\n"
1046     "Shows any 'pending' updates for a file, in order.\n"
1047     "The updates shown have not yet been written to the underlying RRD file.\n"
1048   };
1049
1050   char *help_forget[2] =
1051   {
1052     "Help for FORGET\n"
1053     ,
1054     "Usage: FORGET <filename>\n"
1055     "\n"
1056     "Removes the file completely from the cache.\n"
1057     "Any pending updates for the file will be lost.\n"
1058   };
1059
1060   char *help_update[2] =
1061   {
1062     "Help for UPDATE\n"
1063     ,
1064     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1065     "\n"
1066     "Adds the given file to the internal cache if it is not yet known and\n"
1067     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1068     "for details.\n"
1069     "\n"
1070     "Each <values> has the following form:\n"
1071     "  <values> = <time>:<value>[:<value>[...]]\n"
1072     "See the rrdupdate(1) manpage for details.\n"
1073   };
1074
1075   char *help_stats[2] =
1076   {
1077     "Help for STATS\n"
1078     ,
1079     "Usage: STATS\n"
1080     "\n"
1081     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1082     "a description of the values.\n"
1083   };
1084
1085   char *help_batch[2] =
1086   {
1087     "Help for BATCH\n"
1088     ,
1089     "The 'BATCH' command permits the client to initiate a bulk load\n"
1090     "   of commands to rrdcached.\n"
1091     "\n"
1092     "Usage:\n"
1093     "\n"
1094     "    client: BATCH\n"
1095     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1096     "    client: command #1\n"
1097     "    client: command #2\n"
1098     "    client: ... and so on\n"
1099     "    client: .\n"
1100     "    server: 2 errors\n"
1101     "    server: 7 message for command #7\n"
1102     "    server: 9 message for command #9\n"
1103     "\n"
1104     "For more information, consult the rrdcached(1) documentation.\n"
1105   };
1106
1107   status = buffer_get_field (&buffer, &buffer_size, &command);
1108   if (status != 0)
1109     help_text = help_help;
1110   else
1111   {
1112     if (strcasecmp (command, "update") == 0)
1113       help_text = help_update;
1114     else if (strcasecmp (command, "flush") == 0)
1115       help_text = help_flush;
1116     else if (strcasecmp (command, "flushall") == 0)
1117       help_text = help_flushall;
1118     else if (strcasecmp (command, "pending") == 0)
1119       help_text = help_pending;
1120     else if (strcasecmp (command, "forget") == 0)
1121       help_text = help_forget;
1122     else if (strcasecmp (command, "stats") == 0)
1123       help_text = help_stats;
1124     else if (strcasecmp (command, "batch") == 0)
1125       help_text = help_batch;
1126     else
1127       help_text = help_help;
1128   }
1129
1130   add_response_info(sock, help_text[1]);
1131   return send_response(sock, RESP_OK, help_text[0]);
1132 } /* }}} int handle_request_help */
1133
1134 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1135 {
1136   uint64_t copy_queue_length;
1137   uint64_t copy_updates_received;
1138   uint64_t copy_flush_received;
1139   uint64_t copy_updates_written;
1140   uint64_t copy_data_sets_written;
1141   uint64_t copy_journal_bytes;
1142   uint64_t copy_journal_rotate;
1143
1144   uint64_t tree_nodes_number;
1145   uint64_t tree_depth;
1146
1147   pthread_mutex_lock (&stats_lock);
1148   copy_queue_length       = stats_queue_length;
1149   copy_updates_received   = stats_updates_received;
1150   copy_flush_received     = stats_flush_received;
1151   copy_updates_written    = stats_updates_written;
1152   copy_data_sets_written  = stats_data_sets_written;
1153   copy_journal_bytes      = stats_journal_bytes;
1154   copy_journal_rotate     = stats_journal_rotate;
1155   pthread_mutex_unlock (&stats_lock);
1156
1157   pthread_mutex_lock (&cache_lock);
1158   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1159   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1160   pthread_mutex_unlock (&cache_lock);
1161
1162   add_response_info(sock,
1163                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1164   add_response_info(sock,
1165                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1166   add_response_info(sock,
1167                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1168   add_response_info(sock,
1169                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1170   add_response_info(sock,
1171                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1172   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1173   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1174   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1175   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1176
1177   send_response(sock, RESP_OK, "Statistics follow\n");
1178
1179   return (0);
1180 } /* }}} int handle_request_stats */
1181
1182 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1183     char *buffer, size_t buffer_size)
1184 {
1185   char *file;
1186   int status;
1187
1188   status = buffer_get_field (&buffer, &buffer_size, &file);
1189   if (status != 0)
1190   {
1191     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1192   }
1193   else
1194   {
1195     pthread_mutex_lock(&stats_lock);
1196     stats_flush_received++;
1197     pthread_mutex_unlock(&stats_lock);
1198
1199     if (!check_file_access(file, sock)) return 0;
1200
1201     status = flush_file (file);
1202     if (status == 0)
1203       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1204     else if (status == ENOENT)
1205     {
1206       /* no file in our tree; see whether it exists at all */
1207       struct stat statbuf;
1208
1209       memset(&statbuf, 0, sizeof(statbuf));
1210       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1211         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1212       else
1213         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1214     }
1215     else if (status < 0)
1216       return send_response(sock, RESP_ERR, "Internal error.\n");
1217     else
1218       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1219   }
1220
1221   /* NOTREACHED */
1222   assert(1==0);
1223 } /* }}} int handle_request_slurp */
1224
1225 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1226 {
1227   int status;
1228
1229   status = has_privilege(sock, PRIV_HIGH);
1230   if (status <= 0)
1231     return status;
1232
1233   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1234
1235   pthread_mutex_lock(&cache_lock);
1236   flush_old_values(-1);
1237   pthread_mutex_unlock(&cache_lock);
1238
1239   return send_response(sock, RESP_OK, "Started flush.\n");
1240 } /* }}} static int handle_request_flushall */
1241
1242 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1243                                   char *buffer, size_t buffer_size)
1244 {
1245   int status;
1246   char *file;
1247   cache_item_t *ci;
1248
1249   status = buffer_get_field(&buffer, &buffer_size, &file);
1250   if (status != 0)
1251     return send_response(sock, RESP_ERR,
1252                          "Usage: PENDING <filename>\n");
1253
1254   status = has_privilege(sock, PRIV_HIGH);
1255   if (status <= 0)
1256     return status;
1257
1258   pthread_mutex_lock(&cache_lock);
1259   ci = g_tree_lookup(cache_tree, file);
1260   if (ci == NULL)
1261   {
1262     pthread_mutex_unlock(&cache_lock);
1263     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1264   }
1265
1266   for (int i=0; i < ci->values_num; i++)
1267     add_response_info(sock, "%s\n", ci->values[i]);
1268
1269   pthread_mutex_unlock(&cache_lock);
1270   return send_response(sock, RESP_OK, "updates pending\n");
1271 } /* }}} static int handle_request_pending */
1272
1273 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1274                                  char *buffer, size_t buffer_size)
1275 {
1276   int status;
1277   char *file;
1278
1279   status = buffer_get_field(&buffer, &buffer_size, &file);
1280   if (status != 0)
1281     return send_response(sock, RESP_ERR,
1282                          "Usage: FORGET <filename>\n");
1283
1284   status = has_privilege(sock, PRIV_HIGH);
1285   if (status <= 0)
1286     return status;
1287
1288   if (!check_file_access(file, sock)) return 0;
1289
1290   pthread_mutex_lock(&cache_lock);
1291   status = forget_file(file);
1292   pthread_mutex_unlock(&cache_lock);
1293
1294   if (status == 0)
1295   {
1296     if (sock != NULL)
1297       journal_write("forget", file);
1298
1299     return send_response(sock, RESP_OK, "Gone!\n");
1300   }
1301   else
1302     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1303                          status < 0 ? "Internal error" : rrd_strerror(status));
1304
1305   /* NOTREACHED */
1306   assert(1==0);
1307 } /* }}} static int handle_request_forget */
1308
1309 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1310     char *buffer, size_t buffer_size)
1311 {
1312   char *file;
1313   int values_num = 0;
1314   int status;
1315   char orig_buf[CMD_MAX];
1316
1317   time_t now;
1318   cache_item_t *ci;
1319
1320   now = time (NULL);
1321
1322   status = has_privilege(sock, PRIV_HIGH);
1323   if (status <= 0)
1324     return status;
1325
1326   /* save it for the journal later */
1327   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1328
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return send_response(sock, RESP_ERR,
1332                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1333
1334   pthread_mutex_lock(&stats_lock);
1335   stats_updates_received++;
1336   pthread_mutex_unlock(&stats_lock);
1337
1338   if (!check_file_access(file, sock)) return 0;
1339
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1342
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346
1347     /* don't hold the lock while we setup; stat(2) might block */
1348     pthread_mutex_unlock(&cache_lock);
1349
1350     memset (&statbuf, 0, sizeof (statbuf));
1351     status = stat (file, &statbuf);
1352     if (status != 0)
1353     {
1354       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1355
1356       status = errno;
1357       if (status == ENOENT)
1358         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1359       else
1360         return send_response(sock, RESP_ERR,
1361                              "stat failed with error %i.\n", status);
1362     }
1363     if (!S_ISREG (statbuf.st_mode))
1364       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1365
1366     if (access(file, R_OK|W_OK) != 0)
1367       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1368                            file, rrd_strerror(errno));
1369
1370     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1371     if (ci == NULL)
1372     {
1373       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1374
1375       return send_response(sock, RESP_ERR, "malloc failed.\n");
1376     }
1377     memset (ci, 0, sizeof (cache_item_t));
1378
1379     ci->file = strdup (file);
1380     if (ci->file == NULL)
1381     {
1382       free (ci);
1383       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1384
1385       return send_response(sock, RESP_ERR, "strdup failed.\n");
1386     }
1387
1388     wipe_ci_values(ci, now);
1389     ci->flags = CI_FLAGS_IN_TREE;
1390
1391     pthread_mutex_lock(&cache_lock);
1392     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1393   } /* }}} */
1394   assert (ci != NULL);
1395
1396   /* don't re-write updates in replay mode */
1397   if (sock != NULL)
1398     journal_write("update", orig_buf);
1399
1400   while (buffer_size > 0)
1401   {
1402     char **temp;
1403     char *value;
1404
1405     status = buffer_get_field (&buffer, &buffer_size, &value);
1406     if (status != 0)
1407     {
1408       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1409       break;
1410     }
1411
1412     temp = (char **) realloc (ci->values,
1413         sizeof (char *) * (ci->values_num + 1));
1414     if (temp == NULL)
1415     {
1416       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1417       continue;
1418     }
1419     ci->values = temp;
1420
1421     ci->values[ci->values_num] = strdup (value);
1422     if (ci->values[ci->values_num] == NULL)
1423     {
1424       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1425       continue;
1426     }
1427     ci->values_num++;
1428
1429     values_num++;
1430   }
1431
1432   if (((now - ci->last_flush_time) >= config_write_interval)
1433       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1434       && (ci->values_num > 0))
1435   {
1436     enqueue_cache_item (ci, TAIL);
1437   }
1438
1439   pthread_mutex_unlock (&cache_lock);
1440
1441   if (values_num < 1)
1442     return send_response(sock, RESP_ERR, "No values updated.\n");
1443   else
1444     return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1445
1446   /* NOTREACHED */
1447   assert(1==0);
1448
1449 } /* }}} int handle_request_update */
1450
1451 /* we came across a "WROTE" entry during journal replay.
1452  * throw away any values that we have accumulated for this file
1453  */
1454 static int handle_request_wrote (const char *buffer) /* {{{ */
1455 {
1456   int i;
1457   cache_item_t *ci;
1458   const char *file = buffer;
1459
1460   pthread_mutex_lock(&cache_lock);
1461
1462   ci = g_tree_lookup(cache_tree, file);
1463   if (ci == NULL)
1464   {
1465     pthread_mutex_unlock(&cache_lock);
1466     return (0);
1467   }
1468
1469   if (ci->values)
1470   {
1471     for (i=0; i < ci->values_num; i++)
1472       free(ci->values[i]);
1473
1474     free(ci->values);
1475   }
1476
1477   wipe_ci_values(ci, time(NULL));
1478   remove_from_queue(ci);
1479
1480   pthread_mutex_unlock(&cache_lock);
1481   return (0);
1482 } /* }}} int handle_request_wrote */
1483
1484 /* start "BATCH" processing */
1485 static int batch_start (listen_socket_t *sock) /* {{{ */
1486 {
1487   int status;
1488   if (sock->batch_mode)
1489     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1490
1491   status = send_response(sock, RESP_OK,
1492                          "Go ahead.  End with dot '.' on its own line.\n");
1493   sock->batch_mode = 1;
1494   sock->batch_cmd = 0;
1495
1496   return status;
1497 } /* }}} static int batch_start */
1498
1499 /* finish "BATCH" processing and return results to the client */
1500 static int batch_done (listen_socket_t *sock) /* {{{ */
1501 {
1502   assert(sock->batch_mode);
1503   sock->batch_mode = 0;
1504   sock->batch_cmd  = 0;
1505   return send_response(sock, RESP_OK, "errors\n");
1506 } /* }}} static int batch_done */
1507
1508 /* if sock==NULL, we are in journal replay mode */
1509 static int handle_request (listen_socket_t *sock, /* {{{ */
1510                            char *buffer, size_t buffer_size)
1511 {
1512   char *buffer_ptr;
1513   char *command;
1514   int status;
1515
1516   assert (buffer[buffer_size - 1] == '\0');
1517
1518   buffer_ptr = buffer;
1519   command = NULL;
1520   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1521   if (status != 0)
1522   {
1523     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1524     return (-1);
1525   }
1526
1527   if (sock != NULL && sock->batch_mode)
1528     sock->batch_cmd++;
1529
1530   if (strcasecmp (command, "update") == 0)
1531     return (handle_request_update (sock, buffer_ptr, buffer_size));
1532   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1533   {
1534     /* this is only valid in replay mode */
1535     return (handle_request_wrote (buffer_ptr));
1536   }
1537   else if (strcasecmp (command, "flush") == 0)
1538     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1539   else if (strcasecmp (command, "flushall") == 0)
1540     return (handle_request_flushall(sock));
1541   else if (strcasecmp (command, "pending") == 0)
1542     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1543   else if (strcasecmp (command, "forget") == 0)
1544     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1545   else if (strcasecmp (command, "stats") == 0)
1546     return (handle_request_stats (sock));
1547   else if (strcasecmp (command, "help") == 0)
1548     return (handle_request_help (sock, buffer_ptr, buffer_size));
1549   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1550     return batch_start(sock);
1551   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1552     return batch_done(sock);
1553   else
1554     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1555
1556   /* NOTREACHED */
1557   assert(1==0);
1558 } /* }}} int handle_request */
1559
1560 /* MUST NOT hold journal_lock before calling this */
1561 static void journal_rotate(void) /* {{{ */
1562 {
1563   FILE *old_fh = NULL;
1564   int new_fd;
1565
1566   if (journal_cur == NULL || journal_old == NULL)
1567     return;
1568
1569   pthread_mutex_lock(&journal_lock);
1570
1571   /* we rotate this way (rename before close) so that the we can release
1572    * the journal lock as fast as possible.  Journal writes to the new
1573    * journal can proceed immediately after the new file is opened.  The
1574    * fclose can then block without affecting new updates.
1575    */
1576   if (journal_fh != NULL)
1577   {
1578     old_fh = journal_fh;
1579     journal_fh = NULL;
1580     rename(journal_cur, journal_old);
1581     ++stats_journal_rotate;
1582   }
1583
1584   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1585                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1586   if (new_fd >= 0)
1587   {
1588     journal_fh = fdopen(new_fd, "a");
1589     if (journal_fh == NULL)
1590       close(new_fd);
1591   }
1592
1593   pthread_mutex_unlock(&journal_lock);
1594
1595   if (old_fh != NULL)
1596     fclose(old_fh);
1597
1598   if (journal_fh == NULL)
1599   {
1600     RRDD_LOG(LOG_CRIT,
1601              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1602              journal_cur, rrd_strerror(errno));
1603
1604     RRDD_LOG(LOG_ERR,
1605              "JOURNALING DISABLED: All values will be flushed at shutdown");
1606     config_flush_at_shutdown = 1;
1607   }
1608
1609 } /* }}} static void journal_rotate */
1610
1611 static void journal_done(void) /* {{{ */
1612 {
1613   if (journal_cur == NULL)
1614     return;
1615
1616   pthread_mutex_lock(&journal_lock);
1617   if (journal_fh != NULL)
1618   {
1619     fclose(journal_fh);
1620     journal_fh = NULL;
1621   }
1622
1623   if (config_flush_at_shutdown)
1624   {
1625     RRDD_LOG(LOG_INFO, "removing journals");
1626     unlink(journal_old);
1627     unlink(journal_cur);
1628   }
1629   else
1630   {
1631     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1632              "journals will be used at next startup");
1633   }
1634
1635   pthread_mutex_unlock(&journal_lock);
1636
1637 } /* }}} static void journal_done */
1638
1639 static int journal_write(char *cmd, char *args) /* {{{ */
1640 {
1641   int chars;
1642
1643   if (journal_fh == NULL)
1644     return 0;
1645
1646   pthread_mutex_lock(&journal_lock);
1647   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1648   pthread_mutex_unlock(&journal_lock);
1649
1650   if (chars > 0)
1651   {
1652     pthread_mutex_lock(&stats_lock);
1653     stats_journal_bytes += chars;
1654     pthread_mutex_unlock(&stats_lock);
1655   }
1656
1657   return chars;
1658 } /* }}} static int journal_write */
1659
1660 static int journal_replay (const char *file) /* {{{ */
1661 {
1662   FILE *fh;
1663   int entry_cnt = 0;
1664   int fail_cnt = 0;
1665   uint64_t line = 0;
1666   char entry[CMD_MAX];
1667
1668   if (file == NULL) return 0;
1669
1670   {
1671     char *reason;
1672     int status = 0;
1673     struct stat statbuf;
1674
1675     memset(&statbuf, 0, sizeof(statbuf));
1676     if (stat(file, &statbuf) != 0)
1677     {
1678       if (errno == ENOENT)
1679         return 0;
1680
1681       reason = "stat error";
1682       status = errno;
1683     }
1684     else if (!S_ISREG(statbuf.st_mode))
1685     {
1686       reason = "not a regular file";
1687       status = EPERM;
1688     }
1689     if (statbuf.st_uid != daemon_uid)
1690     {
1691       reason = "not owned by daemon user";
1692       status = EACCES;
1693     }
1694     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1695     {
1696       reason = "must not be user/group writable";
1697       status = EACCES;
1698     }
1699
1700     if (status != 0)
1701     {
1702       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1703                file, rrd_strerror(status), reason);
1704       return 0;
1705     }
1706   }
1707
1708   fh = fopen(file, "r");
1709   if (fh == NULL)
1710   {
1711     if (errno != ENOENT)
1712       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1713                file, rrd_strerror(errno));
1714     return 0;
1715   }
1716   else
1717     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1718
1719   while(!feof(fh))
1720   {
1721     size_t entry_len;
1722
1723     ++line;
1724     if (fgets(entry, sizeof(entry), fh) == NULL)
1725       break;
1726     entry_len = strlen(entry);
1727
1728     /* check \n termination in case journal writing crashed mid-line */
1729     if (entry_len == 0)
1730       continue;
1731     else if (entry[entry_len - 1] != '\n')
1732     {
1733       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1734       ++fail_cnt;
1735       continue;
1736     }
1737
1738     entry[entry_len - 1] = '\0';
1739
1740     if (handle_request(NULL, entry, entry_len) == 0)
1741       ++entry_cnt;
1742     else
1743       ++fail_cnt;
1744   }
1745
1746   fclose(fh);
1747
1748   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1749            entry_cnt, fail_cnt);
1750
1751   return entry_cnt > 0 ? 1 : 0;
1752 } /* }}} static int journal_replay */
1753
1754 static void journal_init(void) /* {{{ */
1755 {
1756   int had_journal = 0;
1757
1758   if (journal_cur == NULL) return;
1759
1760   pthread_mutex_lock(&journal_lock);
1761
1762   RRDD_LOG(LOG_INFO, "checking for journal files");
1763
1764   had_journal += journal_replay(journal_old);
1765   had_journal += journal_replay(journal_cur);
1766
1767   /* it must have been a crash.  start a flush */
1768   if (had_journal && config_flush_at_shutdown)
1769     flush_old_values(-1);
1770
1771   pthread_mutex_unlock(&journal_lock);
1772   journal_rotate();
1773
1774   RRDD_LOG(LOG_INFO, "journal processing complete");
1775
1776 } /* }}} static void journal_init */
1777
1778 static void close_connection(listen_socket_t *sock)
1779 {
1780   close(sock->fd) ;  sock->fd   = -1;
1781   free(sock->rbuf);  sock->rbuf = NULL;
1782   free(sock->wbuf);  sock->wbuf = NULL;
1783
1784   free(sock);
1785 }
1786
1787 static void *connection_thread_main (void *args) /* {{{ */
1788 {
1789   pthread_t self;
1790   listen_socket_t *sock;
1791   int i;
1792   int fd;
1793
1794   sock = (listen_socket_t *) args;
1795   fd = sock->fd;
1796
1797   /* init read buffers */
1798   sock->next_read = sock->next_cmd = 0;
1799   sock->rbuf = malloc(RBUF_SIZE);
1800   if (sock->rbuf == NULL)
1801   {
1802     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1803     close_connection(sock);
1804     return NULL;
1805   }
1806
1807   pthread_mutex_lock (&connection_threads_lock);
1808   {
1809     pthread_t *temp;
1810
1811     temp = (pthread_t *) realloc (connection_threads,
1812         sizeof (pthread_t) * (connection_threads_num + 1));
1813     if (temp == NULL)
1814     {
1815       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1816     }
1817     else
1818     {
1819       connection_threads = temp;
1820       connection_threads[connection_threads_num] = pthread_self ();
1821       connection_threads_num++;
1822     }
1823   }
1824   pthread_mutex_unlock (&connection_threads_lock);
1825
1826   while (do_shutdown == 0)
1827   {
1828     char *cmd;
1829     ssize_t cmd_len;
1830     ssize_t rbytes;
1831
1832     struct pollfd pollfd;
1833     int status;
1834
1835     pollfd.fd = fd;
1836     pollfd.events = POLLIN | POLLPRI;
1837     pollfd.revents = 0;
1838
1839     status = poll (&pollfd, 1, /* timeout = */ 500);
1840     if (do_shutdown)
1841       break;
1842     else if (status == 0) /* timeout */
1843       continue;
1844     else if (status < 0) /* error */
1845     {
1846       status = errno;
1847       if (status == EINTR)
1848         continue;
1849       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1850       continue;
1851     }
1852
1853     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1854     {
1855       close_connection(sock);
1856       break;
1857     }
1858     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1859     {
1860       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1861           "poll(2) returned something unexpected: %#04hx",
1862           pollfd.revents);
1863       close_connection(sock);
1864       break;
1865     }
1866
1867     rbytes = read(fd, sock->rbuf + sock->next_read,
1868                   RBUF_SIZE - sock->next_read);
1869     if (rbytes < 0)
1870     {
1871       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1872       break;
1873     }
1874     else if (rbytes == 0)
1875       break; /* eof */
1876
1877     sock->next_read += rbytes;
1878
1879     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1880     {
1881       status = handle_request (sock, cmd, cmd_len+1);
1882       if (status != 0)
1883         goto out_close;
1884     }
1885   }
1886
1887 out_close:
1888   close_connection(sock);
1889
1890   self = pthread_self ();
1891   /* Remove this thread from the connection threads list */
1892   pthread_mutex_lock (&connection_threads_lock);
1893   /* Find out own index in the array */
1894   for (i = 0; i < connection_threads_num; i++)
1895     if (pthread_equal (connection_threads[i], self) != 0)
1896       break;
1897   assert (i < connection_threads_num);
1898
1899   /* Move the trailing threads forward. */
1900   if (i < (connection_threads_num - 1))
1901   {
1902     memmove (connection_threads + i,
1903         connection_threads + i + 1,
1904         sizeof (pthread_t) * (connection_threads_num - i - 1));
1905   }
1906
1907   connection_threads_num--;
1908   pthread_mutex_unlock (&connection_threads_lock);
1909
1910   return (NULL);
1911 } /* }}} void *connection_thread_main */
1912
1913 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1914 {
1915   int fd;
1916   struct sockaddr_un sa;
1917   listen_socket_t *temp;
1918   int status;
1919   const char *path;
1920
1921   path = sock->addr;
1922   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1923     path += strlen("unix:");
1924
1925   temp = (listen_socket_t *) realloc (listen_fds,
1926       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1927   if (temp == NULL)
1928   {
1929     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1930     return (-1);
1931   }
1932   listen_fds = temp;
1933   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1934
1935   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1936   if (fd < 0)
1937   {
1938     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1939     return (-1);
1940   }
1941
1942   memset (&sa, 0, sizeof (sa));
1943   sa.sun_family = AF_UNIX;
1944   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1945
1946   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1947   if (status != 0)
1948   {
1949     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1950     close (fd);
1951     unlink (path);
1952     return (-1);
1953   }
1954
1955   status = listen (fd, /* backlog = */ 10);
1956   if (status != 0)
1957   {
1958     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1959     close (fd);
1960     unlink (path);
1961     return (-1);
1962   }
1963
1964   listen_fds[listen_fds_num].fd = fd;
1965   listen_fds[listen_fds_num].family = PF_UNIX;
1966   strncpy(listen_fds[listen_fds_num].addr, path,
1967           sizeof (listen_fds[listen_fds_num].addr) - 1);
1968   listen_fds_num++;
1969
1970   return (0);
1971 } /* }}} int open_listen_socket_unix */
1972
1973 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1974 {
1975   struct addrinfo ai_hints;
1976   struct addrinfo *ai_res;
1977   struct addrinfo *ai_ptr;
1978   char addr_copy[NI_MAXHOST];
1979   char *addr;
1980   char *port;
1981   int status;
1982
1983   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1984   addr_copy[sizeof (addr_copy) - 1] = 0;
1985   addr = addr_copy;
1986
1987   memset (&ai_hints, 0, sizeof (ai_hints));
1988   ai_hints.ai_flags = 0;
1989 #ifdef AI_ADDRCONFIG
1990   ai_hints.ai_flags |= AI_ADDRCONFIG;
1991 #endif
1992   ai_hints.ai_family = AF_UNSPEC;
1993   ai_hints.ai_socktype = SOCK_STREAM;
1994
1995   port = NULL;
1996   if (*addr == '[') /* IPv6+port format */
1997   {
1998     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1999     addr++;
2000
2001     port = strchr (addr, ']');
2002     if (port == NULL)
2003     {
2004       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2005           sock->addr);
2006       return (-1);
2007     }
2008     *port = 0;
2009     port++;
2010
2011     if (*port == ':')
2012       port++;
2013     else if (*port == 0)
2014       port = NULL;
2015     else
2016     {
2017       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2018           port);
2019       return (-1);
2020     }
2021   } /* if (*addr = ']') */
2022   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2023   {
2024     port = rindex(addr, ':');
2025     if (port != NULL)
2026     {
2027       *port = 0;
2028       port++;
2029     }
2030   }
2031   ai_res = NULL;
2032   status = getaddrinfo (addr,
2033                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2034                         &ai_hints, &ai_res);
2035   if (status != 0)
2036   {
2037     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2038         "%s", addr, gai_strerror (status));
2039     return (-1);
2040   }
2041
2042   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2043   {
2044     int fd;
2045     listen_socket_t *temp;
2046     int one = 1;
2047
2048     temp = (listen_socket_t *) realloc (listen_fds,
2049         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2050     if (temp == NULL)
2051     {
2052       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2053       continue;
2054     }
2055     listen_fds = temp;
2056     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2057
2058     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2059     if (fd < 0)
2060     {
2061       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2062       continue;
2063     }
2064
2065     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2066
2067     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2068     if (status != 0)
2069     {
2070       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2071       close (fd);
2072       continue;
2073     }
2074
2075     status = listen (fd, /* backlog = */ 10);
2076     if (status != 0)
2077     {
2078       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2079       close (fd);
2080       return (-1);
2081     }
2082
2083     listen_fds[listen_fds_num].fd = fd;
2084     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2085     listen_fds_num++;
2086   } /* for (ai_ptr) */
2087
2088   return (0);
2089 } /* }}} static int open_listen_socket_network */
2090
2091 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2092 {
2093   assert(sock != NULL);
2094   assert(sock->addr != NULL);
2095
2096   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2097       || sock->addr[0] == '/')
2098     return (open_listen_socket_unix(sock));
2099   else
2100     return (open_listen_socket_network(sock));
2101 } /* }}} int open_listen_socket */
2102
2103 static int close_listen_sockets (void) /* {{{ */
2104 {
2105   size_t i;
2106
2107   for (i = 0; i < listen_fds_num; i++)
2108   {
2109     close (listen_fds[i].fd);
2110
2111     if (listen_fds[i].family == PF_UNIX)
2112       unlink(listen_fds[i].addr);
2113   }
2114
2115   free (listen_fds);
2116   listen_fds = NULL;
2117   listen_fds_num = 0;
2118
2119   return (0);
2120 } /* }}} int close_listen_sockets */
2121
2122 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2123 {
2124   struct pollfd *pollfds;
2125   int pollfds_num;
2126   int status;
2127   int i;
2128
2129   for (i = 0; i < config_listen_address_list_len; i++)
2130     open_listen_socket (config_listen_address_list[i]);
2131
2132   if (config_listen_address_list_len < 1)
2133   {
2134     listen_socket_t sock;
2135     memset(&sock, 0, sizeof(sock));
2136     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2137     open_listen_socket (&sock);
2138   }
2139
2140   if (listen_fds_num < 1)
2141   {
2142     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2143         "could be opened. Sorry.");
2144     return (NULL);
2145   }
2146
2147   pollfds_num = listen_fds_num;
2148   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2149   if (pollfds == NULL)
2150   {
2151     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2152     return (NULL);
2153   }
2154   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2155
2156   RRDD_LOG(LOG_INFO, "listening for connections");
2157
2158   while (do_shutdown == 0)
2159   {
2160     assert (pollfds_num == ((int) listen_fds_num));
2161     for (i = 0; i < pollfds_num; i++)
2162     {
2163       pollfds[i].fd = listen_fds[i].fd;
2164       pollfds[i].events = POLLIN | POLLPRI;
2165       pollfds[i].revents = 0;
2166     }
2167
2168     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2169     if (do_shutdown)
2170       break;
2171     else if (status == 0) /* timeout */
2172       continue;
2173     else if (status < 0) /* error */
2174     {
2175       status = errno;
2176       if (status != EINTR)
2177       {
2178         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2179       }
2180       continue;
2181     }
2182
2183     for (i = 0; i < pollfds_num; i++)
2184     {
2185       listen_socket_t *client_sock;
2186       struct sockaddr_storage client_sa;
2187       socklen_t client_sa_size;
2188       pthread_t tid;
2189       pthread_attr_t attr;
2190
2191       if (pollfds[i].revents == 0)
2192         continue;
2193
2194       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2195       {
2196         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2197             "poll(2) returned something unexpected for listen FD #%i.",
2198             pollfds[i].fd);
2199         continue;
2200       }
2201
2202       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2203       if (client_sock == NULL)
2204       {
2205         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2206         continue;
2207       }
2208       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2209
2210       client_sa_size = sizeof (client_sa);
2211       client_sock->fd = accept (pollfds[i].fd,
2212           (struct sockaddr *) &client_sa, &client_sa_size);
2213       if (client_sock->fd < 0)
2214       {
2215         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2216         free(client_sock);
2217         continue;
2218       }
2219
2220       pthread_attr_init (&attr);
2221       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2222
2223       status = pthread_create (&tid, &attr, connection_thread_main,
2224                                client_sock);
2225       if (status != 0)
2226       {
2227         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2228         close_connection(client_sock);
2229         continue;
2230       }
2231     } /* for (pollfds_num) */
2232   } /* while (do_shutdown == 0) */
2233
2234   RRDD_LOG(LOG_INFO, "starting shutdown");
2235
2236   close_listen_sockets ();
2237
2238   pthread_mutex_lock (&connection_threads_lock);
2239   while (connection_threads_num > 0)
2240   {
2241     pthread_t wait_for;
2242
2243     wait_for = connection_threads[0];
2244
2245     pthread_mutex_unlock (&connection_threads_lock);
2246     pthread_join (wait_for, /* retval = */ NULL);
2247     pthread_mutex_lock (&connection_threads_lock);
2248   }
2249   pthread_mutex_unlock (&connection_threads_lock);
2250
2251   return (NULL);
2252 } /* }}} void *listen_thread_main */
2253
2254 static int daemonize (void) /* {{{ */
2255 {
2256   int status;
2257   int fd;
2258   char *base_dir;
2259
2260   daemon_uid = geteuid();
2261
2262   fd = open_pidfile();
2263   if (fd < 0) return fd;
2264
2265   if (!stay_foreground)
2266   {
2267     pid_t child;
2268
2269     child = fork ();
2270     if (child < 0)
2271     {
2272       fprintf (stderr, "daemonize: fork(2) failed.\n");
2273       return (-1);
2274     }
2275     else if (child > 0)
2276     {
2277       return (1);
2278     }
2279
2280     /* Become session leader */
2281     setsid ();
2282
2283     /* Open the first three file descriptors to /dev/null */
2284     close (2);
2285     close (1);
2286     close (0);
2287
2288     open ("/dev/null", O_RDWR);
2289     dup (0);
2290     dup (0);
2291   } /* if (!stay_foreground) */
2292
2293   /* Change into the /tmp directory. */
2294   base_dir = (config_base_dir != NULL)
2295     ? config_base_dir
2296     : "/tmp";
2297   status = chdir (base_dir);
2298   if (status != 0)
2299   {
2300     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2301     return (-1);
2302   }
2303
2304   install_signal_handlers();
2305
2306   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2307   RRDD_LOG(LOG_INFO, "starting up");
2308
2309   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2310   if (cache_tree == NULL)
2311   {
2312     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2313     return (-1);
2314   }
2315
2316   status = write_pidfile (fd);
2317   return status;
2318 } /* }}} int daemonize */
2319
2320 static int cleanup (void) /* {{{ */
2321 {
2322   do_shutdown++;
2323
2324   pthread_cond_signal (&cache_cond);
2325   pthread_join (queue_thread, /* return = */ NULL);
2326
2327   remove_pidfile ();
2328
2329   RRDD_LOG(LOG_INFO, "goodbye");
2330   closelog ();
2331
2332   return (0);
2333 } /* }}} int cleanup */
2334
2335 static int read_options (int argc, char **argv) /* {{{ */
2336 {
2337   int option;
2338   int status = 0;
2339
2340   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2341   {
2342     switch (option)
2343     {
2344       case 'g':
2345         stay_foreground=1;
2346         break;
2347
2348       case 'L':
2349       case 'l':
2350       {
2351         listen_socket_t **temp;
2352         listen_socket_t *new;
2353
2354         new = malloc(sizeof(listen_socket_t));
2355         if (new == NULL)
2356         {
2357           fprintf(stderr, "read_options: malloc failed.\n");
2358           return(2);
2359         }
2360         memset(new, 0, sizeof(listen_socket_t));
2361
2362         temp = (listen_socket_t **) realloc (config_listen_address_list,
2363             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2364         if (temp == NULL)
2365         {
2366           fprintf (stderr, "read_options: realloc failed.\n");
2367           return (2);
2368         }
2369         config_listen_address_list = temp;
2370
2371         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2372         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2373
2374         temp[config_listen_address_list_len] = new;
2375         config_listen_address_list_len++;
2376       }
2377       break;
2378
2379       case 'f':
2380       {
2381         int temp;
2382
2383         temp = atoi (optarg);
2384         if (temp > 0)
2385           config_flush_interval = temp;
2386         else
2387         {
2388           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2389           status = 3;
2390         }
2391       }
2392       break;
2393
2394       case 'w':
2395       {
2396         int temp;
2397
2398         temp = atoi (optarg);
2399         if (temp > 0)
2400           config_write_interval = temp;
2401         else
2402         {
2403           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2404           status = 2;
2405         }
2406       }
2407       break;
2408
2409       case 'z':
2410       {
2411         int temp;
2412
2413         temp = atoi(optarg);
2414         if (temp > 0)
2415           config_write_jitter = temp;
2416         else
2417         {
2418           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2419           status = 2;
2420         }
2421
2422         break;
2423       }
2424
2425       case 'B':
2426         config_write_base_only = 1;
2427         break;
2428
2429       case 'b':
2430       {
2431         size_t len;
2432
2433         if (config_base_dir != NULL)
2434           free (config_base_dir);
2435         config_base_dir = strdup (optarg);
2436         if (config_base_dir == NULL)
2437         {
2438           fprintf (stderr, "read_options: strdup failed.\n");
2439           return (3);
2440         }
2441
2442         len = strlen (config_base_dir);
2443         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2444         {
2445           config_base_dir[len - 1] = 0;
2446           len--;
2447         }
2448
2449         if (len < 1)
2450         {
2451           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2452           return (4);
2453         }
2454
2455         _config_base_dir_len = len;
2456       }
2457       break;
2458
2459       case 'p':
2460       {
2461         if (config_pid_file != NULL)
2462           free (config_pid_file);
2463         config_pid_file = strdup (optarg);
2464         if (config_pid_file == NULL)
2465         {
2466           fprintf (stderr, "read_options: strdup failed.\n");
2467           return (3);
2468         }
2469       }
2470       break;
2471
2472       case 'F':
2473         config_flush_at_shutdown = 1;
2474         break;
2475
2476       case 'j':
2477       {
2478         struct stat statbuf;
2479         const char *dir = optarg;
2480
2481         status = stat(dir, &statbuf);
2482         if (status != 0)
2483         {
2484           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2485           return 6;
2486         }
2487
2488         if (!S_ISDIR(statbuf.st_mode)
2489             || access(dir, R_OK|W_OK|X_OK) != 0)
2490         {
2491           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2492                   errno ? rrd_strerror(errno) : "");
2493           return 6;
2494         }
2495
2496         journal_cur = malloc(PATH_MAX + 1);
2497         journal_old = malloc(PATH_MAX + 1);
2498         if (journal_cur == NULL || journal_old == NULL)
2499         {
2500           fprintf(stderr, "malloc failure for journal files\n");
2501           return 6;
2502         }
2503         else 
2504         {
2505           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2506           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2507         }
2508       }
2509       break;
2510
2511       case 'h':
2512       case '?':
2513         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2514             "\n"
2515             "Usage: rrdcached [options]\n"
2516             "\n"
2517             "Valid options are:\n"
2518             "  -l <address>  Socket address to listen to.\n"
2519             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2520             "  -w <seconds>  Interval in which to write data.\n"
2521             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2522             "  -f <seconds>  Interval in which to flush dead data.\n"
2523             "  -p <file>     Location of the PID-file.\n"
2524             "  -b <dir>      Base directory to change to.\n"
2525             "  -B            Restrict file access to paths within -b <dir>\n"
2526             "  -g            Do not fork and run in the foreground.\n"
2527             "  -j <dir>      Directory in which to create the journal files.\n"
2528             "  -F            Always flush all updates at shutdown\n"
2529             "\n"
2530             "For more information and a detailed description of all options "
2531             "please refer\n"
2532             "to the rrdcached(1) manual page.\n",
2533             VERSION);
2534         status = -1;
2535         break;
2536     } /* switch (option) */
2537   } /* while (getopt) */
2538
2539   /* advise the user when values are not sane */
2540   if (config_flush_interval < 2 * config_write_interval)
2541     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2542             " 2x write interval (-w) !\n");
2543   if (config_write_jitter > config_write_interval)
2544     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2545             " write interval (-w) !\n");
2546
2547   if (config_write_base_only && config_base_dir == NULL)
2548     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2549             "  Consult the rrdcached documentation\n");
2550
2551   if (journal_cur == NULL)
2552     config_flush_at_shutdown = 1;
2553
2554   return (status);
2555 } /* }}} int read_options */
2556
2557 int main (int argc, char **argv)
2558 {
2559   int status;
2560
2561   status = read_options (argc, argv);
2562   if (status != 0)
2563   {
2564     if (status < 0)
2565       status = 0;
2566     return (status);
2567   }
2568
2569   status = daemonize ();
2570   if (status == 1)
2571   {
2572     struct sigaction sigchld;
2573
2574     memset (&sigchld, 0, sizeof (sigchld));
2575     sigchld.sa_handler = SIG_IGN;
2576     sigaction (SIGCHLD, &sigchld, NULL);
2577
2578     return (0);
2579   }
2580   else if (status != 0)
2581   {
2582     fprintf (stderr, "daemonize failed, exiting.\n");
2583     return (1);
2584   }
2585
2586   journal_init();
2587
2588   /* start the queue thread */
2589   memset (&queue_thread, 0, sizeof (queue_thread));
2590   status = pthread_create (&queue_thread,
2591                            NULL, /* attr */
2592                            queue_thread_main,
2593                            NULL); /* args */
2594   if (status != 0)
2595   {
2596     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2597     cleanup();
2598     return (1);
2599   }
2600
2601   listen_thread_main (NULL);
2602   cleanup ();
2603
2604   return (0);
2605 } /* int main */
2606
2607 /*
2608  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2609  */