53904a331357b2d0a745bb625249191cb26f28c4
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
161
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
167
168   char *syntax;
169   char *help;
170 };
171
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
188
189 struct callback_flush_data_s
190 {
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
197
198 enum queue_side_e
199 {
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
204
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
208
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
214
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
217
218 static int do_shutdown = 0;
219
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
223
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
226
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
230
231 /* Cache stuff */
232 static GTree          *cache_tree = NULL;
233 static cache_item_t   *cache_queue_head = NULL;
234 static cache_item_t   *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
236
237 static int config_write_interval = 300;
238 static int config_write_jitter   = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
245
246 static listen_socket_t **config_listen_address_list = NULL;
247 static size_t config_listen_address_list_len = 0;
248
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
257
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
266
267 /* prototypes for forward refernces */
268 static int handle_request_help (HANDLER_PROTO);
269
270 /* 
271  * Functions
272  */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276   do_shutdown++;
277   pthread_cond_broadcast(&flush_cond);
278   pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
280
281 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
282 {
283   sig_common("INT");
284 } /* }}} void sig_int_handler */
285
286 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
287 {
288   sig_common("TERM");
289 } /* }}} void sig_term_handler */
290
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293   config_flush_at_shutdown = 1;
294   sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
296
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299   config_flush_at_shutdown = 0;
300   sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
302
303 static void install_signal_handlers(void) /* {{{ */
304 {
305   /* These structures are static, because `sigaction' behaves weird if the are
306    * overwritten.. */
307   static struct sigaction sa_int;
308   static struct sigaction sa_term;
309   static struct sigaction sa_pipe;
310   static struct sigaction sa_usr1;
311   static struct sigaction sa_usr2;
312
313   /* Install signal handlers */
314   memset (&sa_int, 0, sizeof (sa_int));
315   sa_int.sa_handler = sig_int_handler;
316   sigaction (SIGINT, &sa_int, NULL);
317
318   memset (&sa_term, 0, sizeof (sa_term));
319   sa_term.sa_handler = sig_term_handler;
320   sigaction (SIGTERM, &sa_term, NULL);
321
322   memset (&sa_pipe, 0, sizeof (sa_pipe));
323   sa_pipe.sa_handler = SIG_IGN;
324   sigaction (SIGPIPE, &sa_pipe, NULL);
325
326   memset (&sa_pipe, 0, sizeof (sa_usr1));
327   sa_usr1.sa_handler = sig_usr1_handler;
328   sigaction (SIGUSR1, &sa_usr1, NULL);
329
330   memset (&sa_usr2, 0, sizeof (sa_usr2));
331   sa_usr2.sa_handler = sig_usr2_handler;
332   sigaction (SIGUSR2, &sa_usr2, NULL);
333
334 } /* }}} void install_signal_handlers */
335
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338   int fd;
339   char *file;
340
341   file = (config_pid_file != NULL)
342     ? config_pid_file
343     : LOCALSTATEDIR "/run/rrdcached.pid";
344
345   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346   if (fd < 0)
347     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348             action, file, rrd_strerror(errno));
349
350   return(fd);
351 } /* }}} static int open_pidfile */
352
353 /* check existing pid file to see whether a daemon is running */
354 static int check_pidfile(void)
355 {
356   int pid_fd;
357   pid_t pid;
358   char pid_str[16];
359
360   pid_fd = open_pidfile("open", O_RDWR);
361   if (pid_fd < 0)
362     return pid_fd;
363
364   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
365     return -1;
366
367   pid = atoi(pid_str);
368   if (pid <= 0)
369     return -1;
370
371   /* another running process that we can signal COULD be
372    * a competing rrdcached */
373   if (pid != getpid() && kill(pid, 0) == 0)
374   {
375     fprintf(stderr,
376             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
377     close(pid_fd);
378     return -1;
379   }
380
381   lseek(pid_fd, 0, SEEK_SET);
382   ftruncate(pid_fd, 0);
383
384   fprintf(stderr,
385           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
386           "rrdcached: starting normally.\n", pid);
387
388   return pid_fd;
389 } /* }}} static int check_pidfile */
390
391 static int write_pidfile (int fd) /* {{{ */
392 {
393   pid_t pid;
394   FILE *fh;
395
396   pid = getpid ();
397
398   fh = fdopen (fd, "w");
399   if (fh == NULL)
400   {
401     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
402     close(fd);
403     return (-1);
404   }
405
406   fprintf (fh, "%i\n", (int) pid);
407   fclose (fh);
408
409   return (0);
410 } /* }}} int write_pidfile */
411
412 static int remove_pidfile (void) /* {{{ */
413 {
414   char *file;
415   int status;
416
417   file = (config_pid_file != NULL)
418     ? config_pid_file
419     : LOCALSTATEDIR "/run/rrdcached.pid";
420
421   status = unlink (file);
422   if (status == 0)
423     return (0);
424   return (errno);
425 } /* }}} int remove_pidfile */
426
427 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
428 {
429   char *eol;
430
431   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
432                sock->next_read - sock->next_cmd);
433
434   if (eol == NULL)
435   {
436     /* no commands left, move remainder back to front of rbuf */
437     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
438             sock->next_read - sock->next_cmd);
439     sock->next_read -= sock->next_cmd;
440     sock->next_cmd = 0;
441     *len = 0;
442     return NULL;
443   }
444   else
445   {
446     char *cmd = sock->rbuf + sock->next_cmd;
447     *eol = '\0';
448
449     sock->next_cmd = eol - sock->rbuf + 1;
450
451     if (eol > sock->rbuf && *(eol-1) == '\r')
452       *(--eol) = '\0'; /* handle "\r\n" EOL */
453
454     *len = eol - cmd;
455
456     return cmd;
457   }
458
459   /* NOTREACHED */
460   assert(1==0);
461 }
462
463 /* add the characters directly to the write buffer */
464 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
465 {
466   char *new_buf;
467
468   assert(sock != NULL);
469
470   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
471   if (new_buf == NULL)
472   {
473     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
474     return -1;
475   }
476
477   strncpy(new_buf + sock->wbuf_len, str, len + 1);
478
479   sock->wbuf = new_buf;
480   sock->wbuf_len += len;
481
482   return 0;
483 } /* }}} static int add_to_wbuf */
484
485 /* add the text to the "extra" info that's sent after the status line */
486 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
487 {
488   va_list argp;
489   char buffer[CMD_MAX];
490   int len;
491
492   if (sock == NULL) return 0; /* journal replay mode */
493   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
494
495   va_start(argp, fmt);
496 #ifdef HAVE_VSNPRINTF
497   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
498 #else
499   len = vsprintf(buffer, fmt, argp);
500 #endif
501   va_end(argp);
502   if (len < 0)
503   {
504     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
505     return -1;
506   }
507
508   return add_to_wbuf(sock, buffer, len);
509 } /* }}} static int add_response_info */
510
511 static int count_lines(char *str) /* {{{ */
512 {
513   int lines = 0;
514
515   if (str != NULL)
516   {
517     while ((str = strchr(str, '\n')) != NULL)
518     {
519       ++lines;
520       ++str;
521     }
522   }
523
524   return lines;
525 } /* }}} static int count_lines */
526
527 /* send the response back to the user.
528  * returns 0 on success, -1 on error
529  * write buffer is always zeroed after this call */
530 static int send_response (listen_socket_t *sock, response_code rc,
531                           char *fmt, ...) /* {{{ */
532 {
533   va_list argp;
534   char buffer[CMD_MAX];
535   int lines;
536   ssize_t wrote;
537   int rclen, len;
538
539   if (sock == NULL) return rc;  /* journal replay mode */
540
541   if (sock->batch_start)
542   {
543     if (rc == RESP_OK)
544       return rc; /* no response on success during BATCH */
545     lines = sock->batch_cmd;
546   }
547   else if (rc == RESP_OK)
548     lines = count_lines(sock->wbuf);
549   else
550     lines = -1;
551
552   rclen = sprintf(buffer, "%d ", lines);
553   va_start(argp, fmt);
554 #ifdef HAVE_VSNPRINTF
555   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
556 #else
557   len = vsprintf(buffer+rclen, fmt, argp);
558 #endif
559   va_end(argp);
560   if (len < 0)
561     return -1;
562
563   len += rclen;
564
565   /* append the result to the wbuf, don't write to the user */
566   if (sock->batch_start)
567     return add_to_wbuf(sock, buffer, len);
568
569   /* first write must be complete */
570   if (len != write(sock->fd, buffer, len))
571   {
572     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
573     return -1;
574   }
575
576   if (sock->wbuf != NULL && rc == RESP_OK)
577   {
578     wrote = 0;
579     while (wrote < sock->wbuf_len)
580     {
581       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
582       if (wb <= 0)
583       {
584         RRDD_LOG(LOG_INFO, "send_response: could not write results");
585         return -1;
586       }
587       wrote += wb;
588     }
589   }
590
591   free(sock->wbuf); sock->wbuf = NULL;
592   sock->wbuf_len = 0;
593
594   return 0;
595 } /* }}} */
596
597 static void wipe_ci_values(cache_item_t *ci, time_t when)
598 {
599   ci->values = NULL;
600   ci->values_num = 0;
601
602   ci->last_flush_time = when;
603   if (config_write_jitter > 0)
604     ci->last_flush_time += (rrd_random() % config_write_jitter);
605 }
606
607 /* remove_from_queue
608  * remove a "cache_item_t" item from the queue.
609  * must hold 'cache_lock' when calling this
610  */
611 static void remove_from_queue(cache_item_t *ci) /* {{{ */
612 {
613   if (ci == NULL) return;
614   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
615
616   if (ci->prev == NULL)
617     cache_queue_head = ci->next; /* reset head */
618   else
619     ci->prev->next = ci->next;
620
621   if (ci->next == NULL)
622     cache_queue_tail = ci->prev; /* reset the tail */
623   else
624     ci->next->prev = ci->prev;
625
626   ci->next = ci->prev = NULL;
627   ci->flags &= ~CI_FLAGS_IN_QUEUE;
628
629   pthread_mutex_lock (&stats_lock);
630   assert (stats_queue_length > 0);
631   stats_queue_length--;
632   pthread_mutex_unlock (&stats_lock);
633
634 } /* }}} static void remove_from_queue */
635
636 /* free the resources associated with the cache_item_t
637  * must hold cache_lock when calling this function
638  */
639 static void *free_cache_item(cache_item_t *ci) /* {{{ */
640 {
641   if (ci == NULL) return NULL;
642
643   remove_from_queue(ci);
644
645   for (size_t i=0; i < ci->values_num; i++)
646     free(ci->values[i]);
647
648   free (ci->values);
649   free (ci->file);
650
651   /* in case anyone is waiting */
652   pthread_cond_broadcast(&ci->flushed);
653
654   free (ci);
655
656   return NULL;
657 } /* }}} static void *free_cache_item */
658
659 /*
660  * enqueue_cache_item:
661  * `cache_lock' must be acquired before calling this function!
662  */
663 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
664     queue_side_t side)
665 {
666   if (ci == NULL)
667     return (-1);
668
669   if (ci->values_num == 0)
670     return (0);
671
672   if (side == HEAD)
673   {
674     if (cache_queue_head == ci)
675       return 0;
676
677     /* remove if further down in queue */
678     remove_from_queue(ci);
679
680     ci->prev = NULL;
681     ci->next = cache_queue_head;
682     if (ci->next != NULL)
683       ci->next->prev = ci;
684     cache_queue_head = ci;
685
686     if (cache_queue_tail == NULL)
687       cache_queue_tail = cache_queue_head;
688   }
689   else /* (side == TAIL) */
690   {
691     /* We don't move values back in the list.. */
692     if (ci->flags & CI_FLAGS_IN_QUEUE)
693       return (0);
694
695     assert (ci->next == NULL);
696     assert (ci->prev == NULL);
697
698     ci->prev = cache_queue_tail;
699
700     if (cache_queue_tail == NULL)
701       cache_queue_head = ci;
702     else
703       cache_queue_tail->next = ci;
704
705     cache_queue_tail = ci;
706   }
707
708   ci->flags |= CI_FLAGS_IN_QUEUE;
709
710   pthread_cond_signal(&queue_cond);
711   pthread_mutex_lock (&stats_lock);
712   stats_queue_length++;
713   pthread_mutex_unlock (&stats_lock);
714
715   return (0);
716 } /* }}} int enqueue_cache_item */
717
718 /*
719  * tree_callback_flush:
720  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
721  * while this is in progress.
722  */
723 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
724     gpointer data)
725 {
726   cache_item_t *ci;
727   callback_flush_data_t *cfd;
728
729   ci = (cache_item_t *) value;
730   cfd = (callback_flush_data_t *) data;
731
732   if (ci->flags & CI_FLAGS_IN_QUEUE)
733     return FALSE;
734
735   if ((ci->last_flush_time <= cfd->abs_timeout)
736       && (ci->values_num > 0))
737   {
738     enqueue_cache_item (ci, TAIL);
739   }
740   else if ((do_shutdown != 0)
741       && (ci->values_num > 0))
742   {
743     enqueue_cache_item (ci, TAIL);
744   }
745   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
746       && (ci->values_num <= 0))
747   {
748     assert ((char *) key == ci->file);
749     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
750     {
751       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
752       return (FALSE);
753     }
754   }
755
756   return (FALSE);
757 } /* }}} gboolean tree_callback_flush */
758
759 static int flush_old_values (int max_age)
760 {
761   callback_flush_data_t cfd;
762   size_t k;
763
764   memset (&cfd, 0, sizeof (cfd));
765   /* Pass the current time as user data so that we don't need to call
766    * `time' for each node. */
767   cfd.now = time (NULL);
768   cfd.keys = NULL;
769   cfd.keys_num = 0;
770
771   if (max_age > 0)
772     cfd.abs_timeout = cfd.now - max_age;
773   else
774     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
775
776   /* `tree_callback_flush' will return the keys of all values that haven't
777    * been touched in the last `config_flush_interval' seconds in `cfd'.
778    * The char*'s in this array point to the same memory as ci->file, so we
779    * don't need to free them separately. */
780   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
781
782   for (k = 0; k < cfd.keys_num; k++)
783   {
784     /* should never fail, since we have held the cache_lock
785      * the entire time */
786     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
787   }
788
789   if (cfd.keys != NULL)
790   {
791     free (cfd.keys);
792     cfd.keys = NULL;
793   }
794
795   return (0);
796 } /* int flush_old_values */
797
798 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
799 {
800   struct timeval now;
801   struct timespec next_flush;
802   int status;
803
804   gettimeofday (&now, NULL);
805   next_flush.tv_sec = now.tv_sec + config_flush_interval;
806   next_flush.tv_nsec = 1000 * now.tv_usec;
807
808   pthread_mutex_lock(&cache_lock);
809
810   while (!do_shutdown)
811   {
812     gettimeofday (&now, NULL);
813     if ((now.tv_sec > next_flush.tv_sec)
814         || ((now.tv_sec == next_flush.tv_sec)
815           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
816     {
817       /* Flush all values that haven't been written in the last
818        * `config_write_interval' seconds. */
819       flush_old_values (config_write_interval);
820
821       /* Determine the time of the next cache flush. */
822       next_flush.tv_sec =
823         now.tv_sec + next_flush.tv_sec % config_flush_interval;
824
825       /* unlock the cache while we rotate so we don't block incoming
826        * updates if the fsync() blocks on disk I/O */
827       pthread_mutex_unlock(&cache_lock);
828       journal_rotate();
829       pthread_mutex_lock(&cache_lock);
830     }
831
832     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
833     if (status != 0 && status != ETIMEDOUT)
834     {
835       RRDD_LOG (LOG_ERR, "flush_thread_main: "
836                 "pthread_cond_timedwait returned %i.", status);
837     }
838   }
839
840   if (config_flush_at_shutdown)
841     flush_old_values (-1); /* flush everything */
842
843   pthread_mutex_unlock(&cache_lock);
844
845   return NULL;
846 } /* void *flush_thread_main */
847
848 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
849 {
850   pthread_mutex_lock (&cache_lock);
851
852   while (!do_shutdown
853          || (cache_queue_head != NULL && config_flush_at_shutdown))
854   {
855     cache_item_t *ci;
856     char *file;
857     char **values;
858     size_t values_num;
859     int status;
860
861     /* Now, check if there's something to store away. If not, wait until
862      * something comes in.  if we are shutting down, do not wait around.  */
863     if (cache_queue_head == NULL && !do_shutdown)
864     {
865       status = pthread_cond_wait (&queue_cond, &cache_lock);
866       if ((status != 0) && (status != ETIMEDOUT))
867       {
868         RRDD_LOG (LOG_ERR, "queue_thread_main: "
869             "pthread_cond_wait returned %i.", status);
870       }
871     }
872
873     /* Check if a value has arrived. This may be NULL if we timed out or there
874      * was an interrupt such as a signal. */
875     if (cache_queue_head == NULL)
876       continue;
877
878     ci = cache_queue_head;
879
880     /* copy the relevant parts */
881     file = strdup (ci->file);
882     if (file == NULL)
883     {
884       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
885       continue;
886     }
887
888     assert(ci->values != NULL);
889     assert(ci->values_num > 0);
890
891     values = ci->values;
892     values_num = ci->values_num;
893
894     wipe_ci_values(ci, time(NULL));
895     remove_from_queue(ci);
896
897     pthread_mutex_unlock (&cache_lock);
898
899     rrd_clear_error ();
900     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
901     if (status != 0)
902     {
903       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
904           "rrd_update_r (%s) failed with status %i. (%s)",
905           file, status, rrd_get_error());
906     }
907
908     journal_write("wrote", file);
909     pthread_cond_broadcast(&ci->flushed);
910
911     rrd_free_ptrs((void ***) &values, &values_num);
912     free(file);
913
914     if (status == 0)
915     {
916       pthread_mutex_lock (&stats_lock);
917       stats_updates_written++;
918       stats_data_sets_written += values_num;
919       pthread_mutex_unlock (&stats_lock);
920     }
921
922     pthread_mutex_lock (&cache_lock);
923   }
924   pthread_mutex_unlock (&cache_lock);
925
926   return (NULL);
927 } /* }}} void *queue_thread_main */
928
929 static int buffer_get_field (char **buffer_ret, /* {{{ */
930     size_t *buffer_size_ret, char **field_ret)
931 {
932   char *buffer;
933   size_t buffer_pos;
934   size_t buffer_size;
935   char *field;
936   size_t field_size;
937   int status;
938
939   buffer = *buffer_ret;
940   buffer_pos = 0;
941   buffer_size = *buffer_size_ret;
942   field = *buffer_ret;
943   field_size = 0;
944
945   if (buffer_size <= 0)
946     return (-1);
947
948   /* This is ensured by `handle_request'. */
949   assert (buffer[buffer_size - 1] == '\0');
950
951   status = -1;
952   while (buffer_pos < buffer_size)
953   {
954     /* Check for end-of-field or end-of-buffer */
955     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
956     {
957       field[field_size] = 0;
958       field_size++;
959       buffer_pos++;
960       status = 0;
961       break;
962     }
963     /* Handle escaped characters. */
964     else if (buffer[buffer_pos] == '\\')
965     {
966       if (buffer_pos >= (buffer_size - 1))
967         break;
968       buffer_pos++;
969       field[field_size] = buffer[buffer_pos];
970       field_size++;
971       buffer_pos++;
972     }
973     /* Normal operation */ 
974     else
975     {
976       field[field_size] = buffer[buffer_pos];
977       field_size++;
978       buffer_pos++;
979     }
980   } /* while (buffer_pos < buffer_size) */
981
982   if (status != 0)
983     return (status);
984
985   *buffer_ret = buffer + buffer_pos;
986   *buffer_size_ret = buffer_size - buffer_pos;
987   *field_ret = field;
988
989   return (0);
990 } /* }}} int buffer_get_field */
991
992 /* if we're restricting writes to the base directory,
993  * check whether the file falls within the dir
994  * returns 1 if OK, otherwise 0
995  */
996 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
997 {
998   assert(file != NULL);
999
1000   if (!config_write_base_only
1001       || sock == NULL /* journal replay */
1002       || config_base_dir == NULL)
1003     return 1;
1004
1005   if (strstr(file, "../") != NULL) goto err;
1006
1007   /* relative paths without "../" are ok */
1008   if (*file != '/') return 1;
1009
1010   /* file must be of the format base + "/" + <1+ char filename> */
1011   if (strlen(file) < _config_base_dir_len + 2) goto err;
1012   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1013   if (*(file + _config_base_dir_len) != '/') goto err;
1014
1015   return 1;
1016
1017 err:
1018   if (sock != NULL && sock->fd >= 0)
1019     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1020
1021   return 0;
1022 } /* }}} static int check_file_access */
1023
1024 /* when using a base dir, convert relative paths to absolute paths.
1025  * if necessary, modifies the "filename" pointer to point
1026  * to the new path created in "tmp".  "tmp" is provided
1027  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1028  *
1029  * this allows us to optimize for the expected case (absolute path)
1030  * with a no-op.
1031  */
1032 static void get_abs_path(char **filename, char *tmp)
1033 {
1034   assert(tmp != NULL);
1035   assert(filename != NULL && *filename != NULL);
1036
1037   if (config_base_dir == NULL || **filename == '/')
1038     return;
1039
1040   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1041   *filename = tmp;
1042 } /* }}} static int get_abs_path */
1043
1044 /* returns 1 if we have the required privilege level,
1045  * otherwise issue an error to the user on sock */
1046 static int has_privilege (listen_socket_t *sock, /* {{{ */
1047                           socket_privilege priv)
1048 {
1049   if (sock == NULL) /* journal replay */
1050     return 1;
1051
1052   if (sock->privilege >= priv)
1053     return 1;
1054
1055   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1056 } /* }}} static int has_privilege */
1057
1058 static int flush_file (const char *filename) /* {{{ */
1059 {
1060   cache_item_t *ci;
1061
1062   pthread_mutex_lock (&cache_lock);
1063
1064   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1065   if (ci == NULL)
1066   {
1067     pthread_mutex_unlock (&cache_lock);
1068     return (ENOENT);
1069   }
1070
1071   if (ci->values_num > 0)
1072   {
1073     /* Enqueue at head */
1074     enqueue_cache_item (ci, HEAD);
1075     pthread_cond_wait(&ci->flushed, &cache_lock);
1076   }
1077
1078   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1079    * may have been purged during our cond_wait() */
1080
1081   pthread_mutex_unlock(&cache_lock);
1082
1083   return (0);
1084 } /* }}} int flush_file */
1085
1086 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1087 {
1088   char *err = "Syntax error.\n";
1089
1090   if (cmd && cmd->syntax)
1091     err = cmd->syntax;
1092
1093   return send_response(sock, RESP_ERR, "Usage: %s", err);
1094 } /* }}} static int syntax_error() */
1095
1096 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1097 {
1098   uint64_t copy_queue_length;
1099   uint64_t copy_updates_received;
1100   uint64_t copy_flush_received;
1101   uint64_t copy_updates_written;
1102   uint64_t copy_data_sets_written;
1103   uint64_t copy_journal_bytes;
1104   uint64_t copy_journal_rotate;
1105
1106   uint64_t tree_nodes_number;
1107   uint64_t tree_depth;
1108
1109   pthread_mutex_lock (&stats_lock);
1110   copy_queue_length       = stats_queue_length;
1111   copy_updates_received   = stats_updates_received;
1112   copy_flush_received     = stats_flush_received;
1113   copy_updates_written    = stats_updates_written;
1114   copy_data_sets_written  = stats_data_sets_written;
1115   copy_journal_bytes      = stats_journal_bytes;
1116   copy_journal_rotate     = stats_journal_rotate;
1117   pthread_mutex_unlock (&stats_lock);
1118
1119   pthread_mutex_lock (&cache_lock);
1120   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1121   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1122   pthread_mutex_unlock (&cache_lock);
1123
1124   add_response_info(sock,
1125                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1126   add_response_info(sock,
1127                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1128   add_response_info(sock,
1129                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1130   add_response_info(sock,
1131                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1132   add_response_info(sock,
1133                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1134   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1135   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1136   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1137   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1138
1139   send_response(sock, RESP_OK, "Statistics follow\n");
1140
1141   return (0);
1142 } /* }}} int handle_request_stats */
1143
1144 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1145 {
1146   char *file, file_tmp[PATH_MAX];
1147   int status;
1148
1149   status = buffer_get_field (&buffer, &buffer_size, &file);
1150   if (status != 0)
1151   {
1152     return syntax_error(sock,cmd);
1153   }
1154   else
1155   {
1156     pthread_mutex_lock(&stats_lock);
1157     stats_flush_received++;
1158     pthread_mutex_unlock(&stats_lock);
1159
1160     get_abs_path(&file, file_tmp);
1161     if (!check_file_access(file, sock)) return 0;
1162
1163     status = flush_file (file);
1164     if (status == 0)
1165       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1166     else if (status == ENOENT)
1167     {
1168       /* no file in our tree; see whether it exists at all */
1169       struct stat statbuf;
1170
1171       memset(&statbuf, 0, sizeof(statbuf));
1172       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1173         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1174       else
1175         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1176     }
1177     else if (status < 0)
1178       return send_response(sock, RESP_ERR, "Internal error.\n");
1179     else
1180       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1181   }
1182
1183   /* NOTREACHED */
1184   assert(1==0);
1185 } /* }}} int handle_request_flush */
1186
1187 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1188 {
1189   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1190
1191   pthread_mutex_lock(&cache_lock);
1192   flush_old_values(-1);
1193   pthread_mutex_unlock(&cache_lock);
1194
1195   return send_response(sock, RESP_OK, "Started flush.\n");
1196 } /* }}} static int handle_request_flushall */
1197
1198 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1199 {
1200   int status;
1201   char *file, file_tmp[PATH_MAX];
1202   cache_item_t *ci;
1203
1204   status = buffer_get_field(&buffer, &buffer_size, &file);
1205   if (status != 0)
1206     return syntax_error(sock,cmd);
1207
1208   get_abs_path(&file, file_tmp);
1209
1210   pthread_mutex_lock(&cache_lock);
1211   ci = g_tree_lookup(cache_tree, file);
1212   if (ci == NULL)
1213   {
1214     pthread_mutex_unlock(&cache_lock);
1215     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1216   }
1217
1218   for (size_t i=0; i < ci->values_num; i++)
1219     add_response_info(sock, "%s\n", ci->values[i]);
1220
1221   pthread_mutex_unlock(&cache_lock);
1222   return send_response(sock, RESP_OK, "updates pending\n");
1223 } /* }}} static int handle_request_pending */
1224
1225 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1226 {
1227   int status;
1228   gboolean found;
1229   char *file, file_tmp[PATH_MAX];
1230
1231   status = buffer_get_field(&buffer, &buffer_size, &file);
1232   if (status != 0)
1233     return syntax_error(sock,cmd);
1234
1235   get_abs_path(&file, file_tmp);
1236   if (!check_file_access(file, sock)) return 0;
1237
1238   pthread_mutex_lock(&cache_lock);
1239   found = g_tree_remove(cache_tree, file);
1240   pthread_mutex_unlock(&cache_lock);
1241
1242   if (found == TRUE)
1243   {
1244     if (sock != NULL)
1245       journal_write("forget", file);
1246
1247     return send_response(sock, RESP_OK, "Gone!\n");
1248   }
1249   else
1250     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1251
1252   /* NOTREACHED */
1253   assert(1==0);
1254 } /* }}} static int handle_request_forget */
1255
1256 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1257 {
1258   cache_item_t *ci;
1259
1260   pthread_mutex_lock(&cache_lock);
1261
1262   ci = cache_queue_head;
1263   while (ci != NULL)
1264   {
1265     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1266     ci = ci->next;
1267   }
1268
1269   pthread_mutex_unlock(&cache_lock);
1270
1271   return send_response(sock, RESP_OK, "in queue.\n");
1272 } /* }}} int handle_request_queue */
1273
1274 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1275 {
1276   char *file, file_tmp[PATH_MAX];
1277   int values_num = 0;
1278   int status;
1279   char orig_buf[CMD_MAX];
1280
1281   cache_item_t *ci;
1282
1283   /* save it for the journal later */
1284   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1285
1286   status = buffer_get_field (&buffer, &buffer_size, &file);
1287   if (status != 0)
1288     return syntax_error(sock,cmd);
1289
1290   pthread_mutex_lock(&stats_lock);
1291   stats_updates_received++;
1292   pthread_mutex_unlock(&stats_lock);
1293
1294   get_abs_path(&file, file_tmp);
1295   if (!check_file_access(file, sock)) return 0;
1296
1297   pthread_mutex_lock (&cache_lock);
1298   ci = g_tree_lookup (cache_tree, file);
1299
1300   if (ci == NULL) /* {{{ */
1301   {
1302     struct stat statbuf;
1303
1304     /* don't hold the lock while we setup; stat(2) might block */
1305     pthread_mutex_unlock(&cache_lock);
1306
1307     memset (&statbuf, 0, sizeof (statbuf));
1308     status = stat (file, &statbuf);
1309     if (status != 0)
1310     {
1311       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1312
1313       status = errno;
1314       if (status == ENOENT)
1315         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1316       else
1317         return send_response(sock, RESP_ERR,
1318                              "stat failed with error %i.\n", status);
1319     }
1320     if (!S_ISREG (statbuf.st_mode))
1321       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1322
1323     if (access(file, R_OK|W_OK) != 0)
1324       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1325                            file, rrd_strerror(errno));
1326
1327     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1328     if (ci == NULL)
1329     {
1330       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1331
1332       return send_response(sock, RESP_ERR, "malloc failed.\n");
1333     }
1334     memset (ci, 0, sizeof (cache_item_t));
1335
1336     ci->file = strdup (file);
1337     if (ci->file == NULL)
1338     {
1339       free (ci);
1340       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1341
1342       return send_response(sock, RESP_ERR, "strdup failed.\n");
1343     }
1344
1345     wipe_ci_values(ci, now);
1346     ci->flags = CI_FLAGS_IN_TREE;
1347     pthread_cond_init(&ci->flushed, NULL);
1348
1349     pthread_mutex_lock(&cache_lock);
1350     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1351   } /* }}} */
1352   assert (ci != NULL);
1353
1354   /* don't re-write updates in replay mode */
1355   if (sock != NULL)
1356     journal_write("update", orig_buf);
1357
1358   while (buffer_size > 0)
1359   {
1360     char *value;
1361     time_t stamp;
1362     char *eostamp;
1363
1364     status = buffer_get_field (&buffer, &buffer_size, &value);
1365     if (status != 0)
1366     {
1367       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1368       break;
1369     }
1370
1371     /* make sure update time is always moving forward */
1372     stamp = strtol(value, &eostamp, 10);
1373     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1374     {
1375       pthread_mutex_unlock(&cache_lock);
1376       return send_response(sock, RESP_ERR,
1377                            "Cannot find timestamp in '%s'!\n", value);
1378     }
1379     else if (stamp <= ci->last_update_stamp)
1380     {
1381       pthread_mutex_unlock(&cache_lock);
1382       return send_response(sock, RESP_ERR,
1383                            "illegal attempt to update using time %ld when last"
1384                            " update time is %ld (minimum one second step)\n",
1385                            stamp, ci->last_update_stamp);
1386     }
1387     else
1388       ci->last_update_stamp = stamp;
1389
1390     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1391     {
1392       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1393       continue;
1394     }
1395
1396     values_num++;
1397   }
1398
1399   if (((now - ci->last_flush_time) >= config_write_interval)
1400       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1401       && (ci->values_num > 0))
1402   {
1403     enqueue_cache_item (ci, TAIL);
1404   }
1405
1406   pthread_mutex_unlock (&cache_lock);
1407
1408   if (values_num < 1)
1409     return send_response(sock, RESP_ERR, "No values updated.\n");
1410   else
1411     return send_response(sock, RESP_OK,
1412                          "errors, enqueued %i value(s).\n", values_num);
1413
1414   /* NOTREACHED */
1415   assert(1==0);
1416
1417 } /* }}} int handle_request_update */
1418
1419 /* we came across a "WROTE" entry during journal replay.
1420  * throw away any values that we have accumulated for this file
1421  */
1422 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1423 {
1424   cache_item_t *ci;
1425   const char *file = buffer;
1426
1427   pthread_mutex_lock(&cache_lock);
1428
1429   ci = g_tree_lookup(cache_tree, file);
1430   if (ci == NULL)
1431   {
1432     pthread_mutex_unlock(&cache_lock);
1433     return (0);
1434   }
1435
1436   if (ci->values)
1437     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1438
1439   wipe_ci_values(ci, now);
1440   remove_from_queue(ci);
1441
1442   pthread_mutex_unlock(&cache_lock);
1443   return (0);
1444 } /* }}} int handle_request_wrote */
1445
1446 /* start "BATCH" processing */
1447 static int batch_start (HANDLER_PROTO) /* {{{ */
1448 {
1449   int status;
1450   if (sock->batch_start)
1451     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1452
1453   status = send_response(sock, RESP_OK,
1454                          "Go ahead.  End with dot '.' on its own line.\n");
1455   sock->batch_start = time(NULL);
1456   sock->batch_cmd = 0;
1457
1458   return status;
1459 } /* }}} static int batch_start */
1460
1461 /* finish "BATCH" processing and return results to the client */
1462 static int batch_done (HANDLER_PROTO) /* {{{ */
1463 {
1464   assert(sock->batch_start);
1465   sock->batch_start = 0;
1466   sock->batch_cmd  = 0;
1467   return send_response(sock, RESP_OK, "errors\n");
1468 } /* }}} static int batch_done */
1469
1470 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1471 {
1472   return -1;
1473 } /* }}} static int handle_request_quit */
1474
1475 struct command COMMANDS[] = {
1476   {
1477     "UPDATE",
1478     handle_request_update,
1479     PRIV_HIGH,
1480     CMD_CONTEXT_ANY,
1481     "UPDATE <filename> <values> [<values> ...]\n"
1482     ,
1483     "Adds the given file to the internal cache if it is not yet known and\n"
1484     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1485     "for details.\n"
1486     "\n"
1487     "Each <values> has the following form:\n"
1488     "  <values> = <time>:<value>[:<value>[...]]\n"
1489     "See the rrdupdate(1) manpage for details.\n"
1490   },
1491   {
1492     "WROTE",
1493     handle_request_wrote,
1494     PRIV_HIGH,
1495     CMD_CONTEXT_JOURNAL,
1496     NULL,
1497     NULL
1498   },
1499   {
1500     "FLUSH",
1501     handle_request_flush,
1502     PRIV_LOW,
1503     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1504     "FLUSH <filename>\n"
1505     ,
1506     "Adds the given filename to the head of the update queue and returns\n"
1507     "after it has been dequeued.\n"
1508   },
1509   {
1510     "FLUSHALL",
1511     handle_request_flushall,
1512     PRIV_HIGH,
1513     CMD_CONTEXT_CLIENT,
1514     "FLUSHALL\n"
1515     ,
1516     "Triggers writing of all pending updates.  Returns immediately.\n"
1517   },
1518   {
1519     "PENDING",
1520     handle_request_pending,
1521     PRIV_HIGH,
1522     CMD_CONTEXT_CLIENT,
1523     "PENDING <filename>\n"
1524     ,
1525     "Shows any 'pending' updates for a file, in order.\n"
1526     "The updates shown have not yet been written to the underlying RRD file.\n"
1527   },
1528   {
1529     "FORGET",
1530     handle_request_forget,
1531     PRIV_HIGH,
1532     CMD_CONTEXT_ANY,
1533     "FORGET <filename>\n"
1534     ,
1535     "Removes the file completely from the cache.\n"
1536     "Any pending updates for the file will be lost.\n"
1537   },
1538   {
1539     "QUEUE",
1540     handle_request_queue,
1541     PRIV_LOW,
1542     CMD_CONTEXT_CLIENT,
1543     "QUEUE\n"
1544     ,
1545         "Shows all files in the output queue.\n"
1546     "The output is zero or more lines in the following format:\n"
1547     "(where <num_vals> is the number of values to be written)\n"
1548     "\n"
1549     "<num_vals> <filename>\n"
1550   },
1551   {
1552     "STATS",
1553     handle_request_stats,
1554     PRIV_LOW,
1555     CMD_CONTEXT_CLIENT,
1556     "STATS\n"
1557     ,
1558     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1559     "a description of the values.\n"
1560   },
1561   {
1562     "HELP",
1563     handle_request_help,
1564     PRIV_LOW,
1565     CMD_CONTEXT_CLIENT,
1566     "HELP [<command>]\n",
1567     NULL, /* special! */
1568   },
1569   {
1570     "BATCH",
1571     batch_start,
1572     PRIV_LOW,
1573     CMD_CONTEXT_CLIENT,
1574     "BATCH\n"
1575     ,
1576     "The 'BATCH' command permits the client to initiate a bulk load\n"
1577     "   of commands to rrdcached.\n"
1578     "\n"
1579     "Usage:\n"
1580     "\n"
1581     "    client: BATCH\n"
1582     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1583     "    client: command #1\n"
1584     "    client: command #2\n"
1585     "    client: ... and so on\n"
1586     "    client: .\n"
1587     "    server: 2 errors\n"
1588     "    server: 7 message for command #7\n"
1589     "    server: 9 message for command #9\n"
1590     "\n"
1591     "For more information, consult the rrdcached(1) documentation.\n"
1592   },
1593   {
1594     ".",   /* BATCH terminator */
1595     batch_done,
1596     PRIV_LOW,
1597     CMD_CONTEXT_BATCH,
1598     NULL,
1599     NULL
1600   },
1601   {
1602     "QUIT",
1603     handle_request_quit,
1604     PRIV_LOW,
1605     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1606     "QUIT\n"
1607     ,
1608     "Disconnect from rrdcached.\n"
1609   },
1610   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1611 };
1612
1613 static struct command *find_command(char *cmd)
1614 {
1615   struct command *c = COMMANDS;
1616
1617   while (c->cmd != NULL)
1618   {
1619     if (strcasecmp(cmd, c->cmd) == 0)
1620       break;
1621     c++;
1622   }
1623
1624   if (c->cmd == NULL)
1625     return NULL;
1626   else
1627     return c;
1628 }
1629
1630 /* check whether commands are received in the expected context */
1631 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1632 {
1633   if (sock == NULL)
1634     return (cmd->context & CMD_CONTEXT_JOURNAL);
1635   else if (sock->batch_start)
1636     return (cmd->context & CMD_CONTEXT_BATCH);
1637   else
1638     return (cmd->context & CMD_CONTEXT_CLIENT);
1639
1640   /* NOTREACHED */
1641   assert(1==0);
1642 }
1643
1644 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1645 {
1646   int status;
1647   char *cmd_str;
1648   char *resp_txt;
1649   struct command *help = NULL;
1650
1651   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1652   if (status == 0)
1653     help = find_command(cmd_str);
1654
1655   if (help && (help->syntax || help->help))
1656   {
1657     char tmp[CMD_MAX];
1658
1659     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1660     resp_txt = tmp;
1661
1662     if (help->syntax)
1663       add_response_info(sock, "Usage: %s\n", help->syntax);
1664
1665     if (help->help)
1666       add_response_info(sock, "%s\n", help->help);
1667   }
1668   else
1669   {
1670     help = COMMANDS;
1671     resp_txt = "Command overview\n";
1672
1673     while (help->cmd)
1674     {
1675       if (help->syntax)
1676         add_response_info(sock, "%s", help->syntax);
1677       help++;
1678     }
1679   }
1680
1681   return send_response(sock, RESP_OK, resp_txt);
1682 } /* }}} int handle_request_help */
1683
1684 /* if sock==NULL, we are in journal replay mode */
1685 static int handle_request (DISPATCH_PROTO) /* {{{ */
1686 {
1687   char *buffer_ptr = buffer;
1688   char *cmd_str = NULL;
1689   struct command *cmd = NULL;
1690   int status;
1691
1692   assert (buffer[buffer_size - 1] == '\0');
1693
1694   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1695   if (status != 0)
1696   {
1697     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1698     return (-1);
1699   }
1700
1701   if (sock != NULL && sock->batch_start)
1702     sock->batch_cmd++;
1703
1704   cmd = find_command(cmd_str);
1705   if (!cmd)
1706     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1707
1708   status = has_privilege(sock, cmd->min_priv);
1709   if (status <= 0)
1710     return status;
1711
1712   if (!command_check_context(sock, cmd))
1713     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1714
1715   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1716 } /* }}} int handle_request */
1717
1718 /* MUST NOT hold journal_lock before calling this */
1719 static void journal_rotate(void) /* {{{ */
1720 {
1721   FILE *old_fh = NULL;
1722   int new_fd;
1723
1724   if (journal_cur == NULL || journal_old == NULL)
1725     return;
1726
1727   pthread_mutex_lock(&journal_lock);
1728
1729   /* we rotate this way (rename before close) so that the we can release
1730    * the journal lock as fast as possible.  Journal writes to the new
1731    * journal can proceed immediately after the new file is opened.  The
1732    * fclose can then block without affecting new updates.
1733    */
1734   if (journal_fh != NULL)
1735   {
1736     old_fh = journal_fh;
1737     journal_fh = NULL;
1738     rename(journal_cur, journal_old);
1739     ++stats_journal_rotate;
1740   }
1741
1742   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1743                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1744   if (new_fd >= 0)
1745   {
1746     journal_fh = fdopen(new_fd, "a");
1747     if (journal_fh == NULL)
1748       close(new_fd);
1749   }
1750
1751   pthread_mutex_unlock(&journal_lock);
1752
1753   if (old_fh != NULL)
1754     fclose(old_fh);
1755
1756   if (journal_fh == NULL)
1757   {
1758     RRDD_LOG(LOG_CRIT,
1759              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1760              journal_cur, rrd_strerror(errno));
1761
1762     RRDD_LOG(LOG_ERR,
1763              "JOURNALING DISABLED: All values will be flushed at shutdown");
1764     config_flush_at_shutdown = 1;
1765   }
1766
1767 } /* }}} static void journal_rotate */
1768
1769 static void journal_done(void) /* {{{ */
1770 {
1771   if (journal_cur == NULL)
1772     return;
1773
1774   pthread_mutex_lock(&journal_lock);
1775   if (journal_fh != NULL)
1776   {
1777     fclose(journal_fh);
1778     journal_fh = NULL;
1779   }
1780
1781   if (config_flush_at_shutdown)
1782   {
1783     RRDD_LOG(LOG_INFO, "removing journals");
1784     unlink(journal_old);
1785     unlink(journal_cur);
1786   }
1787   else
1788   {
1789     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1790              "journals will be used at next startup");
1791   }
1792
1793   pthread_mutex_unlock(&journal_lock);
1794
1795 } /* }}} static void journal_done */
1796
1797 static int journal_write(char *cmd, char *args) /* {{{ */
1798 {
1799   int chars;
1800
1801   if (journal_fh == NULL)
1802     return 0;
1803
1804   pthread_mutex_lock(&journal_lock);
1805   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1806   pthread_mutex_unlock(&journal_lock);
1807
1808   if (chars > 0)
1809   {
1810     pthread_mutex_lock(&stats_lock);
1811     stats_journal_bytes += chars;
1812     pthread_mutex_unlock(&stats_lock);
1813   }
1814
1815   return chars;
1816 } /* }}} static int journal_write */
1817
1818 static int journal_replay (const char *file) /* {{{ */
1819 {
1820   FILE *fh;
1821   int entry_cnt = 0;
1822   int fail_cnt = 0;
1823   uint64_t line = 0;
1824   char entry[CMD_MAX];
1825   time_t now;
1826
1827   if (file == NULL) return 0;
1828
1829   {
1830     char *reason = "unknown error";
1831     int status = 0;
1832     struct stat statbuf;
1833
1834     memset(&statbuf, 0, sizeof(statbuf));
1835     if (stat(file, &statbuf) != 0)
1836     {
1837       if (errno == ENOENT)
1838         return 0;
1839
1840       reason = "stat error";
1841       status = errno;
1842     }
1843     else if (!S_ISREG(statbuf.st_mode))
1844     {
1845       reason = "not a regular file";
1846       status = EPERM;
1847     }
1848     if (statbuf.st_uid != daemon_uid)
1849     {
1850       reason = "not owned by daemon user";
1851       status = EACCES;
1852     }
1853     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1854     {
1855       reason = "must not be user/group writable";
1856       status = EACCES;
1857     }
1858
1859     if (status != 0)
1860     {
1861       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1862                file, rrd_strerror(status), reason);
1863       return 0;
1864     }
1865   }
1866
1867   fh = fopen(file, "r");
1868   if (fh == NULL)
1869   {
1870     if (errno != ENOENT)
1871       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1872                file, rrd_strerror(errno));
1873     return 0;
1874   }
1875   else
1876     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1877
1878   now = time(NULL);
1879
1880   while(!feof(fh))
1881   {
1882     size_t entry_len;
1883
1884     ++line;
1885     if (fgets(entry, sizeof(entry), fh) == NULL)
1886       break;
1887     entry_len = strlen(entry);
1888
1889     /* check \n termination in case journal writing crashed mid-line */
1890     if (entry_len == 0)
1891       continue;
1892     else if (entry[entry_len - 1] != '\n')
1893     {
1894       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1895       ++fail_cnt;
1896       continue;
1897     }
1898
1899     entry[entry_len - 1] = '\0';
1900
1901     if (handle_request(NULL, now, entry, entry_len) == 0)
1902       ++entry_cnt;
1903     else
1904       ++fail_cnt;
1905   }
1906
1907   fclose(fh);
1908
1909   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1910            entry_cnt, fail_cnt);
1911
1912   return entry_cnt > 0 ? 1 : 0;
1913 } /* }}} static int journal_replay */
1914
1915 static void journal_init(void) /* {{{ */
1916 {
1917   int had_journal = 0;
1918
1919   if (journal_cur == NULL) return;
1920
1921   pthread_mutex_lock(&journal_lock);
1922
1923   RRDD_LOG(LOG_INFO, "checking for journal files");
1924
1925   had_journal += journal_replay(journal_old);
1926   had_journal += journal_replay(journal_cur);
1927
1928   /* it must have been a crash.  start a flush */
1929   if (had_journal && config_flush_at_shutdown)
1930     flush_old_values(-1);
1931
1932   pthread_mutex_unlock(&journal_lock);
1933   journal_rotate();
1934
1935   RRDD_LOG(LOG_INFO, "journal processing complete");
1936
1937 } /* }}} static void journal_init */
1938
1939 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1940 {
1941   assert(sock != NULL);
1942
1943   free(sock->rbuf);  sock->rbuf = NULL;
1944   free(sock->wbuf);  sock->wbuf = NULL;
1945   free(sock);
1946 } /* }}} void free_listen_socket */
1947
1948 static void close_connection(listen_socket_t *sock) /* {{{ */
1949 {
1950   if (sock->fd >= 0)
1951   {
1952     close(sock->fd);
1953     sock->fd = -1;
1954   }
1955
1956   free_listen_socket(sock);
1957
1958 } /* }}} void close_connection */
1959
1960 static void *connection_thread_main (void *args) /* {{{ */
1961 {
1962   listen_socket_t *sock;
1963   int fd;
1964
1965   sock = (listen_socket_t *) args;
1966   fd = sock->fd;
1967
1968   /* init read buffers */
1969   sock->next_read = sock->next_cmd = 0;
1970   sock->rbuf = malloc(RBUF_SIZE);
1971   if (sock->rbuf == NULL)
1972   {
1973     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1974     close_connection(sock);
1975     return NULL;
1976   }
1977
1978   pthread_mutex_lock (&connection_threads_lock);
1979   connection_threads_num++;
1980   pthread_mutex_unlock (&connection_threads_lock);
1981
1982   while (do_shutdown == 0)
1983   {
1984     char *cmd;
1985     ssize_t cmd_len;
1986     ssize_t rbytes;
1987     time_t now;
1988
1989     struct pollfd pollfd;
1990     int status;
1991
1992     pollfd.fd = fd;
1993     pollfd.events = POLLIN | POLLPRI;
1994     pollfd.revents = 0;
1995
1996     status = poll (&pollfd, 1, /* timeout = */ 500);
1997     if (do_shutdown)
1998       break;
1999     else if (status == 0) /* timeout */
2000       continue;
2001     else if (status < 0) /* error */
2002     {
2003       status = errno;
2004       if (status != EINTR)
2005         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2006       continue;
2007     }
2008
2009     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2010       break;
2011     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2012     {
2013       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2014           "poll(2) returned something unexpected: %#04hx",
2015           pollfd.revents);
2016       break;
2017     }
2018
2019     rbytes = read(fd, sock->rbuf + sock->next_read,
2020                   RBUF_SIZE - sock->next_read);
2021     if (rbytes < 0)
2022     {
2023       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2024       break;
2025     }
2026     else if (rbytes == 0)
2027       break; /* eof */
2028
2029     sock->next_read += rbytes;
2030
2031     if (sock->batch_start)
2032       now = sock->batch_start;
2033     else
2034       now = time(NULL);
2035
2036     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2037     {
2038       status = handle_request (sock, now, cmd, cmd_len+1);
2039       if (status != 0)
2040         goto out_close;
2041     }
2042   }
2043
2044 out_close:
2045   close_connection(sock);
2046
2047   /* Remove this thread from the connection threads list */
2048   pthread_mutex_lock (&connection_threads_lock);
2049   connection_threads_num--;
2050   if (connection_threads_num <= 0)
2051     pthread_cond_broadcast(&connection_threads_done);
2052   pthread_mutex_unlock (&connection_threads_lock);
2053
2054   return (NULL);
2055 } /* }}} void *connection_thread_main */
2056
2057 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2058 {
2059   int fd;
2060   struct sockaddr_un sa;
2061   listen_socket_t *temp;
2062   int status;
2063   const char *path;
2064
2065   path = sock->addr;
2066   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2067     path += strlen("unix:");
2068
2069   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2070       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2071   if (temp == NULL)
2072   {
2073     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2074     return (-1);
2075   }
2076   listen_fds = temp;
2077   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2078
2079   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2080   if (fd < 0)
2081   {
2082     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2083              rrd_strerror(errno));
2084     return (-1);
2085   }
2086
2087   memset (&sa, 0, sizeof (sa));
2088   sa.sun_family = AF_UNIX;
2089   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2090
2091   /* if we've gotten this far, we own the pid file.  any daemon started
2092    * with the same args must not be alive.  therefore, ensure that we can
2093    * create the socket...
2094    */
2095   unlink(path);
2096
2097   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2098   if (status != 0)
2099   {
2100     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2101              path, rrd_strerror(errno));
2102     close (fd);
2103     return (-1);
2104   }
2105
2106   status = listen (fd, /* backlog = */ 10);
2107   if (status != 0)
2108   {
2109     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2110              path, rrd_strerror(errno));
2111     close (fd);
2112     unlink (path);
2113     return (-1);
2114   }
2115
2116   listen_fds[listen_fds_num].fd = fd;
2117   listen_fds[listen_fds_num].family = PF_UNIX;
2118   strncpy(listen_fds[listen_fds_num].addr, path,
2119           sizeof (listen_fds[listen_fds_num].addr) - 1);
2120   listen_fds_num++;
2121
2122   return (0);
2123 } /* }}} int open_listen_socket_unix */
2124
2125 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2126 {
2127   struct addrinfo ai_hints;
2128   struct addrinfo *ai_res;
2129   struct addrinfo *ai_ptr;
2130   char addr_copy[NI_MAXHOST];
2131   char *addr;
2132   char *port;
2133   int status;
2134
2135   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2136   addr_copy[sizeof (addr_copy) - 1] = 0;
2137   addr = addr_copy;
2138
2139   memset (&ai_hints, 0, sizeof (ai_hints));
2140   ai_hints.ai_flags = 0;
2141 #ifdef AI_ADDRCONFIG
2142   ai_hints.ai_flags |= AI_ADDRCONFIG;
2143 #endif
2144   ai_hints.ai_family = AF_UNSPEC;
2145   ai_hints.ai_socktype = SOCK_STREAM;
2146
2147   port = NULL;
2148   if (*addr == '[') /* IPv6+port format */
2149   {
2150     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2151     addr++;
2152
2153     port = strchr (addr, ']');
2154     if (port == NULL)
2155     {
2156       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2157       return (-1);
2158     }
2159     *port = 0;
2160     port++;
2161
2162     if (*port == ':')
2163       port++;
2164     else if (*port == 0)
2165       port = NULL;
2166     else
2167     {
2168       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2169       return (-1);
2170     }
2171   } /* if (*addr = ']') */
2172   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2173   {
2174     port = rindex(addr, ':');
2175     if (port != NULL)
2176     {
2177       *port = 0;
2178       port++;
2179     }
2180   }
2181   ai_res = NULL;
2182   status = getaddrinfo (addr,
2183                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2184                         &ai_hints, &ai_res);
2185   if (status != 0)
2186   {
2187     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2188              addr, gai_strerror (status));
2189     return (-1);
2190   }
2191
2192   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2193   {
2194     int fd;
2195     listen_socket_t *temp;
2196     int one = 1;
2197
2198     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2199         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2200     if (temp == NULL)
2201     {
2202       fprintf (stderr,
2203                "rrdcached: open_listen_socket_network: realloc failed.\n");
2204       continue;
2205     }
2206     listen_fds = temp;
2207     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2208
2209     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2210     if (fd < 0)
2211     {
2212       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2213                rrd_strerror(errno));
2214       continue;
2215     }
2216
2217     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2218
2219     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2220     if (status != 0)
2221     {
2222       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2223                sock->addr, rrd_strerror(errno));
2224       close (fd);
2225       continue;
2226     }
2227
2228     status = listen (fd, /* backlog = */ 10);
2229     if (status != 0)
2230     {
2231       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2232                sock->addr, rrd_strerror(errno));
2233       close (fd);
2234       freeaddrinfo(ai_res);
2235       return (-1);
2236     }
2237
2238     listen_fds[listen_fds_num].fd = fd;
2239     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2240     listen_fds_num++;
2241   } /* for (ai_ptr) */
2242
2243   freeaddrinfo(ai_res);
2244   return (0);
2245 } /* }}} static int open_listen_socket_network */
2246
2247 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2248 {
2249   assert(sock != NULL);
2250   assert(sock->addr != NULL);
2251
2252   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2253       || sock->addr[0] == '/')
2254     return (open_listen_socket_unix(sock));
2255   else
2256     return (open_listen_socket_network(sock));
2257 } /* }}} int open_listen_socket */
2258
2259 static int close_listen_sockets (void) /* {{{ */
2260 {
2261   size_t i;
2262
2263   for (i = 0; i < listen_fds_num; i++)
2264   {
2265     close (listen_fds[i].fd);
2266
2267     if (listen_fds[i].family == PF_UNIX)
2268       unlink(listen_fds[i].addr);
2269   }
2270
2271   free (listen_fds);
2272   listen_fds = NULL;
2273   listen_fds_num = 0;
2274
2275   return (0);
2276 } /* }}} int close_listen_sockets */
2277
2278 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2279 {
2280   struct pollfd *pollfds;
2281   int pollfds_num;
2282   int status;
2283   int i;
2284
2285   if (listen_fds_num < 1)
2286   {
2287     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2288     return (NULL);
2289   }
2290
2291   pollfds_num = listen_fds_num;
2292   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2293   if (pollfds == NULL)
2294   {
2295     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2296     return (NULL);
2297   }
2298   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2299
2300   RRDD_LOG(LOG_INFO, "listening for connections");
2301
2302   while (do_shutdown == 0)
2303   {
2304     for (i = 0; i < pollfds_num; i++)
2305     {
2306       pollfds[i].fd = listen_fds[i].fd;
2307       pollfds[i].events = POLLIN | POLLPRI;
2308       pollfds[i].revents = 0;
2309     }
2310
2311     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2312     if (do_shutdown)
2313       break;
2314     else if (status == 0) /* timeout */
2315       continue;
2316     else if (status < 0) /* error */
2317     {
2318       status = errno;
2319       if (status != EINTR)
2320       {
2321         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2322       }
2323       continue;
2324     }
2325
2326     for (i = 0; i < pollfds_num; i++)
2327     {
2328       listen_socket_t *client_sock;
2329       struct sockaddr_storage client_sa;
2330       socklen_t client_sa_size;
2331       pthread_t tid;
2332       pthread_attr_t attr;
2333
2334       if (pollfds[i].revents == 0)
2335         continue;
2336
2337       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2338       {
2339         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2340             "poll(2) returned something unexpected for listen FD #%i.",
2341             pollfds[i].fd);
2342         continue;
2343       }
2344
2345       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2346       if (client_sock == NULL)
2347       {
2348         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2349         continue;
2350       }
2351       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2352
2353       client_sa_size = sizeof (client_sa);
2354       client_sock->fd = accept (pollfds[i].fd,
2355           (struct sockaddr *) &client_sa, &client_sa_size);
2356       if (client_sock->fd < 0)
2357       {
2358         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2359         free(client_sock);
2360         continue;
2361       }
2362
2363       pthread_attr_init (&attr);
2364       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2365
2366       status = pthread_create (&tid, &attr, connection_thread_main,
2367                                client_sock);
2368       if (status != 0)
2369       {
2370         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2371         close_connection(client_sock);
2372         continue;
2373       }
2374     } /* for (pollfds_num) */
2375   } /* while (do_shutdown == 0) */
2376
2377   RRDD_LOG(LOG_INFO, "starting shutdown");
2378
2379   close_listen_sockets ();
2380
2381   pthread_mutex_lock (&connection_threads_lock);
2382   while (connection_threads_num > 0)
2383     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2384   pthread_mutex_unlock (&connection_threads_lock);
2385
2386   free(pollfds);
2387
2388   return (NULL);
2389 } /* }}} void *listen_thread_main */
2390
2391 static int daemonize (void) /* {{{ */
2392 {
2393   int pid_fd;
2394   char *base_dir;
2395
2396   daemon_uid = geteuid();
2397
2398   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2399   if (pid_fd < 0)
2400     pid_fd = check_pidfile();
2401   if (pid_fd < 0)
2402     return pid_fd;
2403
2404   /* open all the listen sockets */
2405   if (config_listen_address_list_len > 0)
2406   {
2407     for (size_t i = 0; i < config_listen_address_list_len; i++)
2408       open_listen_socket (config_listen_address_list[i]);
2409
2410     rrd_free_ptrs((void ***) &config_listen_address_list,
2411                   &config_listen_address_list_len);
2412   }
2413   else
2414   {
2415     listen_socket_t sock;
2416     memset(&sock, 0, sizeof(sock));
2417     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2418     open_listen_socket (&sock);
2419   }
2420
2421   if (listen_fds_num < 1)
2422   {
2423     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2424     goto error;
2425   }
2426
2427   if (!stay_foreground)
2428   {
2429     pid_t child;
2430
2431     child = fork ();
2432     if (child < 0)
2433     {
2434       fprintf (stderr, "daemonize: fork(2) failed.\n");
2435       goto error;
2436     }
2437     else if (child > 0)
2438       exit(0);
2439
2440     /* Become session leader */
2441     setsid ();
2442
2443     /* Open the first three file descriptors to /dev/null */
2444     close (2);
2445     close (1);
2446     close (0);
2447
2448     open ("/dev/null", O_RDWR);
2449     dup (0);
2450     dup (0);
2451   } /* if (!stay_foreground) */
2452
2453   /* Change into the /tmp directory. */
2454   base_dir = (config_base_dir != NULL)
2455     ? config_base_dir
2456     : "/tmp";
2457
2458   if (chdir (base_dir) != 0)
2459   {
2460     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2461     goto error;
2462   }
2463
2464   install_signal_handlers();
2465
2466   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2467   RRDD_LOG(LOG_INFO, "starting up");
2468
2469   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2470                                 (GDestroyNotify) free_cache_item);
2471   if (cache_tree == NULL)
2472   {
2473     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2474     goto error;
2475   }
2476
2477   return write_pidfile (pid_fd);
2478
2479 error:
2480   remove_pidfile();
2481   return -1;
2482 } /* }}} int daemonize */
2483
2484 static int cleanup (void) /* {{{ */
2485 {
2486   do_shutdown++;
2487
2488   pthread_cond_broadcast (&flush_cond);
2489   pthread_join (flush_thread, NULL);
2490
2491   pthread_cond_broadcast (&queue_cond);
2492   for (int i = 0; i < config_queue_threads; i++)
2493     pthread_join (queue_threads[i], NULL);
2494
2495   if (config_flush_at_shutdown)
2496   {
2497     assert(cache_queue_head == NULL);
2498     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2499   }
2500
2501   journal_done();
2502   remove_pidfile ();
2503
2504   free(queue_threads);
2505   free(config_base_dir);
2506   free(config_pid_file);
2507   free(journal_cur);
2508   free(journal_old);
2509
2510   pthread_mutex_lock(&cache_lock);
2511   g_tree_destroy(cache_tree);
2512
2513   RRDD_LOG(LOG_INFO, "goodbye");
2514   closelog ();
2515
2516   return (0);
2517 } /* }}} int cleanup */
2518
2519 static int read_options (int argc, char **argv) /* {{{ */
2520 {
2521   int option;
2522   int status = 0;
2523
2524   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2525   {
2526     switch (option)
2527     {
2528       case 'g':
2529         stay_foreground=1;
2530         break;
2531
2532       case 'L':
2533       case 'l':
2534       {
2535         listen_socket_t *new;
2536
2537         new = malloc(sizeof(listen_socket_t));
2538         if (new == NULL)
2539         {
2540           fprintf(stderr, "read_options: malloc failed.\n");
2541           return(2);
2542         }
2543         memset(new, 0, sizeof(listen_socket_t));
2544
2545         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2546         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2547
2548         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2549                          &config_listen_address_list_len, new))
2550         {
2551           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2552           return (2);
2553         }
2554       }
2555       break;
2556
2557       case 'f':
2558       {
2559         int temp;
2560
2561         temp = atoi (optarg);
2562         if (temp > 0)
2563           config_flush_interval = temp;
2564         else
2565         {
2566           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2567           status = 3;
2568         }
2569       }
2570       break;
2571
2572       case 'w':
2573       {
2574         int temp;
2575
2576         temp = atoi (optarg);
2577         if (temp > 0)
2578           config_write_interval = temp;
2579         else
2580         {
2581           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2582           status = 2;
2583         }
2584       }
2585       break;
2586
2587       case 'z':
2588       {
2589         int temp;
2590
2591         temp = atoi(optarg);
2592         if (temp > 0)
2593           config_write_jitter = temp;
2594         else
2595         {
2596           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2597           status = 2;
2598         }
2599
2600         break;
2601       }
2602
2603       case 't':
2604       {
2605         int threads;
2606         threads = atoi(optarg);
2607         if (threads >= 1)
2608           config_queue_threads = threads;
2609         else
2610         {
2611           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2612           return 1;
2613         }
2614       }
2615       break;
2616
2617       case 'B':
2618         config_write_base_only = 1;
2619         break;
2620
2621       case 'b':
2622       {
2623         size_t len;
2624         char base_realpath[PATH_MAX];
2625
2626         if (config_base_dir != NULL)
2627           free (config_base_dir);
2628         config_base_dir = strdup (optarg);
2629         if (config_base_dir == NULL)
2630         {
2631           fprintf (stderr, "read_options: strdup failed.\n");
2632           return (3);
2633         }
2634
2635         /* make sure that the base directory is not resolved via
2636          * symbolic links.  this makes some performance-enhancing
2637          * assumptions possible (we don't have to resolve paths
2638          * that start with a "/")
2639          */
2640         if (realpath(config_base_dir, base_realpath) == NULL)
2641         {
2642           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2643           return 5;
2644         }
2645         else if (strncmp(config_base_dir,
2646                          base_realpath, sizeof(base_realpath)) != 0)
2647         {
2648           fprintf(stderr,
2649                   "Base directory (-b) resolved via file system links!\n"
2650                   "Please consult rrdcached '-b' documentation!\n"
2651                   "Consider specifying the real directory (%s)\n",
2652                   base_realpath);
2653           return 5;
2654         }
2655
2656         len = strlen (config_base_dir);
2657         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2658         {
2659           config_base_dir[len - 1] = 0;
2660           len--;
2661         }
2662
2663         if (len < 1)
2664         {
2665           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2666           return (4);
2667         }
2668
2669         _config_base_dir_len = len;
2670       }
2671       break;
2672
2673       case 'p':
2674       {
2675         if (config_pid_file != NULL)
2676           free (config_pid_file);
2677         config_pid_file = strdup (optarg);
2678         if (config_pid_file == NULL)
2679         {
2680           fprintf (stderr, "read_options: strdup failed.\n");
2681           return (3);
2682         }
2683       }
2684       break;
2685
2686       case 'F':
2687         config_flush_at_shutdown = 1;
2688         break;
2689
2690       case 'j':
2691       {
2692         struct stat statbuf;
2693         const char *dir = optarg;
2694
2695         status = stat(dir, &statbuf);
2696         if (status != 0)
2697         {
2698           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2699           return 6;
2700         }
2701
2702         if (!S_ISDIR(statbuf.st_mode)
2703             || access(dir, R_OK|W_OK|X_OK) != 0)
2704         {
2705           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2706                   errno ? rrd_strerror(errno) : "");
2707           return 6;
2708         }
2709
2710         journal_cur = malloc(PATH_MAX + 1);
2711         journal_old = malloc(PATH_MAX + 1);
2712         if (journal_cur == NULL || journal_old == NULL)
2713         {
2714           fprintf(stderr, "malloc failure for journal files\n");
2715           return 6;
2716         }
2717         else 
2718         {
2719           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2720           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2721         }
2722       }
2723       break;
2724
2725       case 'h':
2726       case '?':
2727         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2728             "\n"
2729             "Usage: rrdcached [options]\n"
2730             "\n"
2731             "Valid options are:\n"
2732             "  -l <address>  Socket address to listen to.\n"
2733             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2734             "  -w <seconds>  Interval in which to write data.\n"
2735             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2736             "  -t <threads>  Number of write threads.\n"
2737             "  -f <seconds>  Interval in which to flush dead data.\n"
2738             "  -p <file>     Location of the PID-file.\n"
2739             "  -b <dir>      Base directory to change to.\n"
2740             "  -B            Restrict file access to paths within -b <dir>\n"
2741             "  -g            Do not fork and run in the foreground.\n"
2742             "  -j <dir>      Directory in which to create the journal files.\n"
2743             "  -F            Always flush all updates at shutdown\n"
2744             "\n"
2745             "For more information and a detailed description of all options "
2746             "please refer\n"
2747             "to the rrdcached(1) manual page.\n",
2748             VERSION);
2749         status = -1;
2750         break;
2751     } /* switch (option) */
2752   } /* while (getopt) */
2753
2754   /* advise the user when values are not sane */
2755   if (config_flush_interval < 2 * config_write_interval)
2756     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2757             " 2x write interval (-w) !\n");
2758   if (config_write_jitter > config_write_interval)
2759     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2760             " write interval (-w) !\n");
2761
2762   if (config_write_base_only && config_base_dir == NULL)
2763     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2764             "  Consult the rrdcached documentation\n");
2765
2766   if (journal_cur == NULL)
2767     config_flush_at_shutdown = 1;
2768
2769   return (status);
2770 } /* }}} int read_options */
2771
2772 int main (int argc, char **argv)
2773 {
2774   int status;
2775
2776   status = read_options (argc, argv);
2777   if (status != 0)
2778   {
2779     if (status < 0)
2780       status = 0;
2781     return (status);
2782   }
2783
2784   status = daemonize ();
2785   if (status != 0)
2786   {
2787     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2788     return (1);
2789   }
2790
2791   journal_init();
2792
2793   /* start the queue threads */
2794   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2795   if (queue_threads == NULL)
2796   {
2797     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2798     cleanup();
2799     return (1);
2800   }
2801   for (int i = 0; i < config_queue_threads; i++)
2802   {
2803     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2804     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2805     if (status != 0)
2806     {
2807       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2808       cleanup();
2809       return (1);
2810     }
2811   }
2812
2813   /* start the flush thread */
2814   memset(&flush_thread, 0, sizeof(flush_thread));
2815   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2816   if (status != 0)
2817   {
2818     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2819     cleanup();
2820     return (1);
2821   }
2822
2823   listen_thread_main (NULL);
2824   cleanup ();
2825
2826   return (0);
2827 } /* int main */
2828
2829 /*
2830  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2831  */