destroy flush condition variable
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
161
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
167
168   char *syntax;
169   char *help;
170 };
171
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
188
189 struct callback_flush_data_s
190 {
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
197
198 enum queue_side_e
199 {
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
204
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
208
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
214
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
217
218 static int do_shutdown = 0;
219
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
223
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
226
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
230
231 /* Cache stuff */
232 static GTree          *cache_tree = NULL;
233 static cache_item_t   *cache_queue_head = NULL;
234 static cache_item_t   *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
236
237 static int config_write_interval = 300;
238 static int config_write_jitter   = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
245
246 static listen_socket_t **config_listen_address_list = NULL;
247 static size_t config_listen_address_list_len = 0;
248
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
257
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
266
267 /* prototypes for forward refernces */
268 static int handle_request_help (HANDLER_PROTO);
269
270 /* 
271  * Functions
272  */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276   do_shutdown++;
277   pthread_cond_broadcast(&flush_cond);
278   pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
280
281 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
282 {
283   sig_common("INT");
284 } /* }}} void sig_int_handler */
285
286 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
287 {
288   sig_common("TERM");
289 } /* }}} void sig_term_handler */
290
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293   config_flush_at_shutdown = 1;
294   sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
296
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299   config_flush_at_shutdown = 0;
300   sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
302
303 static void install_signal_handlers(void) /* {{{ */
304 {
305   /* These structures are static, because `sigaction' behaves weird if the are
306    * overwritten.. */
307   static struct sigaction sa_int;
308   static struct sigaction sa_term;
309   static struct sigaction sa_pipe;
310   static struct sigaction sa_usr1;
311   static struct sigaction sa_usr2;
312
313   /* Install signal handlers */
314   memset (&sa_int, 0, sizeof (sa_int));
315   sa_int.sa_handler = sig_int_handler;
316   sigaction (SIGINT, &sa_int, NULL);
317
318   memset (&sa_term, 0, sizeof (sa_term));
319   sa_term.sa_handler = sig_term_handler;
320   sigaction (SIGTERM, &sa_term, NULL);
321
322   memset (&sa_pipe, 0, sizeof (sa_pipe));
323   sa_pipe.sa_handler = SIG_IGN;
324   sigaction (SIGPIPE, &sa_pipe, NULL);
325
326   memset (&sa_pipe, 0, sizeof (sa_usr1));
327   sa_usr1.sa_handler = sig_usr1_handler;
328   sigaction (SIGUSR1, &sa_usr1, NULL);
329
330   memset (&sa_usr2, 0, sizeof (sa_usr2));
331   sa_usr2.sa_handler = sig_usr2_handler;
332   sigaction (SIGUSR2, &sa_usr2, NULL);
333
334 } /* }}} void install_signal_handlers */
335
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338   int fd;
339   char *file;
340
341   file = (config_pid_file != NULL)
342     ? config_pid_file
343     : LOCALSTATEDIR "/run/rrdcached.pid";
344
345   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346   if (fd < 0)
347     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348             action, file, rrd_strerror(errno));
349
350   return(fd);
351 } /* }}} static int open_pidfile */
352
353 /* check existing pid file to see whether a daemon is running */
354 static int check_pidfile(void)
355 {
356   int pid_fd;
357   pid_t pid;
358   char pid_str[16];
359
360   pid_fd = open_pidfile("open", O_RDWR);
361   if (pid_fd < 0)
362     return pid_fd;
363
364   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
365     return -1;
366
367   pid = atoi(pid_str);
368   if (pid <= 0)
369     return -1;
370
371   /* another running process that we can signal COULD be
372    * a competing rrdcached */
373   if (pid != getpid() && kill(pid, 0) == 0)
374   {
375     fprintf(stderr,
376             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
377     close(pid_fd);
378     return -1;
379   }
380
381   lseek(pid_fd, 0, SEEK_SET);
382   if (ftruncate(pid_fd, 0) == -1)
383   {
384     fprintf(stderr,
385             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
386     close(pid_fd);
387     return -1;
388   }
389
390   fprintf(stderr,
391           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
392           "rrdcached: starting normally.\n", pid);
393
394   return pid_fd;
395 } /* }}} static int check_pidfile */
396
397 static int write_pidfile (int fd) /* {{{ */
398 {
399   pid_t pid;
400   FILE *fh;
401
402   pid = getpid ();
403
404   fh = fdopen (fd, "w");
405   if (fh == NULL)
406   {
407     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
408     close(fd);
409     return (-1);
410   }
411
412   fprintf (fh, "%i\n", (int) pid);
413   fclose (fh);
414
415   return (0);
416 } /* }}} int write_pidfile */
417
418 static int remove_pidfile (void) /* {{{ */
419 {
420   char *file;
421   int status;
422
423   file = (config_pid_file != NULL)
424     ? config_pid_file
425     : LOCALSTATEDIR "/run/rrdcached.pid";
426
427   status = unlink (file);
428   if (status == 0)
429     return (0);
430   return (errno);
431 } /* }}} int remove_pidfile */
432
433 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
434 {
435   char *eol;
436
437   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
438                sock->next_read - sock->next_cmd);
439
440   if (eol == NULL)
441   {
442     /* no commands left, move remainder back to front of rbuf */
443     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
444             sock->next_read - sock->next_cmd);
445     sock->next_read -= sock->next_cmd;
446     sock->next_cmd = 0;
447     *len = 0;
448     return NULL;
449   }
450   else
451   {
452     char *cmd = sock->rbuf + sock->next_cmd;
453     *eol = '\0';
454
455     sock->next_cmd = eol - sock->rbuf + 1;
456
457     if (eol > sock->rbuf && *(eol-1) == '\r')
458       *(--eol) = '\0'; /* handle "\r\n" EOL */
459
460     *len = eol - cmd;
461
462     return cmd;
463   }
464
465   /* NOTREACHED */
466   assert(1==0);
467 }
468
469 /* add the characters directly to the write buffer */
470 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
471 {
472   char *new_buf;
473
474   assert(sock != NULL);
475
476   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
477   if (new_buf == NULL)
478   {
479     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
480     return -1;
481   }
482
483   strncpy(new_buf + sock->wbuf_len, str, len + 1);
484
485   sock->wbuf = new_buf;
486   sock->wbuf_len += len;
487
488   return 0;
489 } /* }}} static int add_to_wbuf */
490
491 /* add the text to the "extra" info that's sent after the status line */
492 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
493 {
494   va_list argp;
495   char buffer[CMD_MAX];
496   int len;
497
498   if (sock == NULL) return 0; /* journal replay mode */
499   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
500
501   va_start(argp, fmt);
502 #ifdef HAVE_VSNPRINTF
503   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
504 #else
505   len = vsprintf(buffer, fmt, argp);
506 #endif
507   va_end(argp);
508   if (len < 0)
509   {
510     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
511     return -1;
512   }
513
514   return add_to_wbuf(sock, buffer, len);
515 } /* }}} static int add_response_info */
516
517 static int count_lines(char *str) /* {{{ */
518 {
519   int lines = 0;
520
521   if (str != NULL)
522   {
523     while ((str = strchr(str, '\n')) != NULL)
524     {
525       ++lines;
526       ++str;
527     }
528   }
529
530   return lines;
531 } /* }}} static int count_lines */
532
533 /* send the response back to the user.
534  * returns 0 on success, -1 on error
535  * write buffer is always zeroed after this call */
536 static int send_response (listen_socket_t *sock, response_code rc,
537                           char *fmt, ...) /* {{{ */
538 {
539   va_list argp;
540   char buffer[CMD_MAX];
541   int lines;
542   ssize_t wrote;
543   int rclen, len;
544
545   if (sock == NULL) return rc;  /* journal replay mode */
546
547   if (sock->batch_start)
548   {
549     if (rc == RESP_OK)
550       return rc; /* no response on success during BATCH */
551     lines = sock->batch_cmd;
552   }
553   else if (rc == RESP_OK)
554     lines = count_lines(sock->wbuf);
555   else
556     lines = -1;
557
558   rclen = sprintf(buffer, "%d ", lines);
559   va_start(argp, fmt);
560 #ifdef HAVE_VSNPRINTF
561   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
562 #else
563   len = vsprintf(buffer+rclen, fmt, argp);
564 #endif
565   va_end(argp);
566   if (len < 0)
567     return -1;
568
569   len += rclen;
570
571   /* append the result to the wbuf, don't write to the user */
572   if (sock->batch_start)
573     return add_to_wbuf(sock, buffer, len);
574
575   /* first write must be complete */
576   if (len != write(sock->fd, buffer, len))
577   {
578     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
579     return -1;
580   }
581
582   if (sock->wbuf != NULL && rc == RESP_OK)
583   {
584     wrote = 0;
585     while (wrote < sock->wbuf_len)
586     {
587       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
588       if (wb <= 0)
589       {
590         RRDD_LOG(LOG_INFO, "send_response: could not write results");
591         return -1;
592       }
593       wrote += wb;
594     }
595   }
596
597   free(sock->wbuf); sock->wbuf = NULL;
598   sock->wbuf_len = 0;
599
600   return 0;
601 } /* }}} */
602
603 static void wipe_ci_values(cache_item_t *ci, time_t when)
604 {
605   ci->values = NULL;
606   ci->values_num = 0;
607
608   ci->last_flush_time = when;
609   if (config_write_jitter > 0)
610     ci->last_flush_time += (rrd_random() % config_write_jitter);
611 }
612
613 /* remove_from_queue
614  * remove a "cache_item_t" item from the queue.
615  * must hold 'cache_lock' when calling this
616  */
617 static void remove_from_queue(cache_item_t *ci) /* {{{ */
618 {
619   if (ci == NULL) return;
620   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
621
622   if (ci->prev == NULL)
623     cache_queue_head = ci->next; /* reset head */
624   else
625     ci->prev->next = ci->next;
626
627   if (ci->next == NULL)
628     cache_queue_tail = ci->prev; /* reset the tail */
629   else
630     ci->next->prev = ci->prev;
631
632   ci->next = ci->prev = NULL;
633   ci->flags &= ~CI_FLAGS_IN_QUEUE;
634
635   pthread_mutex_lock (&stats_lock);
636   assert (stats_queue_length > 0);
637   stats_queue_length--;
638   pthread_mutex_unlock (&stats_lock);
639
640 } /* }}} static void remove_from_queue */
641
642 /* free the resources associated with the cache_item_t
643  * must hold cache_lock when calling this function
644  */
645 static void *free_cache_item(cache_item_t *ci) /* {{{ */
646 {
647   if (ci == NULL) return NULL;
648
649   remove_from_queue(ci);
650
651   for (size_t i=0; i < ci->values_num; i++)
652     free(ci->values[i]);
653
654   free (ci->values);
655   free (ci->file);
656
657   /* in case anyone is waiting */
658   pthread_cond_broadcast(&ci->flushed);
659   pthread_cond_destroy(&ci->flushed);
660
661   free (ci);
662
663   return NULL;
664 } /* }}} static void *free_cache_item */
665
666 /*
667  * enqueue_cache_item:
668  * `cache_lock' must be acquired before calling this function!
669  */
670 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
671     queue_side_t side)
672 {
673   if (ci == NULL)
674     return (-1);
675
676   if (ci->values_num == 0)
677     return (0);
678
679   if (side == HEAD)
680   {
681     if (cache_queue_head == ci)
682       return 0;
683
684     /* remove if further down in queue */
685     remove_from_queue(ci);
686
687     ci->prev = NULL;
688     ci->next = cache_queue_head;
689     if (ci->next != NULL)
690       ci->next->prev = ci;
691     cache_queue_head = ci;
692
693     if (cache_queue_tail == NULL)
694       cache_queue_tail = cache_queue_head;
695   }
696   else /* (side == TAIL) */
697   {
698     /* We don't move values back in the list.. */
699     if (ci->flags & CI_FLAGS_IN_QUEUE)
700       return (0);
701
702     assert (ci->next == NULL);
703     assert (ci->prev == NULL);
704
705     ci->prev = cache_queue_tail;
706
707     if (cache_queue_tail == NULL)
708       cache_queue_head = ci;
709     else
710       cache_queue_tail->next = ci;
711
712     cache_queue_tail = ci;
713   }
714
715   ci->flags |= CI_FLAGS_IN_QUEUE;
716
717   pthread_cond_signal(&queue_cond);
718   pthread_mutex_lock (&stats_lock);
719   stats_queue_length++;
720   pthread_mutex_unlock (&stats_lock);
721
722   return (0);
723 } /* }}} int enqueue_cache_item */
724
725 /*
726  * tree_callback_flush:
727  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
728  * while this is in progress.
729  */
730 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
731     gpointer data)
732 {
733   cache_item_t *ci;
734   callback_flush_data_t *cfd;
735
736   ci = (cache_item_t *) value;
737   cfd = (callback_flush_data_t *) data;
738
739   if (ci->flags & CI_FLAGS_IN_QUEUE)
740     return FALSE;
741
742   if ((ci->last_flush_time <= cfd->abs_timeout)
743       && (ci->values_num > 0))
744   {
745     enqueue_cache_item (ci, TAIL);
746   }
747   else if ((do_shutdown != 0)
748       && (ci->values_num > 0))
749   {
750     enqueue_cache_item (ci, TAIL);
751   }
752   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
753       && (ci->values_num <= 0))
754   {
755     assert ((char *) key == ci->file);
756     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
757     {
758       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
759       return (FALSE);
760     }
761   }
762
763   return (FALSE);
764 } /* }}} gboolean tree_callback_flush */
765
766 static int flush_old_values (int max_age)
767 {
768   callback_flush_data_t cfd;
769   size_t k;
770
771   memset (&cfd, 0, sizeof (cfd));
772   /* Pass the current time as user data so that we don't need to call
773    * `time' for each node. */
774   cfd.now = time (NULL);
775   cfd.keys = NULL;
776   cfd.keys_num = 0;
777
778   if (max_age > 0)
779     cfd.abs_timeout = cfd.now - max_age;
780   else
781     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
782
783   /* `tree_callback_flush' will return the keys of all values that haven't
784    * been touched in the last `config_flush_interval' seconds in `cfd'.
785    * The char*'s in this array point to the same memory as ci->file, so we
786    * don't need to free them separately. */
787   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
788
789   for (k = 0; k < cfd.keys_num; k++)
790   {
791     /* should never fail, since we have held the cache_lock
792      * the entire time */
793     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
794   }
795
796   if (cfd.keys != NULL)
797   {
798     free (cfd.keys);
799     cfd.keys = NULL;
800   }
801
802   return (0);
803 } /* int flush_old_values */
804
805 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
806 {
807   struct timeval now;
808   struct timespec next_flush;
809   int status;
810
811   gettimeofday (&now, NULL);
812   next_flush.tv_sec = now.tv_sec + config_flush_interval;
813   next_flush.tv_nsec = 1000 * now.tv_usec;
814
815   pthread_mutex_lock(&cache_lock);
816
817   while (!do_shutdown)
818   {
819     gettimeofday (&now, NULL);
820     if ((now.tv_sec > next_flush.tv_sec)
821         || ((now.tv_sec == next_flush.tv_sec)
822           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
823     {
824       /* Flush all values that haven't been written in the last
825        * `config_write_interval' seconds. */
826       flush_old_values (config_write_interval);
827
828       /* Determine the time of the next cache flush. */
829       next_flush.tv_sec =
830         now.tv_sec + next_flush.tv_sec % config_flush_interval;
831
832       /* unlock the cache while we rotate so we don't block incoming
833        * updates if the fsync() blocks on disk I/O */
834       pthread_mutex_unlock(&cache_lock);
835       journal_rotate();
836       pthread_mutex_lock(&cache_lock);
837     }
838
839     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
840     if (status != 0 && status != ETIMEDOUT)
841     {
842       RRDD_LOG (LOG_ERR, "flush_thread_main: "
843                 "pthread_cond_timedwait returned %i.", status);
844     }
845   }
846
847   if (config_flush_at_shutdown)
848     flush_old_values (-1); /* flush everything */
849
850   pthread_mutex_unlock(&cache_lock);
851
852   return NULL;
853 } /* void *flush_thread_main */
854
855 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
856 {
857   pthread_mutex_lock (&cache_lock);
858
859   while (!do_shutdown
860          || (cache_queue_head != NULL && config_flush_at_shutdown))
861   {
862     cache_item_t *ci;
863     char *file;
864     char **values;
865     size_t values_num;
866     int status;
867
868     /* Now, check if there's something to store away. If not, wait until
869      * something comes in.  if we are shutting down, do not wait around.  */
870     if (cache_queue_head == NULL && !do_shutdown)
871     {
872       status = pthread_cond_wait (&queue_cond, &cache_lock);
873       if ((status != 0) && (status != ETIMEDOUT))
874       {
875         RRDD_LOG (LOG_ERR, "queue_thread_main: "
876             "pthread_cond_wait returned %i.", status);
877       }
878     }
879
880     /* Check if a value has arrived. This may be NULL if we timed out or there
881      * was an interrupt such as a signal. */
882     if (cache_queue_head == NULL)
883       continue;
884
885     ci = cache_queue_head;
886
887     /* copy the relevant parts */
888     file = strdup (ci->file);
889     if (file == NULL)
890     {
891       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
892       continue;
893     }
894
895     assert(ci->values != NULL);
896     assert(ci->values_num > 0);
897
898     values = ci->values;
899     values_num = ci->values_num;
900
901     wipe_ci_values(ci, time(NULL));
902     remove_from_queue(ci);
903
904     pthread_mutex_unlock (&cache_lock);
905
906     rrd_clear_error ();
907     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
908     if (status != 0)
909     {
910       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
911           "rrd_update_r (%s) failed with status %i. (%s)",
912           file, status, rrd_get_error());
913     }
914
915     journal_write("wrote", file);
916     pthread_cond_broadcast(&ci->flushed);
917
918     rrd_free_ptrs((void ***) &values, &values_num);
919     free(file);
920
921     if (status == 0)
922     {
923       pthread_mutex_lock (&stats_lock);
924       stats_updates_written++;
925       stats_data_sets_written += values_num;
926       pthread_mutex_unlock (&stats_lock);
927     }
928
929     pthread_mutex_lock (&cache_lock);
930   }
931   pthread_mutex_unlock (&cache_lock);
932
933   return (NULL);
934 } /* }}} void *queue_thread_main */
935
936 static int buffer_get_field (char **buffer_ret, /* {{{ */
937     size_t *buffer_size_ret, char **field_ret)
938 {
939   char *buffer;
940   size_t buffer_pos;
941   size_t buffer_size;
942   char *field;
943   size_t field_size;
944   int status;
945
946   buffer = *buffer_ret;
947   buffer_pos = 0;
948   buffer_size = *buffer_size_ret;
949   field = *buffer_ret;
950   field_size = 0;
951
952   if (buffer_size <= 0)
953     return (-1);
954
955   /* This is ensured by `handle_request'. */
956   assert (buffer[buffer_size - 1] == '\0');
957
958   status = -1;
959   while (buffer_pos < buffer_size)
960   {
961     /* Check for end-of-field or end-of-buffer */
962     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
963     {
964       field[field_size] = 0;
965       field_size++;
966       buffer_pos++;
967       status = 0;
968       break;
969     }
970     /* Handle escaped characters. */
971     else if (buffer[buffer_pos] == '\\')
972     {
973       if (buffer_pos >= (buffer_size - 1))
974         break;
975       buffer_pos++;
976       field[field_size] = buffer[buffer_pos];
977       field_size++;
978       buffer_pos++;
979     }
980     /* Normal operation */ 
981     else
982     {
983       field[field_size] = buffer[buffer_pos];
984       field_size++;
985       buffer_pos++;
986     }
987   } /* while (buffer_pos < buffer_size) */
988
989   if (status != 0)
990     return (status);
991
992   *buffer_ret = buffer + buffer_pos;
993   *buffer_size_ret = buffer_size - buffer_pos;
994   *field_ret = field;
995
996   return (0);
997 } /* }}} int buffer_get_field */
998
999 /* if we're restricting writes to the base directory,
1000  * check whether the file falls within the dir
1001  * returns 1 if OK, otherwise 0
1002  */
1003 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1004 {
1005   assert(file != NULL);
1006
1007   if (!config_write_base_only
1008       || sock == NULL /* journal replay */
1009       || config_base_dir == NULL)
1010     return 1;
1011
1012   if (strstr(file, "../") != NULL) goto err;
1013
1014   /* relative paths without "../" are ok */
1015   if (*file != '/') return 1;
1016
1017   /* file must be of the format base + "/" + <1+ char filename> */
1018   if (strlen(file) < _config_base_dir_len + 2) goto err;
1019   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1020   if (*(file + _config_base_dir_len) != '/') goto err;
1021
1022   return 1;
1023
1024 err:
1025   if (sock != NULL && sock->fd >= 0)
1026     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1027
1028   return 0;
1029 } /* }}} static int check_file_access */
1030
1031 /* when using a base dir, convert relative paths to absolute paths.
1032  * if necessary, modifies the "filename" pointer to point
1033  * to the new path created in "tmp".  "tmp" is provided
1034  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1035  *
1036  * this allows us to optimize for the expected case (absolute path)
1037  * with a no-op.
1038  */
1039 static void get_abs_path(char **filename, char *tmp)
1040 {
1041   assert(tmp != NULL);
1042   assert(filename != NULL && *filename != NULL);
1043
1044   if (config_base_dir == NULL || **filename == '/')
1045     return;
1046
1047   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1048   *filename = tmp;
1049 } /* }}} static int get_abs_path */
1050
1051 /* returns 1 if we have the required privilege level,
1052  * otherwise issue an error to the user on sock */
1053 static int has_privilege (listen_socket_t *sock, /* {{{ */
1054                           socket_privilege priv)
1055 {
1056   if (sock == NULL) /* journal replay */
1057     return 1;
1058
1059   if (sock->privilege >= priv)
1060     return 1;
1061
1062   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1063 } /* }}} static int has_privilege */
1064
1065 static int flush_file (const char *filename) /* {{{ */
1066 {
1067   cache_item_t *ci;
1068
1069   pthread_mutex_lock (&cache_lock);
1070
1071   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1072   if (ci == NULL)
1073   {
1074     pthread_mutex_unlock (&cache_lock);
1075     return (ENOENT);
1076   }
1077
1078   if (ci->values_num > 0)
1079   {
1080     /* Enqueue at head */
1081     enqueue_cache_item (ci, HEAD);
1082     pthread_cond_wait(&ci->flushed, &cache_lock);
1083   }
1084
1085   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1086    * may have been purged during our cond_wait() */
1087
1088   pthread_mutex_unlock(&cache_lock);
1089
1090   return (0);
1091 } /* }}} int flush_file */
1092
1093 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1094 {
1095   char *err = "Syntax error.\n";
1096
1097   if (cmd && cmd->syntax)
1098     err = cmd->syntax;
1099
1100   return send_response(sock, RESP_ERR, "Usage: %s", err);
1101 } /* }}} static int syntax_error() */
1102
1103 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1104 {
1105   uint64_t copy_queue_length;
1106   uint64_t copy_updates_received;
1107   uint64_t copy_flush_received;
1108   uint64_t copy_updates_written;
1109   uint64_t copy_data_sets_written;
1110   uint64_t copy_journal_bytes;
1111   uint64_t copy_journal_rotate;
1112
1113   uint64_t tree_nodes_number;
1114   uint64_t tree_depth;
1115
1116   pthread_mutex_lock (&stats_lock);
1117   copy_queue_length       = stats_queue_length;
1118   copy_updates_received   = stats_updates_received;
1119   copy_flush_received     = stats_flush_received;
1120   copy_updates_written    = stats_updates_written;
1121   copy_data_sets_written  = stats_data_sets_written;
1122   copy_journal_bytes      = stats_journal_bytes;
1123   copy_journal_rotate     = stats_journal_rotate;
1124   pthread_mutex_unlock (&stats_lock);
1125
1126   pthread_mutex_lock (&cache_lock);
1127   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1128   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1129   pthread_mutex_unlock (&cache_lock);
1130
1131   add_response_info(sock,
1132                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1133   add_response_info(sock,
1134                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1135   add_response_info(sock,
1136                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1137   add_response_info(sock,
1138                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1139   add_response_info(sock,
1140                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1141   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1142   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1143   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1144   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1145
1146   send_response(sock, RESP_OK, "Statistics follow\n");
1147
1148   return (0);
1149 } /* }}} int handle_request_stats */
1150
1151 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1152 {
1153   char *file, file_tmp[PATH_MAX];
1154   int status;
1155
1156   status = buffer_get_field (&buffer, &buffer_size, &file);
1157   if (status != 0)
1158   {
1159     return syntax_error(sock,cmd);
1160   }
1161   else
1162   {
1163     pthread_mutex_lock(&stats_lock);
1164     stats_flush_received++;
1165     pthread_mutex_unlock(&stats_lock);
1166
1167     get_abs_path(&file, file_tmp);
1168     if (!check_file_access(file, sock)) return 0;
1169
1170     status = flush_file (file);
1171     if (status == 0)
1172       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1173     else if (status == ENOENT)
1174     {
1175       /* no file in our tree; see whether it exists at all */
1176       struct stat statbuf;
1177
1178       memset(&statbuf, 0, sizeof(statbuf));
1179       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1180         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1181       else
1182         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1183     }
1184     else if (status < 0)
1185       return send_response(sock, RESP_ERR, "Internal error.\n");
1186     else
1187       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1188   }
1189
1190   /* NOTREACHED */
1191   assert(1==0);
1192 } /* }}} int handle_request_flush */
1193
1194 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1195 {
1196   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1197
1198   pthread_mutex_lock(&cache_lock);
1199   flush_old_values(-1);
1200   pthread_mutex_unlock(&cache_lock);
1201
1202   return send_response(sock, RESP_OK, "Started flush.\n");
1203 } /* }}} static int handle_request_flushall */
1204
1205 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1206 {
1207   int status;
1208   char *file, file_tmp[PATH_MAX];
1209   cache_item_t *ci;
1210
1211   status = buffer_get_field(&buffer, &buffer_size, &file);
1212   if (status != 0)
1213     return syntax_error(sock,cmd);
1214
1215   get_abs_path(&file, file_tmp);
1216
1217   pthread_mutex_lock(&cache_lock);
1218   ci = g_tree_lookup(cache_tree, file);
1219   if (ci == NULL)
1220   {
1221     pthread_mutex_unlock(&cache_lock);
1222     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1223   }
1224
1225   for (size_t i=0; i < ci->values_num; i++)
1226     add_response_info(sock, "%s\n", ci->values[i]);
1227
1228   pthread_mutex_unlock(&cache_lock);
1229   return send_response(sock, RESP_OK, "updates pending\n");
1230 } /* }}} static int handle_request_pending */
1231
1232 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1233 {
1234   int status;
1235   gboolean found;
1236   char *file, file_tmp[PATH_MAX];
1237
1238   status = buffer_get_field(&buffer, &buffer_size, &file);
1239   if (status != 0)
1240     return syntax_error(sock,cmd);
1241
1242   get_abs_path(&file, file_tmp);
1243   if (!check_file_access(file, sock)) return 0;
1244
1245   pthread_mutex_lock(&cache_lock);
1246   found = g_tree_remove(cache_tree, file);
1247   pthread_mutex_unlock(&cache_lock);
1248
1249   if (found == TRUE)
1250   {
1251     if (sock != NULL)
1252       journal_write("forget", file);
1253
1254     return send_response(sock, RESP_OK, "Gone!\n");
1255   }
1256   else
1257     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258
1259   /* NOTREACHED */
1260   assert(1==0);
1261 } /* }}} static int handle_request_forget */
1262
1263 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1264 {
1265   cache_item_t *ci;
1266
1267   pthread_mutex_lock(&cache_lock);
1268
1269   ci = cache_queue_head;
1270   while (ci != NULL)
1271   {
1272     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1273     ci = ci->next;
1274   }
1275
1276   pthread_mutex_unlock(&cache_lock);
1277
1278   return send_response(sock, RESP_OK, "in queue.\n");
1279 } /* }}} int handle_request_queue */
1280
1281 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1282 {
1283   char *file, file_tmp[PATH_MAX];
1284   int values_num = 0;
1285   int status;
1286   char orig_buf[CMD_MAX];
1287
1288   cache_item_t *ci;
1289
1290   /* save it for the journal later */
1291   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1292
1293   status = buffer_get_field (&buffer, &buffer_size, &file);
1294   if (status != 0)
1295     return syntax_error(sock,cmd);
1296
1297   pthread_mutex_lock(&stats_lock);
1298   stats_updates_received++;
1299   pthread_mutex_unlock(&stats_lock);
1300
1301   get_abs_path(&file, file_tmp);
1302   if (!check_file_access(file, sock)) return 0;
1303
1304   pthread_mutex_lock (&cache_lock);
1305   ci = g_tree_lookup (cache_tree, file);
1306
1307   if (ci == NULL) /* {{{ */
1308   {
1309     struct stat statbuf;
1310
1311     /* don't hold the lock while we setup; stat(2) might block */
1312     pthread_mutex_unlock(&cache_lock);
1313
1314     memset (&statbuf, 0, sizeof (statbuf));
1315     status = stat (file, &statbuf);
1316     if (status != 0)
1317     {
1318       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1319
1320       status = errno;
1321       if (status == ENOENT)
1322         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1323       else
1324         return send_response(sock, RESP_ERR,
1325                              "stat failed with error %i.\n", status);
1326     }
1327     if (!S_ISREG (statbuf.st_mode))
1328       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1329
1330     if (access(file, R_OK|W_OK) != 0)
1331       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1332                            file, rrd_strerror(errno));
1333
1334     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1335     if (ci == NULL)
1336     {
1337       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1338
1339       return send_response(sock, RESP_ERR, "malloc failed.\n");
1340     }
1341     memset (ci, 0, sizeof (cache_item_t));
1342
1343     ci->file = strdup (file);
1344     if (ci->file == NULL)
1345     {
1346       free (ci);
1347       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1348
1349       return send_response(sock, RESP_ERR, "strdup failed.\n");
1350     }
1351
1352     wipe_ci_values(ci, now);
1353     ci->flags = CI_FLAGS_IN_TREE;
1354     pthread_cond_init(&ci->flushed, NULL);
1355
1356     pthread_mutex_lock(&cache_lock);
1357     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1358   } /* }}} */
1359   assert (ci != NULL);
1360
1361   /* don't re-write updates in replay mode */
1362   if (sock != NULL)
1363     journal_write("update", orig_buf);
1364
1365   while (buffer_size > 0)
1366   {
1367     char *value;
1368     time_t stamp;
1369     char *eostamp;
1370
1371     status = buffer_get_field (&buffer, &buffer_size, &value);
1372     if (status != 0)
1373     {
1374       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1375       break;
1376     }
1377
1378     /* make sure update time is always moving forward */
1379     stamp = strtol(value, &eostamp, 10);
1380     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1381     {
1382       pthread_mutex_unlock(&cache_lock);
1383       return send_response(sock, RESP_ERR,
1384                            "Cannot find timestamp in '%s'!\n", value);
1385     }
1386     else if (stamp <= ci->last_update_stamp)
1387     {
1388       pthread_mutex_unlock(&cache_lock);
1389       return send_response(sock, RESP_ERR,
1390                            "illegal attempt to update using time %ld when last"
1391                            " update time is %ld (minimum one second step)\n",
1392                            stamp, ci->last_update_stamp);
1393     }
1394     else
1395       ci->last_update_stamp = stamp;
1396
1397     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1398     {
1399       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1400       continue;
1401     }
1402
1403     values_num++;
1404   }
1405
1406   if (((now - ci->last_flush_time) >= config_write_interval)
1407       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1408       && (ci->values_num > 0))
1409   {
1410     enqueue_cache_item (ci, TAIL);
1411   }
1412
1413   pthread_mutex_unlock (&cache_lock);
1414
1415   if (values_num < 1)
1416     return send_response(sock, RESP_ERR, "No values updated.\n");
1417   else
1418     return send_response(sock, RESP_OK,
1419                          "errors, enqueued %i value(s).\n", values_num);
1420
1421   /* NOTREACHED */
1422   assert(1==0);
1423
1424 } /* }}} int handle_request_update */
1425
1426 /* we came across a "WROTE" entry during journal replay.
1427  * throw away any values that we have accumulated for this file
1428  */
1429 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1430 {
1431   cache_item_t *ci;
1432   const char *file = buffer;
1433
1434   pthread_mutex_lock(&cache_lock);
1435
1436   ci = g_tree_lookup(cache_tree, file);
1437   if (ci == NULL)
1438   {
1439     pthread_mutex_unlock(&cache_lock);
1440     return (0);
1441   }
1442
1443   if (ci->values)
1444     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1445
1446   wipe_ci_values(ci, now);
1447   remove_from_queue(ci);
1448
1449   pthread_mutex_unlock(&cache_lock);
1450   return (0);
1451 } /* }}} int handle_request_wrote */
1452
1453 /* start "BATCH" processing */
1454 static int batch_start (HANDLER_PROTO) /* {{{ */
1455 {
1456   int status;
1457   if (sock->batch_start)
1458     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1459
1460   status = send_response(sock, RESP_OK,
1461                          "Go ahead.  End with dot '.' on its own line.\n");
1462   sock->batch_start = time(NULL);
1463   sock->batch_cmd = 0;
1464
1465   return status;
1466 } /* }}} static int batch_start */
1467
1468 /* finish "BATCH" processing and return results to the client */
1469 static int batch_done (HANDLER_PROTO) /* {{{ */
1470 {
1471   assert(sock->batch_start);
1472   sock->batch_start = 0;
1473   sock->batch_cmd  = 0;
1474   return send_response(sock, RESP_OK, "errors\n");
1475 } /* }}} static int batch_done */
1476
1477 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1478 {
1479   return -1;
1480 } /* }}} static int handle_request_quit */
1481
1482 struct command COMMANDS[] = {
1483   {
1484     "UPDATE",
1485     handle_request_update,
1486     PRIV_HIGH,
1487     CMD_CONTEXT_ANY,
1488     "UPDATE <filename> <values> [<values> ...]\n"
1489     ,
1490     "Adds the given file to the internal cache if it is not yet known and\n"
1491     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1492     "for details.\n"
1493     "\n"
1494     "Each <values> has the following form:\n"
1495     "  <values> = <time>:<value>[:<value>[...]]\n"
1496     "See the rrdupdate(1) manpage for details.\n"
1497   },
1498   {
1499     "WROTE",
1500     handle_request_wrote,
1501     PRIV_HIGH,
1502     CMD_CONTEXT_JOURNAL,
1503     NULL,
1504     NULL
1505   },
1506   {
1507     "FLUSH",
1508     handle_request_flush,
1509     PRIV_LOW,
1510     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1511     "FLUSH <filename>\n"
1512     ,
1513     "Adds the given filename to the head of the update queue and returns\n"
1514     "after it has been dequeued.\n"
1515   },
1516   {
1517     "FLUSHALL",
1518     handle_request_flushall,
1519     PRIV_HIGH,
1520     CMD_CONTEXT_CLIENT,
1521     "FLUSHALL\n"
1522     ,
1523     "Triggers writing of all pending updates.  Returns immediately.\n"
1524   },
1525   {
1526     "PENDING",
1527     handle_request_pending,
1528     PRIV_HIGH,
1529     CMD_CONTEXT_CLIENT,
1530     "PENDING <filename>\n"
1531     ,
1532     "Shows any 'pending' updates for a file, in order.\n"
1533     "The updates shown have not yet been written to the underlying RRD file.\n"
1534   },
1535   {
1536     "FORGET",
1537     handle_request_forget,
1538     PRIV_HIGH,
1539     CMD_CONTEXT_ANY,
1540     "FORGET <filename>\n"
1541     ,
1542     "Removes the file completely from the cache.\n"
1543     "Any pending updates for the file will be lost.\n"
1544   },
1545   {
1546     "QUEUE",
1547     handle_request_queue,
1548     PRIV_LOW,
1549     CMD_CONTEXT_CLIENT,
1550     "QUEUE\n"
1551     ,
1552         "Shows all files in the output queue.\n"
1553     "The output is zero or more lines in the following format:\n"
1554     "(where <num_vals> is the number of values to be written)\n"
1555     "\n"
1556     "<num_vals> <filename>\n"
1557   },
1558   {
1559     "STATS",
1560     handle_request_stats,
1561     PRIV_LOW,
1562     CMD_CONTEXT_CLIENT,
1563     "STATS\n"
1564     ,
1565     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1566     "a description of the values.\n"
1567   },
1568   {
1569     "HELP",
1570     handle_request_help,
1571     PRIV_LOW,
1572     CMD_CONTEXT_CLIENT,
1573     "HELP [<command>]\n",
1574     NULL, /* special! */
1575   },
1576   {
1577     "BATCH",
1578     batch_start,
1579     PRIV_LOW,
1580     CMD_CONTEXT_CLIENT,
1581     "BATCH\n"
1582     ,
1583     "The 'BATCH' command permits the client to initiate a bulk load\n"
1584     "   of commands to rrdcached.\n"
1585     "\n"
1586     "Usage:\n"
1587     "\n"
1588     "    client: BATCH\n"
1589     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1590     "    client: command #1\n"
1591     "    client: command #2\n"
1592     "    client: ... and so on\n"
1593     "    client: .\n"
1594     "    server: 2 errors\n"
1595     "    server: 7 message for command #7\n"
1596     "    server: 9 message for command #9\n"
1597     "\n"
1598     "For more information, consult the rrdcached(1) documentation.\n"
1599   },
1600   {
1601     ".",   /* BATCH terminator */
1602     batch_done,
1603     PRIV_LOW,
1604     CMD_CONTEXT_BATCH,
1605     NULL,
1606     NULL
1607   },
1608   {
1609     "QUIT",
1610     handle_request_quit,
1611     PRIV_LOW,
1612     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1613     "QUIT\n"
1614     ,
1615     "Disconnect from rrdcached.\n"
1616   },
1617   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1618 };
1619
1620 static struct command *find_command(char *cmd)
1621 {
1622   struct command *c = COMMANDS;
1623
1624   while (c->cmd != NULL)
1625   {
1626     if (strcasecmp(cmd, c->cmd) == 0)
1627       break;
1628     c++;
1629   }
1630
1631   if (c->cmd == NULL)
1632     return NULL;
1633   else
1634     return c;
1635 }
1636
1637 /* check whether commands are received in the expected context */
1638 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1639 {
1640   if (sock == NULL)
1641     return (cmd->context & CMD_CONTEXT_JOURNAL);
1642   else if (sock->batch_start)
1643     return (cmd->context & CMD_CONTEXT_BATCH);
1644   else
1645     return (cmd->context & CMD_CONTEXT_CLIENT);
1646
1647   /* NOTREACHED */
1648   assert(1==0);
1649 }
1650
1651 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1652 {
1653   int status;
1654   char *cmd_str;
1655   char *resp_txt;
1656   struct command *help = NULL;
1657
1658   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1659   if (status == 0)
1660     help = find_command(cmd_str);
1661
1662   if (help && (help->syntax || help->help))
1663   {
1664     char tmp[CMD_MAX];
1665
1666     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1667     resp_txt = tmp;
1668
1669     if (help->syntax)
1670       add_response_info(sock, "Usage: %s\n", help->syntax);
1671
1672     if (help->help)
1673       add_response_info(sock, "%s\n", help->help);
1674   }
1675   else
1676   {
1677     help = COMMANDS;
1678     resp_txt = "Command overview\n";
1679
1680     while (help->cmd)
1681     {
1682       if (help->syntax)
1683         add_response_info(sock, "%s", help->syntax);
1684       help++;
1685     }
1686   }
1687
1688   return send_response(sock, RESP_OK, resp_txt);
1689 } /* }}} int handle_request_help */
1690
1691 /* if sock==NULL, we are in journal replay mode */
1692 static int handle_request (DISPATCH_PROTO) /* {{{ */
1693 {
1694   char *buffer_ptr = buffer;
1695   char *cmd_str = NULL;
1696   struct command *cmd = NULL;
1697   int status;
1698
1699   assert (buffer[buffer_size - 1] == '\0');
1700
1701   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1702   if (status != 0)
1703   {
1704     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1705     return (-1);
1706   }
1707
1708   if (sock != NULL && sock->batch_start)
1709     sock->batch_cmd++;
1710
1711   cmd = find_command(cmd_str);
1712   if (!cmd)
1713     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1714
1715   status = has_privilege(sock, cmd->min_priv);
1716   if (status <= 0)
1717     return status;
1718
1719   if (!command_check_context(sock, cmd))
1720     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1721
1722   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1723 } /* }}} int handle_request */
1724
1725 /* MUST NOT hold journal_lock before calling this */
1726 static void journal_rotate(void) /* {{{ */
1727 {
1728   FILE *old_fh = NULL;
1729   int new_fd;
1730
1731   if (journal_cur == NULL || journal_old == NULL)
1732     return;
1733
1734   pthread_mutex_lock(&journal_lock);
1735
1736   /* we rotate this way (rename before close) so that the we can release
1737    * the journal lock as fast as possible.  Journal writes to the new
1738    * journal can proceed immediately after the new file is opened.  The
1739    * fclose can then block without affecting new updates.
1740    */
1741   if (journal_fh != NULL)
1742   {
1743     old_fh = journal_fh;
1744     journal_fh = NULL;
1745     rename(journal_cur, journal_old);
1746     ++stats_journal_rotate;
1747   }
1748
1749   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1750                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1751   if (new_fd >= 0)
1752   {
1753     journal_fh = fdopen(new_fd, "a");
1754     if (journal_fh == NULL)
1755       close(new_fd);
1756   }
1757
1758   pthread_mutex_unlock(&journal_lock);
1759
1760   if (old_fh != NULL)
1761     fclose(old_fh);
1762
1763   if (journal_fh == NULL)
1764   {
1765     RRDD_LOG(LOG_CRIT,
1766              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1767              journal_cur, rrd_strerror(errno));
1768
1769     RRDD_LOG(LOG_ERR,
1770              "JOURNALING DISABLED: All values will be flushed at shutdown");
1771     config_flush_at_shutdown = 1;
1772   }
1773
1774 } /* }}} static void journal_rotate */
1775
1776 static void journal_done(void) /* {{{ */
1777 {
1778   if (journal_cur == NULL)
1779     return;
1780
1781   pthread_mutex_lock(&journal_lock);
1782   if (journal_fh != NULL)
1783   {
1784     fclose(journal_fh);
1785     journal_fh = NULL;
1786   }
1787
1788   if (config_flush_at_shutdown)
1789   {
1790     RRDD_LOG(LOG_INFO, "removing journals");
1791     unlink(journal_old);
1792     unlink(journal_cur);
1793   }
1794   else
1795   {
1796     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1797              "journals will be used at next startup");
1798   }
1799
1800   pthread_mutex_unlock(&journal_lock);
1801
1802 } /* }}} static void journal_done */
1803
1804 static int journal_write(char *cmd, char *args) /* {{{ */
1805 {
1806   int chars;
1807
1808   if (journal_fh == NULL)
1809     return 0;
1810
1811   pthread_mutex_lock(&journal_lock);
1812   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1813   pthread_mutex_unlock(&journal_lock);
1814
1815   if (chars > 0)
1816   {
1817     pthread_mutex_lock(&stats_lock);
1818     stats_journal_bytes += chars;
1819     pthread_mutex_unlock(&stats_lock);
1820   }
1821
1822   return chars;
1823 } /* }}} static int journal_write */
1824
1825 static int journal_replay (const char *file) /* {{{ */
1826 {
1827   FILE *fh;
1828   int entry_cnt = 0;
1829   int fail_cnt = 0;
1830   uint64_t line = 0;
1831   char entry[CMD_MAX];
1832   time_t now;
1833
1834   if (file == NULL) return 0;
1835
1836   {
1837     char *reason = "unknown error";
1838     int status = 0;
1839     struct stat statbuf;
1840
1841     memset(&statbuf, 0, sizeof(statbuf));
1842     if (stat(file, &statbuf) != 0)
1843     {
1844       if (errno == ENOENT)
1845         return 0;
1846
1847       reason = "stat error";
1848       status = errno;
1849     }
1850     else if (!S_ISREG(statbuf.st_mode))
1851     {
1852       reason = "not a regular file";
1853       status = EPERM;
1854     }
1855     if (statbuf.st_uid != daemon_uid)
1856     {
1857       reason = "not owned by daemon user";
1858       status = EACCES;
1859     }
1860     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1861     {
1862       reason = "must not be user/group writable";
1863       status = EACCES;
1864     }
1865
1866     if (status != 0)
1867     {
1868       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1869                file, rrd_strerror(status), reason);
1870       return 0;
1871     }
1872   }
1873
1874   fh = fopen(file, "r");
1875   if (fh == NULL)
1876   {
1877     if (errno != ENOENT)
1878       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1879                file, rrd_strerror(errno));
1880     return 0;
1881   }
1882   else
1883     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1884
1885   now = time(NULL);
1886
1887   while(!feof(fh))
1888   {
1889     size_t entry_len;
1890
1891     ++line;
1892     if (fgets(entry, sizeof(entry), fh) == NULL)
1893       break;
1894     entry_len = strlen(entry);
1895
1896     /* check \n termination in case journal writing crashed mid-line */
1897     if (entry_len == 0)
1898       continue;
1899     else if (entry[entry_len - 1] != '\n')
1900     {
1901       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1902       ++fail_cnt;
1903       continue;
1904     }
1905
1906     entry[entry_len - 1] = '\0';
1907
1908     if (handle_request(NULL, now, entry, entry_len) == 0)
1909       ++entry_cnt;
1910     else
1911       ++fail_cnt;
1912   }
1913
1914   fclose(fh);
1915
1916   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1917            entry_cnt, fail_cnt);
1918
1919   return entry_cnt > 0 ? 1 : 0;
1920 } /* }}} static int journal_replay */
1921
1922 static void journal_init(void) /* {{{ */
1923 {
1924   int had_journal = 0;
1925
1926   if (journal_cur == NULL) return;
1927
1928   pthread_mutex_lock(&journal_lock);
1929
1930   RRDD_LOG(LOG_INFO, "checking for journal files");
1931
1932   had_journal += journal_replay(journal_old);
1933   had_journal += journal_replay(journal_cur);
1934
1935   /* it must have been a crash.  start a flush */
1936   if (had_journal && config_flush_at_shutdown)
1937     flush_old_values(-1);
1938
1939   pthread_mutex_unlock(&journal_lock);
1940   journal_rotate();
1941
1942   RRDD_LOG(LOG_INFO, "journal processing complete");
1943
1944 } /* }}} static void journal_init */
1945
1946 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1947 {
1948   assert(sock != NULL);
1949
1950   free(sock->rbuf);  sock->rbuf = NULL;
1951   free(sock->wbuf);  sock->wbuf = NULL;
1952   free(sock);
1953 } /* }}} void free_listen_socket */
1954
1955 static void close_connection(listen_socket_t *sock) /* {{{ */
1956 {
1957   if (sock->fd >= 0)
1958   {
1959     close(sock->fd);
1960     sock->fd = -1;
1961   }
1962
1963   free_listen_socket(sock);
1964
1965 } /* }}} void close_connection */
1966
1967 static void *connection_thread_main (void *args) /* {{{ */
1968 {
1969   listen_socket_t *sock;
1970   int fd;
1971
1972   sock = (listen_socket_t *) args;
1973   fd = sock->fd;
1974
1975   /* init read buffers */
1976   sock->next_read = sock->next_cmd = 0;
1977   sock->rbuf = malloc(RBUF_SIZE);
1978   if (sock->rbuf == NULL)
1979   {
1980     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1981     close_connection(sock);
1982     return NULL;
1983   }
1984
1985   pthread_mutex_lock (&connection_threads_lock);
1986   connection_threads_num++;
1987   pthread_mutex_unlock (&connection_threads_lock);
1988
1989   while (do_shutdown == 0)
1990   {
1991     char *cmd;
1992     ssize_t cmd_len;
1993     ssize_t rbytes;
1994     time_t now;
1995
1996     struct pollfd pollfd;
1997     int status;
1998
1999     pollfd.fd = fd;
2000     pollfd.events = POLLIN | POLLPRI;
2001     pollfd.revents = 0;
2002
2003     status = poll (&pollfd, 1, /* timeout = */ 500);
2004     if (do_shutdown)
2005       break;
2006     else if (status == 0) /* timeout */
2007       continue;
2008     else if (status < 0) /* error */
2009     {
2010       status = errno;
2011       if (status != EINTR)
2012         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2013       continue;
2014     }
2015
2016     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2017       break;
2018     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2019     {
2020       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2021           "poll(2) returned something unexpected: %#04hx",
2022           pollfd.revents);
2023       break;
2024     }
2025
2026     rbytes = read(fd, sock->rbuf + sock->next_read,
2027                   RBUF_SIZE - sock->next_read);
2028     if (rbytes < 0)
2029     {
2030       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2031       break;
2032     }
2033     else if (rbytes == 0)
2034       break; /* eof */
2035
2036     sock->next_read += rbytes;
2037
2038     if (sock->batch_start)
2039       now = sock->batch_start;
2040     else
2041       now = time(NULL);
2042
2043     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2044     {
2045       status = handle_request (sock, now, cmd, cmd_len+1);
2046       if (status != 0)
2047         goto out_close;
2048     }
2049   }
2050
2051 out_close:
2052   close_connection(sock);
2053
2054   /* Remove this thread from the connection threads list */
2055   pthread_mutex_lock (&connection_threads_lock);
2056   connection_threads_num--;
2057   if (connection_threads_num <= 0)
2058     pthread_cond_broadcast(&connection_threads_done);
2059   pthread_mutex_unlock (&connection_threads_lock);
2060
2061   return (NULL);
2062 } /* }}} void *connection_thread_main */
2063
2064 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2065 {
2066   int fd;
2067   struct sockaddr_un sa;
2068   listen_socket_t *temp;
2069   int status;
2070   const char *path;
2071
2072   path = sock->addr;
2073   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2074     path += strlen("unix:");
2075
2076   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2077       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2078   if (temp == NULL)
2079   {
2080     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2081     return (-1);
2082   }
2083   listen_fds = temp;
2084   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2085
2086   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2087   if (fd < 0)
2088   {
2089     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2090              rrd_strerror(errno));
2091     return (-1);
2092   }
2093
2094   memset (&sa, 0, sizeof (sa));
2095   sa.sun_family = AF_UNIX;
2096   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2097
2098   /* if we've gotten this far, we own the pid file.  any daemon started
2099    * with the same args must not be alive.  therefore, ensure that we can
2100    * create the socket...
2101    */
2102   unlink(path);
2103
2104   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2105   if (status != 0)
2106   {
2107     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2108              path, rrd_strerror(errno));
2109     close (fd);
2110     return (-1);
2111   }
2112
2113   status = listen (fd, /* backlog = */ 10);
2114   if (status != 0)
2115   {
2116     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2117              path, rrd_strerror(errno));
2118     close (fd);
2119     unlink (path);
2120     return (-1);
2121   }
2122
2123   listen_fds[listen_fds_num].fd = fd;
2124   listen_fds[listen_fds_num].family = PF_UNIX;
2125   strncpy(listen_fds[listen_fds_num].addr, path,
2126           sizeof (listen_fds[listen_fds_num].addr) - 1);
2127   listen_fds_num++;
2128
2129   return (0);
2130 } /* }}} int open_listen_socket_unix */
2131
2132 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2133 {
2134   struct addrinfo ai_hints;
2135   struct addrinfo *ai_res;
2136   struct addrinfo *ai_ptr;
2137   char addr_copy[NI_MAXHOST];
2138   char *addr;
2139   char *port;
2140   int status;
2141
2142   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2143   addr_copy[sizeof (addr_copy) - 1] = 0;
2144   addr = addr_copy;
2145
2146   memset (&ai_hints, 0, sizeof (ai_hints));
2147   ai_hints.ai_flags = 0;
2148 #ifdef AI_ADDRCONFIG
2149   ai_hints.ai_flags |= AI_ADDRCONFIG;
2150 #endif
2151   ai_hints.ai_family = AF_UNSPEC;
2152   ai_hints.ai_socktype = SOCK_STREAM;
2153
2154   port = NULL;
2155   if (*addr == '[') /* IPv6+port format */
2156   {
2157     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2158     addr++;
2159
2160     port = strchr (addr, ']');
2161     if (port == NULL)
2162     {
2163       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2164       return (-1);
2165     }
2166     *port = 0;
2167     port++;
2168
2169     if (*port == ':')
2170       port++;
2171     else if (*port == 0)
2172       port = NULL;
2173     else
2174     {
2175       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2176       return (-1);
2177     }
2178   } /* if (*addr = ']') */
2179   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2180   {
2181     port = rindex(addr, ':');
2182     if (port != NULL)
2183     {
2184       *port = 0;
2185       port++;
2186     }
2187   }
2188   ai_res = NULL;
2189   status = getaddrinfo (addr,
2190                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2191                         &ai_hints, &ai_res);
2192   if (status != 0)
2193   {
2194     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2195              addr, gai_strerror (status));
2196     return (-1);
2197   }
2198
2199   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2200   {
2201     int fd;
2202     listen_socket_t *temp;
2203     int one = 1;
2204
2205     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2206         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2207     if (temp == NULL)
2208     {
2209       fprintf (stderr,
2210                "rrdcached: open_listen_socket_network: realloc failed.\n");
2211       continue;
2212     }
2213     listen_fds = temp;
2214     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2215
2216     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2217     if (fd < 0)
2218     {
2219       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2220                rrd_strerror(errno));
2221       continue;
2222     }
2223
2224     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2225
2226     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2227     if (status != 0)
2228     {
2229       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2230                sock->addr, rrd_strerror(errno));
2231       close (fd);
2232       continue;
2233     }
2234
2235     status = listen (fd, /* backlog = */ 10);
2236     if (status != 0)
2237     {
2238       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2239                sock->addr, rrd_strerror(errno));
2240       close (fd);
2241       freeaddrinfo(ai_res);
2242       return (-1);
2243     }
2244
2245     listen_fds[listen_fds_num].fd = fd;
2246     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2247     listen_fds_num++;
2248   } /* for (ai_ptr) */
2249
2250   freeaddrinfo(ai_res);
2251   return (0);
2252 } /* }}} static int open_listen_socket_network */
2253
2254 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2255 {
2256   assert(sock != NULL);
2257   assert(sock->addr != NULL);
2258
2259   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2260       || sock->addr[0] == '/')
2261     return (open_listen_socket_unix(sock));
2262   else
2263     return (open_listen_socket_network(sock));
2264 } /* }}} int open_listen_socket */
2265
2266 static int close_listen_sockets (void) /* {{{ */
2267 {
2268   size_t i;
2269
2270   for (i = 0; i < listen_fds_num; i++)
2271   {
2272     close (listen_fds[i].fd);
2273
2274     if (listen_fds[i].family == PF_UNIX)
2275       unlink(listen_fds[i].addr);
2276   }
2277
2278   free (listen_fds);
2279   listen_fds = NULL;
2280   listen_fds_num = 0;
2281
2282   return (0);
2283 } /* }}} int close_listen_sockets */
2284
2285 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2286 {
2287   struct pollfd *pollfds;
2288   int pollfds_num;
2289   int status;
2290   int i;
2291
2292   if (listen_fds_num < 1)
2293   {
2294     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2295     return (NULL);
2296   }
2297
2298   pollfds_num = listen_fds_num;
2299   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2300   if (pollfds == NULL)
2301   {
2302     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2303     return (NULL);
2304   }
2305   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2306
2307   RRDD_LOG(LOG_INFO, "listening for connections");
2308
2309   while (do_shutdown == 0)
2310   {
2311     for (i = 0; i < pollfds_num; i++)
2312     {
2313       pollfds[i].fd = listen_fds[i].fd;
2314       pollfds[i].events = POLLIN | POLLPRI;
2315       pollfds[i].revents = 0;
2316     }
2317
2318     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2319     if (do_shutdown)
2320       break;
2321     else if (status == 0) /* timeout */
2322       continue;
2323     else if (status < 0) /* error */
2324     {
2325       status = errno;
2326       if (status != EINTR)
2327       {
2328         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2329       }
2330       continue;
2331     }
2332
2333     for (i = 0; i < pollfds_num; i++)
2334     {
2335       listen_socket_t *client_sock;
2336       struct sockaddr_storage client_sa;
2337       socklen_t client_sa_size;
2338       pthread_t tid;
2339       pthread_attr_t attr;
2340
2341       if (pollfds[i].revents == 0)
2342         continue;
2343
2344       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2345       {
2346         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2347             "poll(2) returned something unexpected for listen FD #%i.",
2348             pollfds[i].fd);
2349         continue;
2350       }
2351
2352       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2353       if (client_sock == NULL)
2354       {
2355         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2356         continue;
2357       }
2358       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2359
2360       client_sa_size = sizeof (client_sa);
2361       client_sock->fd = accept (pollfds[i].fd,
2362           (struct sockaddr *) &client_sa, &client_sa_size);
2363       if (client_sock->fd < 0)
2364       {
2365         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2366         free(client_sock);
2367         continue;
2368       }
2369
2370       pthread_attr_init (&attr);
2371       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2372
2373       status = pthread_create (&tid, &attr, connection_thread_main,
2374                                client_sock);
2375       if (status != 0)
2376       {
2377         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2378         close_connection(client_sock);
2379         continue;
2380       }
2381     } /* for (pollfds_num) */
2382   } /* while (do_shutdown == 0) */
2383
2384   RRDD_LOG(LOG_INFO, "starting shutdown");
2385
2386   close_listen_sockets ();
2387
2388   pthread_mutex_lock (&connection_threads_lock);
2389   while (connection_threads_num > 0)
2390     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2391   pthread_mutex_unlock (&connection_threads_lock);
2392
2393   free(pollfds);
2394
2395   return (NULL);
2396 } /* }}} void *listen_thread_main */
2397
2398 static int daemonize (void) /* {{{ */
2399 {
2400   int pid_fd;
2401   char *base_dir;
2402
2403   daemon_uid = geteuid();
2404
2405   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2406   if (pid_fd < 0)
2407     pid_fd = check_pidfile();
2408   if (pid_fd < 0)
2409     return pid_fd;
2410
2411   /* open all the listen sockets */
2412   if (config_listen_address_list_len > 0)
2413   {
2414     for (size_t i = 0; i < config_listen_address_list_len; i++)
2415       open_listen_socket (config_listen_address_list[i]);
2416
2417     rrd_free_ptrs((void ***) &config_listen_address_list,
2418                   &config_listen_address_list_len);
2419   }
2420   else
2421   {
2422     listen_socket_t sock;
2423     memset(&sock, 0, sizeof(sock));
2424     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2425     open_listen_socket (&sock);
2426   }
2427
2428   if (listen_fds_num < 1)
2429   {
2430     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2431     goto error;
2432   }
2433
2434   if (!stay_foreground)
2435   {
2436     pid_t child;
2437
2438     child = fork ();
2439     if (child < 0)
2440     {
2441       fprintf (stderr, "daemonize: fork(2) failed.\n");
2442       goto error;
2443     }
2444     else if (child > 0)
2445       exit(0);
2446
2447     /* Become session leader */
2448     setsid ();
2449
2450     /* Open the first three file descriptors to /dev/null */
2451     close (2);
2452     close (1);
2453     close (0);
2454
2455     open ("/dev/null", O_RDWR);
2456     if (dup(0) == -1 || dup(0) == -1){
2457         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2458     }
2459   } /* if (!stay_foreground) */
2460
2461   /* Change into the /tmp directory. */
2462   base_dir = (config_base_dir != NULL)
2463     ? config_base_dir
2464     : "/tmp";
2465
2466   if (chdir (base_dir) != 0)
2467   {
2468     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2469     goto error;
2470   }
2471
2472   install_signal_handlers();
2473
2474   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2475   RRDD_LOG(LOG_INFO, "starting up");
2476
2477   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2478                                 (GDestroyNotify) free_cache_item);
2479   if (cache_tree == NULL)
2480   {
2481     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2482     goto error;
2483   }
2484
2485   return write_pidfile (pid_fd);
2486
2487 error:
2488   remove_pidfile();
2489   return -1;
2490 } /* }}} int daemonize */
2491
2492 static int cleanup (void) /* {{{ */
2493 {
2494   do_shutdown++;
2495
2496   pthread_cond_broadcast (&flush_cond);
2497   pthread_join (flush_thread, NULL);
2498
2499   pthread_cond_broadcast (&queue_cond);
2500   for (int i = 0; i < config_queue_threads; i++)
2501     pthread_join (queue_threads[i], NULL);
2502
2503   if (config_flush_at_shutdown)
2504   {
2505     assert(cache_queue_head == NULL);
2506     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2507   }
2508
2509   journal_done();
2510   remove_pidfile ();
2511
2512   free(queue_threads);
2513   free(config_base_dir);
2514   free(config_pid_file);
2515   free(journal_cur);
2516   free(journal_old);
2517
2518   pthread_mutex_lock(&cache_lock);
2519   g_tree_destroy(cache_tree);
2520
2521   RRDD_LOG(LOG_INFO, "goodbye");
2522   closelog ();
2523
2524   return (0);
2525 } /* }}} int cleanup */
2526
2527 static int read_options (int argc, char **argv) /* {{{ */
2528 {
2529   int option;
2530   int status = 0;
2531
2532   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2533   {
2534     switch (option)
2535     {
2536       case 'g':
2537         stay_foreground=1;
2538         break;
2539
2540       case 'L':
2541       case 'l':
2542       {
2543         listen_socket_t *new;
2544
2545         new = malloc(sizeof(listen_socket_t));
2546         if (new == NULL)
2547         {
2548           fprintf(stderr, "read_options: malloc failed.\n");
2549           return(2);
2550         }
2551         memset(new, 0, sizeof(listen_socket_t));
2552
2553         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2554         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2555
2556         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2557                          &config_listen_address_list_len, new))
2558         {
2559           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2560           return (2);
2561         }
2562       }
2563       break;
2564
2565       case 'f':
2566       {
2567         int temp;
2568
2569         temp = atoi (optarg);
2570         if (temp > 0)
2571           config_flush_interval = temp;
2572         else
2573         {
2574           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2575           status = 3;
2576         }
2577       }
2578       break;
2579
2580       case 'w':
2581       {
2582         int temp;
2583
2584         temp = atoi (optarg);
2585         if (temp > 0)
2586           config_write_interval = temp;
2587         else
2588         {
2589           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2590           status = 2;
2591         }
2592       }
2593       break;
2594
2595       case 'z':
2596       {
2597         int temp;
2598
2599         temp = atoi(optarg);
2600         if (temp > 0)
2601           config_write_jitter = temp;
2602         else
2603         {
2604           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2605           status = 2;
2606         }
2607
2608         break;
2609       }
2610
2611       case 't':
2612       {
2613         int threads;
2614         threads = atoi(optarg);
2615         if (threads >= 1)
2616           config_queue_threads = threads;
2617         else
2618         {
2619           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2620           return 1;
2621         }
2622       }
2623       break;
2624
2625       case 'B':
2626         config_write_base_only = 1;
2627         break;
2628
2629       case 'b':
2630       {
2631         size_t len;
2632         char base_realpath[PATH_MAX];
2633
2634         if (config_base_dir != NULL)
2635           free (config_base_dir);
2636         config_base_dir = strdup (optarg);
2637         if (config_base_dir == NULL)
2638         {
2639           fprintf (stderr, "read_options: strdup failed.\n");
2640           return (3);
2641         }
2642
2643         /* make sure that the base directory is not resolved via
2644          * symbolic links.  this makes some performance-enhancing
2645          * assumptions possible (we don't have to resolve paths
2646          * that start with a "/")
2647          */
2648         if (realpath(config_base_dir, base_realpath) == NULL)
2649         {
2650           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2651           return 5;
2652         }
2653         else if (strncmp(config_base_dir,
2654                          base_realpath, sizeof(base_realpath)) != 0)
2655         {
2656           fprintf(stderr,
2657                   "Base directory (-b) resolved via file system links!\n"
2658                   "Please consult rrdcached '-b' documentation!\n"
2659                   "Consider specifying the real directory (%s)\n",
2660                   base_realpath);
2661           return 5;
2662         }
2663
2664         len = strlen (config_base_dir);
2665         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2666         {
2667           config_base_dir[len - 1] = 0;
2668           len--;
2669         }
2670
2671         if (len < 1)
2672         {
2673           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2674           return (4);
2675         }
2676
2677         _config_base_dir_len = len;
2678       }
2679       break;
2680
2681       case 'p':
2682       {
2683         if (config_pid_file != NULL)
2684           free (config_pid_file);
2685         config_pid_file = strdup (optarg);
2686         if (config_pid_file == NULL)
2687         {
2688           fprintf (stderr, "read_options: strdup failed.\n");
2689           return (3);
2690         }
2691       }
2692       break;
2693
2694       case 'F':
2695         config_flush_at_shutdown = 1;
2696         break;
2697
2698       case 'j':
2699       {
2700         struct stat statbuf;
2701         const char *dir = optarg;
2702
2703         status = stat(dir, &statbuf);
2704         if (status != 0)
2705         {
2706           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2707           return 6;
2708         }
2709
2710         if (!S_ISDIR(statbuf.st_mode)
2711             || access(dir, R_OK|W_OK|X_OK) != 0)
2712         {
2713           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2714                   errno ? rrd_strerror(errno) : "");
2715           return 6;
2716         }
2717
2718         journal_cur = malloc(PATH_MAX + 1);
2719         journal_old = malloc(PATH_MAX + 1);
2720         if (journal_cur == NULL || journal_old == NULL)
2721         {
2722           fprintf(stderr, "malloc failure for journal files\n");
2723           return 6;
2724         }
2725         else 
2726         {
2727           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2728           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2729         }
2730       }
2731       break;
2732
2733       case 'h':
2734       case '?':
2735         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2736             "\n"
2737             "Usage: rrdcached [options]\n"
2738             "\n"
2739             "Valid options are:\n"
2740             "  -l <address>  Socket address to listen to.\n"
2741             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2742             "  -w <seconds>  Interval in which to write data.\n"
2743             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2744             "  -t <threads>  Number of write threads.\n"
2745             "  -f <seconds>  Interval in which to flush dead data.\n"
2746             "  -p <file>     Location of the PID-file.\n"
2747             "  -b <dir>      Base directory to change to.\n"
2748             "  -B            Restrict file access to paths within -b <dir>\n"
2749             "  -g            Do not fork and run in the foreground.\n"
2750             "  -j <dir>      Directory in which to create the journal files.\n"
2751             "  -F            Always flush all updates at shutdown\n"
2752             "\n"
2753             "For more information and a detailed description of all options "
2754             "please refer\n"
2755             "to the rrdcached(1) manual page.\n",
2756             VERSION);
2757         status = -1;
2758         break;
2759     } /* switch (option) */
2760   } /* while (getopt) */
2761
2762   /* advise the user when values are not sane */
2763   if (config_flush_interval < 2 * config_write_interval)
2764     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2765             " 2x write interval (-w) !\n");
2766   if (config_write_jitter > config_write_interval)
2767     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2768             " write interval (-w) !\n");
2769
2770   if (config_write_base_only && config_base_dir == NULL)
2771     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2772             "  Consult the rrdcached documentation\n");
2773
2774   if (journal_cur == NULL)
2775     config_flush_at_shutdown = 1;
2776
2777   return (status);
2778 } /* }}} int read_options */
2779
2780 int main (int argc, char **argv)
2781 {
2782   int status;
2783
2784   status = read_options (argc, argv);
2785   if (status != 0)
2786   {
2787     if (status < 0)
2788       status = 0;
2789     return (status);
2790   }
2791
2792   status = daemonize ();
2793   if (status != 0)
2794   {
2795     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2796     return (1);
2797   }
2798
2799   journal_init();
2800
2801   /* start the queue threads */
2802   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2803   if (queue_threads == NULL)
2804   {
2805     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2806     cleanup();
2807     return (1);
2808   }
2809   for (int i = 0; i < config_queue_threads; i++)
2810   {
2811     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2812     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2813     if (status != 0)
2814     {
2815       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2816       cleanup();
2817       return (1);
2818     }
2819   }
2820
2821   /* start the flush thread */
2822   memset(&flush_thread, 0, sizeof(flush_thread));
2823   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2824   if (status != 0)
2825   {
2826     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2827     cleanup();
2828     return (1);
2829   }
2830
2831   listen_thread_main (NULL);
2832   cleanup ();
2833
2834   return (0);
2835 } /* int main */
2836
2837 /*
2838  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2839  */