Handle race condition for "UPDATE" with new files. Problem found by Sebastian Harl...
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
161
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
167
168   char *syntax;
169   char *help;
170 };
171
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
188
189 struct callback_flush_data_s
190 {
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
197
198 enum queue_side_e
199 {
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
204
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
208
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
214
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
217
218 static int do_shutdown = 0;
219
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
223
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
226
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
230
231 /* Cache stuff */
232 static GTree          *cache_tree = NULL;
233 static cache_item_t   *cache_queue_head = NULL;
234 static cache_item_t   *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
236
237 static int config_write_interval = 300;
238 static int config_write_jitter   = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
245
246 static listen_socket_t **config_listen_address_list = NULL;
247 static size_t config_listen_address_list_len = 0;
248
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
257
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
266
267 /* prototypes for forward refernces */
268 static int handle_request_help (HANDLER_PROTO);
269
270 /* 
271  * Functions
272  */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276   do_shutdown++;
277   pthread_cond_broadcast(&flush_cond);
278   pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
280
281 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
282 {
283   sig_common("INT");
284 } /* }}} void sig_int_handler */
285
286 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
287 {
288   sig_common("TERM");
289 } /* }}} void sig_term_handler */
290
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293   config_flush_at_shutdown = 1;
294   sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
296
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299   config_flush_at_shutdown = 0;
300   sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
302
303 static void install_signal_handlers(void) /* {{{ */
304 {
305   /* These structures are static, because `sigaction' behaves weird if the are
306    * overwritten.. */
307   static struct sigaction sa_int;
308   static struct sigaction sa_term;
309   static struct sigaction sa_pipe;
310   static struct sigaction sa_usr1;
311   static struct sigaction sa_usr2;
312
313   /* Install signal handlers */
314   memset (&sa_int, 0, sizeof (sa_int));
315   sa_int.sa_handler = sig_int_handler;
316   sigaction (SIGINT, &sa_int, NULL);
317
318   memset (&sa_term, 0, sizeof (sa_term));
319   sa_term.sa_handler = sig_term_handler;
320   sigaction (SIGTERM, &sa_term, NULL);
321
322   memset (&sa_pipe, 0, sizeof (sa_pipe));
323   sa_pipe.sa_handler = SIG_IGN;
324   sigaction (SIGPIPE, &sa_pipe, NULL);
325
326   memset (&sa_pipe, 0, sizeof (sa_usr1));
327   sa_usr1.sa_handler = sig_usr1_handler;
328   sigaction (SIGUSR1, &sa_usr1, NULL);
329
330   memset (&sa_usr2, 0, sizeof (sa_usr2));
331   sa_usr2.sa_handler = sig_usr2_handler;
332   sigaction (SIGUSR2, &sa_usr2, NULL);
333
334 } /* }}} void install_signal_handlers */
335
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338   int fd;
339   char *file;
340
341   file = (config_pid_file != NULL)
342     ? config_pid_file
343     : LOCALSTATEDIR "/run/rrdcached.pid";
344
345   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346   if (fd < 0)
347     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348             action, file, rrd_strerror(errno));
349
350   return(fd);
351 } /* }}} static int open_pidfile */
352
353 /* check existing pid file to see whether a daemon is running */
354 static int check_pidfile(void)
355 {
356   int pid_fd;
357   pid_t pid;
358   char pid_str[16];
359
360   pid_fd = open_pidfile("open", O_RDWR);
361   if (pid_fd < 0)
362     return pid_fd;
363
364   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
365     return -1;
366
367   pid = atoi(pid_str);
368   if (pid <= 0)
369     return -1;
370
371   /* another running process that we can signal COULD be
372    * a competing rrdcached */
373   if (pid != getpid() && kill(pid, 0) == 0)
374   {
375     fprintf(stderr,
376             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
377     close(pid_fd);
378     return -1;
379   }
380
381   lseek(pid_fd, 0, SEEK_SET);
382   if (ftruncate(pid_fd, 0) == -1)
383   {
384     fprintf(stderr,
385             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
386     close(pid_fd);
387     return -1;
388   }
389
390   fprintf(stderr,
391           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
392           "rrdcached: starting normally.\n", pid);
393
394   return pid_fd;
395 } /* }}} static int check_pidfile */
396
397 static int write_pidfile (int fd) /* {{{ */
398 {
399   pid_t pid;
400   FILE *fh;
401
402   pid = getpid ();
403
404   fh = fdopen (fd, "w");
405   if (fh == NULL)
406   {
407     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
408     close(fd);
409     return (-1);
410   }
411
412   fprintf (fh, "%i\n", (int) pid);
413   fclose (fh);
414
415   return (0);
416 } /* }}} int write_pidfile */
417
418 static int remove_pidfile (void) /* {{{ */
419 {
420   char *file;
421   int status;
422
423   file = (config_pid_file != NULL)
424     ? config_pid_file
425     : LOCALSTATEDIR "/run/rrdcached.pid";
426
427   status = unlink (file);
428   if (status == 0)
429     return (0);
430   return (errno);
431 } /* }}} int remove_pidfile */
432
433 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
434 {
435   char *eol;
436
437   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
438                sock->next_read - sock->next_cmd);
439
440   if (eol == NULL)
441   {
442     /* no commands left, move remainder back to front of rbuf */
443     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
444             sock->next_read - sock->next_cmd);
445     sock->next_read -= sock->next_cmd;
446     sock->next_cmd = 0;
447     *len = 0;
448     return NULL;
449   }
450   else
451   {
452     char *cmd = sock->rbuf + sock->next_cmd;
453     *eol = '\0';
454
455     sock->next_cmd = eol - sock->rbuf + 1;
456
457     if (eol > sock->rbuf && *(eol-1) == '\r')
458       *(--eol) = '\0'; /* handle "\r\n" EOL */
459
460     *len = eol - cmd;
461
462     return cmd;
463   }
464
465   /* NOTREACHED */
466   assert(1==0);
467 }
468
469 /* add the characters directly to the write buffer */
470 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
471 {
472   char *new_buf;
473
474   assert(sock != NULL);
475
476   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
477   if (new_buf == NULL)
478   {
479     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
480     return -1;
481   }
482
483   strncpy(new_buf + sock->wbuf_len, str, len + 1);
484
485   sock->wbuf = new_buf;
486   sock->wbuf_len += len;
487
488   return 0;
489 } /* }}} static int add_to_wbuf */
490
491 /* add the text to the "extra" info that's sent after the status line */
492 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
493 {
494   va_list argp;
495   char buffer[CMD_MAX];
496   int len;
497
498   if (sock == NULL) return 0; /* journal replay mode */
499   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
500
501   va_start(argp, fmt);
502 #ifdef HAVE_VSNPRINTF
503   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
504 #else
505   len = vsprintf(buffer, fmt, argp);
506 #endif
507   va_end(argp);
508   if (len < 0)
509   {
510     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
511     return -1;
512   }
513
514   return add_to_wbuf(sock, buffer, len);
515 } /* }}} static int add_response_info */
516
517 static int count_lines(char *str) /* {{{ */
518 {
519   int lines = 0;
520
521   if (str != NULL)
522   {
523     while ((str = strchr(str, '\n')) != NULL)
524     {
525       ++lines;
526       ++str;
527     }
528   }
529
530   return lines;
531 } /* }}} static int count_lines */
532
533 /* send the response back to the user.
534  * returns 0 on success, -1 on error
535  * write buffer is always zeroed after this call */
536 static int send_response (listen_socket_t *sock, response_code rc,
537                           char *fmt, ...) /* {{{ */
538 {
539   va_list argp;
540   char buffer[CMD_MAX];
541   int lines;
542   ssize_t wrote;
543   int rclen, len;
544
545   if (sock == NULL) return rc;  /* journal replay mode */
546
547   if (sock->batch_start)
548   {
549     if (rc == RESP_OK)
550       return rc; /* no response on success during BATCH */
551     lines = sock->batch_cmd;
552   }
553   else if (rc == RESP_OK)
554     lines = count_lines(sock->wbuf);
555   else
556     lines = -1;
557
558   rclen = sprintf(buffer, "%d ", lines);
559   va_start(argp, fmt);
560 #ifdef HAVE_VSNPRINTF
561   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
562 #else
563   len = vsprintf(buffer+rclen, fmt, argp);
564 #endif
565   va_end(argp);
566   if (len < 0)
567     return -1;
568
569   len += rclen;
570
571   /* append the result to the wbuf, don't write to the user */
572   if (sock->batch_start)
573     return add_to_wbuf(sock, buffer, len);
574
575   /* first write must be complete */
576   if (len != write(sock->fd, buffer, len))
577   {
578     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
579     return -1;
580   }
581
582   if (sock->wbuf != NULL && rc == RESP_OK)
583   {
584     wrote = 0;
585     while (wrote < sock->wbuf_len)
586     {
587       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
588       if (wb <= 0)
589       {
590         RRDD_LOG(LOG_INFO, "send_response: could not write results");
591         return -1;
592       }
593       wrote += wb;
594     }
595   }
596
597   free(sock->wbuf); sock->wbuf = NULL;
598   sock->wbuf_len = 0;
599
600   return 0;
601 } /* }}} */
602
603 static void wipe_ci_values(cache_item_t *ci, time_t when)
604 {
605   ci->values = NULL;
606   ci->values_num = 0;
607
608   ci->last_flush_time = when;
609   if (config_write_jitter > 0)
610     ci->last_flush_time += (rrd_random() % config_write_jitter);
611 }
612
613 /* remove_from_queue
614  * remove a "cache_item_t" item from the queue.
615  * must hold 'cache_lock' when calling this
616  */
617 static void remove_from_queue(cache_item_t *ci) /* {{{ */
618 {
619   if (ci == NULL) return;
620   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
621
622   if (ci->prev == NULL)
623     cache_queue_head = ci->next; /* reset head */
624   else
625     ci->prev->next = ci->next;
626
627   if (ci->next == NULL)
628     cache_queue_tail = ci->prev; /* reset the tail */
629   else
630     ci->next->prev = ci->prev;
631
632   ci->next = ci->prev = NULL;
633   ci->flags &= ~CI_FLAGS_IN_QUEUE;
634
635   pthread_mutex_lock (&stats_lock);
636   assert (stats_queue_length > 0);
637   stats_queue_length--;
638   pthread_mutex_unlock (&stats_lock);
639
640 } /* }}} static void remove_from_queue */
641
642 /* free the resources associated with the cache_item_t
643  * must hold cache_lock when calling this function
644  */
645 static void *free_cache_item(cache_item_t *ci) /* {{{ */
646 {
647   if (ci == NULL) return NULL;
648
649   remove_from_queue(ci);
650
651   for (size_t i=0; i < ci->values_num; i++)
652     free(ci->values[i]);
653
654   free (ci->values);
655   free (ci->file);
656
657   /* in case anyone is waiting */
658   pthread_cond_broadcast(&ci->flushed);
659   pthread_cond_destroy(&ci->flushed);
660
661   free (ci);
662
663   return NULL;
664 } /* }}} static void *free_cache_item */
665
666 /*
667  * enqueue_cache_item:
668  * `cache_lock' must be acquired before calling this function!
669  */
670 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
671     queue_side_t side)
672 {
673   if (ci == NULL)
674     return (-1);
675
676   if (ci->values_num == 0)
677     return (0);
678
679   if (side == HEAD)
680   {
681     if (cache_queue_head == ci)
682       return 0;
683
684     /* remove if further down in queue */
685     remove_from_queue(ci);
686
687     ci->prev = NULL;
688     ci->next = cache_queue_head;
689     if (ci->next != NULL)
690       ci->next->prev = ci;
691     cache_queue_head = ci;
692
693     if (cache_queue_tail == NULL)
694       cache_queue_tail = cache_queue_head;
695   }
696   else /* (side == TAIL) */
697   {
698     /* We don't move values back in the list.. */
699     if (ci->flags & CI_FLAGS_IN_QUEUE)
700       return (0);
701
702     assert (ci->next == NULL);
703     assert (ci->prev == NULL);
704
705     ci->prev = cache_queue_tail;
706
707     if (cache_queue_tail == NULL)
708       cache_queue_head = ci;
709     else
710       cache_queue_tail->next = ci;
711
712     cache_queue_tail = ci;
713   }
714
715   ci->flags |= CI_FLAGS_IN_QUEUE;
716
717   pthread_cond_signal(&queue_cond);
718   pthread_mutex_lock (&stats_lock);
719   stats_queue_length++;
720   pthread_mutex_unlock (&stats_lock);
721
722   return (0);
723 } /* }}} int enqueue_cache_item */
724
725 /*
726  * tree_callback_flush:
727  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
728  * while this is in progress.
729  */
730 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
731     gpointer data)
732 {
733   cache_item_t *ci;
734   callback_flush_data_t *cfd;
735
736   ci = (cache_item_t *) value;
737   cfd = (callback_flush_data_t *) data;
738
739   if (ci->flags & CI_FLAGS_IN_QUEUE)
740     return FALSE;
741
742   if ((ci->last_flush_time <= cfd->abs_timeout)
743       && (ci->values_num > 0))
744   {
745     enqueue_cache_item (ci, TAIL);
746   }
747   else if ((do_shutdown != 0)
748       && (ci->values_num > 0))
749   {
750     enqueue_cache_item (ci, TAIL);
751   }
752   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
753       && (ci->values_num <= 0))
754   {
755     assert ((char *) key == ci->file);
756     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
757     {
758       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
759       return (FALSE);
760     }
761   }
762
763   return (FALSE);
764 } /* }}} gboolean tree_callback_flush */
765
766 static int flush_old_values (int max_age)
767 {
768   callback_flush_data_t cfd;
769   size_t k;
770
771   memset (&cfd, 0, sizeof (cfd));
772   /* Pass the current time as user data so that we don't need to call
773    * `time' for each node. */
774   cfd.now = time (NULL);
775   cfd.keys = NULL;
776   cfd.keys_num = 0;
777
778   if (max_age > 0)
779     cfd.abs_timeout = cfd.now - max_age;
780   else
781     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
782
783   /* `tree_callback_flush' will return the keys of all values that haven't
784    * been touched in the last `config_flush_interval' seconds in `cfd'.
785    * The char*'s in this array point to the same memory as ci->file, so we
786    * don't need to free them separately. */
787   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
788
789   for (k = 0; k < cfd.keys_num; k++)
790   {
791     /* should never fail, since we have held the cache_lock
792      * the entire time */
793     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
794   }
795
796   if (cfd.keys != NULL)
797   {
798     free (cfd.keys);
799     cfd.keys = NULL;
800   }
801
802   return (0);
803 } /* int flush_old_values */
804
805 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
806 {
807   struct timeval now;
808   struct timespec next_flush;
809   int status;
810
811   gettimeofday (&now, NULL);
812   next_flush.tv_sec = now.tv_sec + config_flush_interval;
813   next_flush.tv_nsec = 1000 * now.tv_usec;
814
815   pthread_mutex_lock(&cache_lock);
816
817   while (!do_shutdown)
818   {
819     gettimeofday (&now, NULL);
820     if ((now.tv_sec > next_flush.tv_sec)
821         || ((now.tv_sec == next_flush.tv_sec)
822           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
823     {
824       /* Flush all values that haven't been written in the last
825        * `config_write_interval' seconds. */
826       flush_old_values (config_write_interval);
827
828       /* Determine the time of the next cache flush. */
829       next_flush.tv_sec =
830         now.tv_sec + next_flush.tv_sec % config_flush_interval;
831
832       /* unlock the cache while we rotate so we don't block incoming
833        * updates if the fsync() blocks on disk I/O */
834       pthread_mutex_unlock(&cache_lock);
835       journal_rotate();
836       pthread_mutex_lock(&cache_lock);
837     }
838
839     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
840     if (status != 0 && status != ETIMEDOUT)
841     {
842       RRDD_LOG (LOG_ERR, "flush_thread_main: "
843                 "pthread_cond_timedwait returned %i.", status);
844     }
845   }
846
847   if (config_flush_at_shutdown)
848     flush_old_values (-1); /* flush everything */
849
850   pthread_mutex_unlock(&cache_lock);
851
852   return NULL;
853 } /* void *flush_thread_main */
854
855 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
856 {
857   pthread_mutex_lock (&cache_lock);
858
859   while (!do_shutdown
860          || (cache_queue_head != NULL && config_flush_at_shutdown))
861   {
862     cache_item_t *ci;
863     char *file;
864     char **values;
865     size_t values_num;
866     int status;
867
868     /* Now, check if there's something to store away. If not, wait until
869      * something comes in.  if we are shutting down, do not wait around.  */
870     if (cache_queue_head == NULL && !do_shutdown)
871     {
872       status = pthread_cond_wait (&queue_cond, &cache_lock);
873       if ((status != 0) && (status != ETIMEDOUT))
874       {
875         RRDD_LOG (LOG_ERR, "queue_thread_main: "
876             "pthread_cond_wait returned %i.", status);
877       }
878     }
879
880     /* Check if a value has arrived. This may be NULL if we timed out or there
881      * was an interrupt such as a signal. */
882     if (cache_queue_head == NULL)
883       continue;
884
885     ci = cache_queue_head;
886
887     /* copy the relevant parts */
888     file = strdup (ci->file);
889     if (file == NULL)
890     {
891       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
892       continue;
893     }
894
895     assert(ci->values != NULL);
896     assert(ci->values_num > 0);
897
898     values = ci->values;
899     values_num = ci->values_num;
900
901     wipe_ci_values(ci, time(NULL));
902     remove_from_queue(ci);
903
904     pthread_mutex_unlock (&cache_lock);
905
906     rrd_clear_error ();
907     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
908     if (status != 0)
909     {
910       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
911           "rrd_update_r (%s) failed with status %i. (%s)",
912           file, status, rrd_get_error());
913     }
914
915     journal_write("wrote", file);
916
917     /* Search again in the tree.  It's possible someone issued a "FORGET"
918      * while we were writing the update values. */
919     pthread_mutex_lock(&cache_lock);
920     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
921     if (ci)
922       pthread_cond_broadcast(&ci->flushed);
923     pthread_mutex_unlock(&cache_lock);
924
925     rrd_free_ptrs((void ***) &values, &values_num);
926     free(file);
927
928     if (status == 0)
929     {
930       pthread_mutex_lock (&stats_lock);
931       stats_updates_written++;
932       stats_data_sets_written += values_num;
933       pthread_mutex_unlock (&stats_lock);
934     }
935
936     pthread_mutex_lock (&cache_lock);
937   }
938   pthread_mutex_unlock (&cache_lock);
939
940   return (NULL);
941 } /* }}} void *queue_thread_main */
942
943 static int buffer_get_field (char **buffer_ret, /* {{{ */
944     size_t *buffer_size_ret, char **field_ret)
945 {
946   char *buffer;
947   size_t buffer_pos;
948   size_t buffer_size;
949   char *field;
950   size_t field_size;
951   int status;
952
953   buffer = *buffer_ret;
954   buffer_pos = 0;
955   buffer_size = *buffer_size_ret;
956   field = *buffer_ret;
957   field_size = 0;
958
959   if (buffer_size <= 0)
960     return (-1);
961
962   /* This is ensured by `handle_request'. */
963   assert (buffer[buffer_size - 1] == '\0');
964
965   status = -1;
966   while (buffer_pos < buffer_size)
967   {
968     /* Check for end-of-field or end-of-buffer */
969     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
970     {
971       field[field_size] = 0;
972       field_size++;
973       buffer_pos++;
974       status = 0;
975       break;
976     }
977     /* Handle escaped characters. */
978     else if (buffer[buffer_pos] == '\\')
979     {
980       if (buffer_pos >= (buffer_size - 1))
981         break;
982       buffer_pos++;
983       field[field_size] = buffer[buffer_pos];
984       field_size++;
985       buffer_pos++;
986     }
987     /* Normal operation */ 
988     else
989     {
990       field[field_size] = buffer[buffer_pos];
991       field_size++;
992       buffer_pos++;
993     }
994   } /* while (buffer_pos < buffer_size) */
995
996   if (status != 0)
997     return (status);
998
999   *buffer_ret = buffer + buffer_pos;
1000   *buffer_size_ret = buffer_size - buffer_pos;
1001   *field_ret = field;
1002
1003   return (0);
1004 } /* }}} int buffer_get_field */
1005
1006 /* if we're restricting writes to the base directory,
1007  * check whether the file falls within the dir
1008  * returns 1 if OK, otherwise 0
1009  */
1010 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1011 {
1012   assert(file != NULL);
1013
1014   if (!config_write_base_only
1015       || sock == NULL /* journal replay */
1016       || config_base_dir == NULL)
1017     return 1;
1018
1019   if (strstr(file, "../") != NULL) goto err;
1020
1021   /* relative paths without "../" are ok */
1022   if (*file != '/') return 1;
1023
1024   /* file must be of the format base + "/" + <1+ char filename> */
1025   if (strlen(file) < _config_base_dir_len + 2) goto err;
1026   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1027   if (*(file + _config_base_dir_len) != '/') goto err;
1028
1029   return 1;
1030
1031 err:
1032   if (sock != NULL && sock->fd >= 0)
1033     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1034
1035   return 0;
1036 } /* }}} static int check_file_access */
1037
1038 /* when using a base dir, convert relative paths to absolute paths.
1039  * if necessary, modifies the "filename" pointer to point
1040  * to the new path created in "tmp".  "tmp" is provided
1041  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1042  *
1043  * this allows us to optimize for the expected case (absolute path)
1044  * with a no-op.
1045  */
1046 static void get_abs_path(char **filename, char *tmp)
1047 {
1048   assert(tmp != NULL);
1049   assert(filename != NULL && *filename != NULL);
1050
1051   if (config_base_dir == NULL || **filename == '/')
1052     return;
1053
1054   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1055   *filename = tmp;
1056 } /* }}} static int get_abs_path */
1057
1058 /* returns 1 if we have the required privilege level,
1059  * otherwise issue an error to the user on sock */
1060 static int has_privilege (listen_socket_t *sock, /* {{{ */
1061                           socket_privilege priv)
1062 {
1063   if (sock == NULL) /* journal replay */
1064     return 1;
1065
1066   if (sock->privilege >= priv)
1067     return 1;
1068
1069   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1070 } /* }}} static int has_privilege */
1071
1072 static int flush_file (const char *filename) /* {{{ */
1073 {
1074   cache_item_t *ci;
1075
1076   pthread_mutex_lock (&cache_lock);
1077
1078   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1079   if (ci == NULL)
1080   {
1081     pthread_mutex_unlock (&cache_lock);
1082     return (ENOENT);
1083   }
1084
1085   if (ci->values_num > 0)
1086   {
1087     /* Enqueue at head */
1088     enqueue_cache_item (ci, HEAD);
1089     pthread_cond_wait(&ci->flushed, &cache_lock);
1090   }
1091
1092   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1093    * may have been purged during our cond_wait() */
1094
1095   pthread_mutex_unlock(&cache_lock);
1096
1097   return (0);
1098 } /* }}} int flush_file */
1099
1100 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1101 {
1102   char *err = "Syntax error.\n";
1103
1104   if (cmd && cmd->syntax)
1105     err = cmd->syntax;
1106
1107   return send_response(sock, RESP_ERR, "Usage: %s", err);
1108 } /* }}} static int syntax_error() */
1109
1110 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1111 {
1112   uint64_t copy_queue_length;
1113   uint64_t copy_updates_received;
1114   uint64_t copy_flush_received;
1115   uint64_t copy_updates_written;
1116   uint64_t copy_data_sets_written;
1117   uint64_t copy_journal_bytes;
1118   uint64_t copy_journal_rotate;
1119
1120   uint64_t tree_nodes_number;
1121   uint64_t tree_depth;
1122
1123   pthread_mutex_lock (&stats_lock);
1124   copy_queue_length       = stats_queue_length;
1125   copy_updates_received   = stats_updates_received;
1126   copy_flush_received     = stats_flush_received;
1127   copy_updates_written    = stats_updates_written;
1128   copy_data_sets_written  = stats_data_sets_written;
1129   copy_journal_bytes      = stats_journal_bytes;
1130   copy_journal_rotate     = stats_journal_rotate;
1131   pthread_mutex_unlock (&stats_lock);
1132
1133   pthread_mutex_lock (&cache_lock);
1134   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1135   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1136   pthread_mutex_unlock (&cache_lock);
1137
1138   add_response_info(sock,
1139                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1140   add_response_info(sock,
1141                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1142   add_response_info(sock,
1143                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1144   add_response_info(sock,
1145                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1146   add_response_info(sock,
1147                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1148   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1149   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1150   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1151   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1152
1153   send_response(sock, RESP_OK, "Statistics follow\n");
1154
1155   return (0);
1156 } /* }}} int handle_request_stats */
1157
1158 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1159 {
1160   char *file, file_tmp[PATH_MAX];
1161   int status;
1162
1163   status = buffer_get_field (&buffer, &buffer_size, &file);
1164   if (status != 0)
1165   {
1166     return syntax_error(sock,cmd);
1167   }
1168   else
1169   {
1170     pthread_mutex_lock(&stats_lock);
1171     stats_flush_received++;
1172     pthread_mutex_unlock(&stats_lock);
1173
1174     get_abs_path(&file, file_tmp);
1175     if (!check_file_access(file, sock)) return 0;
1176
1177     status = flush_file (file);
1178     if (status == 0)
1179       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1180     else if (status == ENOENT)
1181     {
1182       /* no file in our tree; see whether it exists at all */
1183       struct stat statbuf;
1184
1185       memset(&statbuf, 0, sizeof(statbuf));
1186       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1187         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1188       else
1189         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1190     }
1191     else if (status < 0)
1192       return send_response(sock, RESP_ERR, "Internal error.\n");
1193     else
1194       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1195   }
1196
1197   /* NOTREACHED */
1198   assert(1==0);
1199 } /* }}} int handle_request_flush */
1200
1201 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1202 {
1203   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1204
1205   pthread_mutex_lock(&cache_lock);
1206   flush_old_values(-1);
1207   pthread_mutex_unlock(&cache_lock);
1208
1209   return send_response(sock, RESP_OK, "Started flush.\n");
1210 } /* }}} static int handle_request_flushall */
1211
1212 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1213 {
1214   int status;
1215   char *file, file_tmp[PATH_MAX];
1216   cache_item_t *ci;
1217
1218   status = buffer_get_field(&buffer, &buffer_size, &file);
1219   if (status != 0)
1220     return syntax_error(sock,cmd);
1221
1222   get_abs_path(&file, file_tmp);
1223
1224   pthread_mutex_lock(&cache_lock);
1225   ci = g_tree_lookup(cache_tree, file);
1226   if (ci == NULL)
1227   {
1228     pthread_mutex_unlock(&cache_lock);
1229     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1230   }
1231
1232   for (size_t i=0; i < ci->values_num; i++)
1233     add_response_info(sock, "%s\n", ci->values[i]);
1234
1235   pthread_mutex_unlock(&cache_lock);
1236   return send_response(sock, RESP_OK, "updates pending\n");
1237 } /* }}} static int handle_request_pending */
1238
1239 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1240 {
1241   int status;
1242   gboolean found;
1243   char *file, file_tmp[PATH_MAX];
1244
1245   status = buffer_get_field(&buffer, &buffer_size, &file);
1246   if (status != 0)
1247     return syntax_error(sock,cmd);
1248
1249   get_abs_path(&file, file_tmp);
1250   if (!check_file_access(file, sock)) return 0;
1251
1252   pthread_mutex_lock(&cache_lock);
1253   found = g_tree_remove(cache_tree, file);
1254   pthread_mutex_unlock(&cache_lock);
1255
1256   if (found == TRUE)
1257   {
1258     if (sock != NULL)
1259       journal_write("forget", file);
1260
1261     return send_response(sock, RESP_OK, "Gone!\n");
1262   }
1263   else
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265
1266   /* NOTREACHED */
1267   assert(1==0);
1268 } /* }}} static int handle_request_forget */
1269
1270 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1271 {
1272   cache_item_t *ci;
1273
1274   pthread_mutex_lock(&cache_lock);
1275
1276   ci = cache_queue_head;
1277   while (ci != NULL)
1278   {
1279     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1280     ci = ci->next;
1281   }
1282
1283   pthread_mutex_unlock(&cache_lock);
1284
1285   return send_response(sock, RESP_OK, "in queue.\n");
1286 } /* }}} int handle_request_queue */
1287
1288 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1289 {
1290   char *file, file_tmp[PATH_MAX];
1291   int values_num = 0;
1292   int status;
1293   char orig_buf[CMD_MAX];
1294
1295   cache_item_t *ci;
1296
1297   /* save it for the journal later */
1298   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1299
1300   status = buffer_get_field (&buffer, &buffer_size, &file);
1301   if (status != 0)
1302     return syntax_error(sock,cmd);
1303
1304   pthread_mutex_lock(&stats_lock);
1305   stats_updates_received++;
1306   pthread_mutex_unlock(&stats_lock);
1307
1308   get_abs_path(&file, file_tmp);
1309   if (!check_file_access(file, sock)) return 0;
1310
1311   pthread_mutex_lock (&cache_lock);
1312   ci = g_tree_lookup (cache_tree, file);
1313
1314   if (ci == NULL) /* {{{ */
1315   {
1316     struct stat statbuf;
1317     cache_item_t *tmp;
1318
1319     /* don't hold the lock while we setup; stat(2) might block */
1320     pthread_mutex_unlock(&cache_lock);
1321
1322     memset (&statbuf, 0, sizeof (statbuf));
1323     status = stat (file, &statbuf);
1324     if (status != 0)
1325     {
1326       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1327
1328       status = errno;
1329       if (status == ENOENT)
1330         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1331       else
1332         return send_response(sock, RESP_ERR,
1333                              "stat failed with error %i.\n", status);
1334     }
1335     if (!S_ISREG (statbuf.st_mode))
1336       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1337
1338     if (access(file, R_OK|W_OK) != 0)
1339       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1340                            file, rrd_strerror(errno));
1341
1342     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1343     if (ci == NULL)
1344     {
1345       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1346
1347       return send_response(sock, RESP_ERR, "malloc failed.\n");
1348     }
1349     memset (ci, 0, sizeof (cache_item_t));
1350
1351     ci->file = strdup (file);
1352     if (ci->file == NULL)
1353     {
1354       free (ci);
1355       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1356
1357       return send_response(sock, RESP_ERR, "strdup failed.\n");
1358     }
1359
1360     wipe_ci_values(ci, now);
1361     ci->flags = CI_FLAGS_IN_TREE;
1362     pthread_cond_init(&ci->flushed, NULL);
1363
1364     pthread_mutex_lock(&cache_lock);
1365
1366     /* another UPDATE might have added this entry in the meantime */
1367     tmp = g_tree_lookup (cache_tree, file);
1368     if (tmp == NULL)
1369       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1370     else
1371     {
1372       free_cache_item (ci);
1373       ci = tmp;
1374     }
1375   } /* }}} */
1376   assert (ci != NULL);
1377
1378   /* don't re-write updates in replay mode */
1379   if (sock != NULL)
1380     journal_write("update", orig_buf);
1381
1382   while (buffer_size > 0)
1383   {
1384     char *value;
1385     time_t stamp;
1386     char *eostamp;
1387
1388     status = buffer_get_field (&buffer, &buffer_size, &value);
1389     if (status != 0)
1390     {
1391       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1392       break;
1393     }
1394
1395     /* make sure update time is always moving forward */
1396     stamp = strtol(value, &eostamp, 10);
1397     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1398     {
1399       pthread_mutex_unlock(&cache_lock);
1400       return send_response(sock, RESP_ERR,
1401                            "Cannot find timestamp in '%s'!\n", value);
1402     }
1403     else if (stamp <= ci->last_update_stamp)
1404     {
1405       pthread_mutex_unlock(&cache_lock);
1406       return send_response(sock, RESP_ERR,
1407                            "illegal attempt to update using time %ld when last"
1408                            " update time is %ld (minimum one second step)\n",
1409                            stamp, ci->last_update_stamp);
1410     }
1411     else
1412       ci->last_update_stamp = stamp;
1413
1414     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1415     {
1416       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1417       continue;
1418     }
1419
1420     values_num++;
1421   }
1422
1423   if (((now - ci->last_flush_time) >= config_write_interval)
1424       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1425       && (ci->values_num > 0))
1426   {
1427     enqueue_cache_item (ci, TAIL);
1428   }
1429
1430   pthread_mutex_unlock (&cache_lock);
1431
1432   if (values_num < 1)
1433     return send_response(sock, RESP_ERR, "No values updated.\n");
1434   else
1435     return send_response(sock, RESP_OK,
1436                          "errors, enqueued %i value(s).\n", values_num);
1437
1438   /* NOTREACHED */
1439   assert(1==0);
1440
1441 } /* }}} int handle_request_update */
1442
1443 /* we came across a "WROTE" entry during journal replay.
1444  * throw away any values that we have accumulated for this file
1445  */
1446 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1447 {
1448   cache_item_t *ci;
1449   const char *file = buffer;
1450
1451   pthread_mutex_lock(&cache_lock);
1452
1453   ci = g_tree_lookup(cache_tree, file);
1454   if (ci == NULL)
1455   {
1456     pthread_mutex_unlock(&cache_lock);
1457     return (0);
1458   }
1459
1460   if (ci->values)
1461     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1462
1463   wipe_ci_values(ci, now);
1464   remove_from_queue(ci);
1465
1466   pthread_mutex_unlock(&cache_lock);
1467   return (0);
1468 } /* }}} int handle_request_wrote */
1469
1470 /* start "BATCH" processing */
1471 static int batch_start (HANDLER_PROTO) /* {{{ */
1472 {
1473   int status;
1474   if (sock->batch_start)
1475     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1476
1477   status = send_response(sock, RESP_OK,
1478                          "Go ahead.  End with dot '.' on its own line.\n");
1479   sock->batch_start = time(NULL);
1480   sock->batch_cmd = 0;
1481
1482   return status;
1483 } /* }}} static int batch_start */
1484
1485 /* finish "BATCH" processing and return results to the client */
1486 static int batch_done (HANDLER_PROTO) /* {{{ */
1487 {
1488   assert(sock->batch_start);
1489   sock->batch_start = 0;
1490   sock->batch_cmd  = 0;
1491   return send_response(sock, RESP_OK, "errors\n");
1492 } /* }}} static int batch_done */
1493
1494 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1495 {
1496   return -1;
1497 } /* }}} static int handle_request_quit */
1498
1499 struct command COMMANDS[] = {
1500   {
1501     "UPDATE",
1502     handle_request_update,
1503     PRIV_HIGH,
1504     CMD_CONTEXT_ANY,
1505     "UPDATE <filename> <values> [<values> ...]\n"
1506     ,
1507     "Adds the given file to the internal cache if it is not yet known and\n"
1508     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1509     "for details.\n"
1510     "\n"
1511     "Each <values> has the following form:\n"
1512     "  <values> = <time>:<value>[:<value>[...]]\n"
1513     "See the rrdupdate(1) manpage for details.\n"
1514   },
1515   {
1516     "WROTE",
1517     handle_request_wrote,
1518     PRIV_HIGH,
1519     CMD_CONTEXT_JOURNAL,
1520     NULL,
1521     NULL
1522   },
1523   {
1524     "FLUSH",
1525     handle_request_flush,
1526     PRIV_LOW,
1527     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1528     "FLUSH <filename>\n"
1529     ,
1530     "Adds the given filename to the head of the update queue and returns\n"
1531     "after it has been dequeued.\n"
1532   },
1533   {
1534     "FLUSHALL",
1535     handle_request_flushall,
1536     PRIV_HIGH,
1537     CMD_CONTEXT_CLIENT,
1538     "FLUSHALL\n"
1539     ,
1540     "Triggers writing of all pending updates.  Returns immediately.\n"
1541   },
1542   {
1543     "PENDING",
1544     handle_request_pending,
1545     PRIV_HIGH,
1546     CMD_CONTEXT_CLIENT,
1547     "PENDING <filename>\n"
1548     ,
1549     "Shows any 'pending' updates for a file, in order.\n"
1550     "The updates shown have not yet been written to the underlying RRD file.\n"
1551   },
1552   {
1553     "FORGET",
1554     handle_request_forget,
1555     PRIV_HIGH,
1556     CMD_CONTEXT_ANY,
1557     "FORGET <filename>\n"
1558     ,
1559     "Removes the file completely from the cache.\n"
1560     "Any pending updates for the file will be lost.\n"
1561   },
1562   {
1563     "QUEUE",
1564     handle_request_queue,
1565     PRIV_LOW,
1566     CMD_CONTEXT_CLIENT,
1567     "QUEUE\n"
1568     ,
1569         "Shows all files in the output queue.\n"
1570     "The output is zero or more lines in the following format:\n"
1571     "(where <num_vals> is the number of values to be written)\n"
1572     "\n"
1573     "<num_vals> <filename>\n"
1574   },
1575   {
1576     "STATS",
1577     handle_request_stats,
1578     PRIV_LOW,
1579     CMD_CONTEXT_CLIENT,
1580     "STATS\n"
1581     ,
1582     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1583     "a description of the values.\n"
1584   },
1585   {
1586     "HELP",
1587     handle_request_help,
1588     PRIV_LOW,
1589     CMD_CONTEXT_CLIENT,
1590     "HELP [<command>]\n",
1591     NULL, /* special! */
1592   },
1593   {
1594     "BATCH",
1595     batch_start,
1596     PRIV_LOW,
1597     CMD_CONTEXT_CLIENT,
1598     "BATCH\n"
1599     ,
1600     "The 'BATCH' command permits the client to initiate a bulk load\n"
1601     "   of commands to rrdcached.\n"
1602     "\n"
1603     "Usage:\n"
1604     "\n"
1605     "    client: BATCH\n"
1606     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1607     "    client: command #1\n"
1608     "    client: command #2\n"
1609     "    client: ... and so on\n"
1610     "    client: .\n"
1611     "    server: 2 errors\n"
1612     "    server: 7 message for command #7\n"
1613     "    server: 9 message for command #9\n"
1614     "\n"
1615     "For more information, consult the rrdcached(1) documentation.\n"
1616   },
1617   {
1618     ".",   /* BATCH terminator */
1619     batch_done,
1620     PRIV_LOW,
1621     CMD_CONTEXT_BATCH,
1622     NULL,
1623     NULL
1624   },
1625   {
1626     "QUIT",
1627     handle_request_quit,
1628     PRIV_LOW,
1629     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1630     "QUIT\n"
1631     ,
1632     "Disconnect from rrdcached.\n"
1633   },
1634   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1635 };
1636
1637 static struct command *find_command(char *cmd)
1638 {
1639   struct command *c = COMMANDS;
1640
1641   while (c->cmd != NULL)
1642   {
1643     if (strcasecmp(cmd, c->cmd) == 0)
1644       break;
1645     c++;
1646   }
1647
1648   if (c->cmd == NULL)
1649     return NULL;
1650   else
1651     return c;
1652 }
1653
1654 /* check whether commands are received in the expected context */
1655 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1656 {
1657   if (sock == NULL)
1658     return (cmd->context & CMD_CONTEXT_JOURNAL);
1659   else if (sock->batch_start)
1660     return (cmd->context & CMD_CONTEXT_BATCH);
1661   else
1662     return (cmd->context & CMD_CONTEXT_CLIENT);
1663
1664   /* NOTREACHED */
1665   assert(1==0);
1666 }
1667
1668 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1669 {
1670   int status;
1671   char *cmd_str;
1672   char *resp_txt;
1673   struct command *help = NULL;
1674
1675   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1676   if (status == 0)
1677     help = find_command(cmd_str);
1678
1679   if (help && (help->syntax || help->help))
1680   {
1681     char tmp[CMD_MAX];
1682
1683     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1684     resp_txt = tmp;
1685
1686     if (help->syntax)
1687       add_response_info(sock, "Usage: %s\n", help->syntax);
1688
1689     if (help->help)
1690       add_response_info(sock, "%s\n", help->help);
1691   }
1692   else
1693   {
1694     help = COMMANDS;
1695     resp_txt = "Command overview\n";
1696
1697     while (help->cmd)
1698     {
1699       if (help->syntax)
1700         add_response_info(sock, "%s", help->syntax);
1701       help++;
1702     }
1703   }
1704
1705   return send_response(sock, RESP_OK, resp_txt);
1706 } /* }}} int handle_request_help */
1707
1708 /* if sock==NULL, we are in journal replay mode */
1709 static int handle_request (DISPATCH_PROTO) /* {{{ */
1710 {
1711   char *buffer_ptr = buffer;
1712   char *cmd_str = NULL;
1713   struct command *cmd = NULL;
1714   int status;
1715
1716   assert (buffer[buffer_size - 1] == '\0');
1717
1718   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1719   if (status != 0)
1720   {
1721     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1722     return (-1);
1723   }
1724
1725   if (sock != NULL && sock->batch_start)
1726     sock->batch_cmd++;
1727
1728   cmd = find_command(cmd_str);
1729   if (!cmd)
1730     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1731
1732   status = has_privilege(sock, cmd->min_priv);
1733   if (status <= 0)
1734     return status;
1735
1736   if (!command_check_context(sock, cmd))
1737     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1738
1739   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1740 } /* }}} int handle_request */
1741
1742 /* MUST NOT hold journal_lock before calling this */
1743 static void journal_rotate(void) /* {{{ */
1744 {
1745   FILE *old_fh = NULL;
1746   int new_fd;
1747
1748   if (journal_cur == NULL || journal_old == NULL)
1749     return;
1750
1751   pthread_mutex_lock(&journal_lock);
1752
1753   /* we rotate this way (rename before close) so that the we can release
1754    * the journal lock as fast as possible.  Journal writes to the new
1755    * journal can proceed immediately after the new file is opened.  The
1756    * fclose can then block without affecting new updates.
1757    */
1758   if (journal_fh != NULL)
1759   {
1760     old_fh = journal_fh;
1761     journal_fh = NULL;
1762     rename(journal_cur, journal_old);
1763     ++stats_journal_rotate;
1764   }
1765
1766   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1767                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1768   if (new_fd >= 0)
1769   {
1770     journal_fh = fdopen(new_fd, "a");
1771     if (journal_fh == NULL)
1772       close(new_fd);
1773   }
1774
1775   pthread_mutex_unlock(&journal_lock);
1776
1777   if (old_fh != NULL)
1778     fclose(old_fh);
1779
1780   if (journal_fh == NULL)
1781   {
1782     RRDD_LOG(LOG_CRIT,
1783              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1784              journal_cur, rrd_strerror(errno));
1785
1786     RRDD_LOG(LOG_ERR,
1787              "JOURNALING DISABLED: All values will be flushed at shutdown");
1788     config_flush_at_shutdown = 1;
1789   }
1790
1791 } /* }}} static void journal_rotate */
1792
1793 static void journal_done(void) /* {{{ */
1794 {
1795   if (journal_cur == NULL)
1796     return;
1797
1798   pthread_mutex_lock(&journal_lock);
1799   if (journal_fh != NULL)
1800   {
1801     fclose(journal_fh);
1802     journal_fh = NULL;
1803   }
1804
1805   if (config_flush_at_shutdown)
1806   {
1807     RRDD_LOG(LOG_INFO, "removing journals");
1808     unlink(journal_old);
1809     unlink(journal_cur);
1810   }
1811   else
1812   {
1813     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1814              "journals will be used at next startup");
1815   }
1816
1817   pthread_mutex_unlock(&journal_lock);
1818
1819 } /* }}} static void journal_done */
1820
1821 static int journal_write(char *cmd, char *args) /* {{{ */
1822 {
1823   int chars;
1824
1825   if (journal_fh == NULL)
1826     return 0;
1827
1828   pthread_mutex_lock(&journal_lock);
1829   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1830   pthread_mutex_unlock(&journal_lock);
1831
1832   if (chars > 0)
1833   {
1834     pthread_mutex_lock(&stats_lock);
1835     stats_journal_bytes += chars;
1836     pthread_mutex_unlock(&stats_lock);
1837   }
1838
1839   return chars;
1840 } /* }}} static int journal_write */
1841
1842 static int journal_replay (const char *file) /* {{{ */
1843 {
1844   FILE *fh;
1845   int entry_cnt = 0;
1846   int fail_cnt = 0;
1847   uint64_t line = 0;
1848   char entry[CMD_MAX];
1849   time_t now;
1850
1851   if (file == NULL) return 0;
1852
1853   {
1854     char *reason = "unknown error";
1855     int status = 0;
1856     struct stat statbuf;
1857
1858     memset(&statbuf, 0, sizeof(statbuf));
1859     if (stat(file, &statbuf) != 0)
1860     {
1861       if (errno == ENOENT)
1862         return 0;
1863
1864       reason = "stat error";
1865       status = errno;
1866     }
1867     else if (!S_ISREG(statbuf.st_mode))
1868     {
1869       reason = "not a regular file";
1870       status = EPERM;
1871     }
1872     if (statbuf.st_uid != daemon_uid)
1873     {
1874       reason = "not owned by daemon user";
1875       status = EACCES;
1876     }
1877     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1878     {
1879       reason = "must not be user/group writable";
1880       status = EACCES;
1881     }
1882
1883     if (status != 0)
1884     {
1885       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1886                file, rrd_strerror(status), reason);
1887       return 0;
1888     }
1889   }
1890
1891   fh = fopen(file, "r");
1892   if (fh == NULL)
1893   {
1894     if (errno != ENOENT)
1895       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1896                file, rrd_strerror(errno));
1897     return 0;
1898   }
1899   else
1900     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1901
1902   now = time(NULL);
1903
1904   while(!feof(fh))
1905   {
1906     size_t entry_len;
1907
1908     ++line;
1909     if (fgets(entry, sizeof(entry), fh) == NULL)
1910       break;
1911     entry_len = strlen(entry);
1912
1913     /* check \n termination in case journal writing crashed mid-line */
1914     if (entry_len == 0)
1915       continue;
1916     else if (entry[entry_len - 1] != '\n')
1917     {
1918       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1919       ++fail_cnt;
1920       continue;
1921     }
1922
1923     entry[entry_len - 1] = '\0';
1924
1925     if (handle_request(NULL, now, entry, entry_len) == 0)
1926       ++entry_cnt;
1927     else
1928       ++fail_cnt;
1929   }
1930
1931   fclose(fh);
1932
1933   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1934            entry_cnt, fail_cnt);
1935
1936   return entry_cnt > 0 ? 1 : 0;
1937 } /* }}} static int journal_replay */
1938
1939 static void journal_init(void) /* {{{ */
1940 {
1941   int had_journal = 0;
1942
1943   if (journal_cur == NULL) return;
1944
1945   pthread_mutex_lock(&journal_lock);
1946
1947   RRDD_LOG(LOG_INFO, "checking for journal files");
1948
1949   had_journal += journal_replay(journal_old);
1950   had_journal += journal_replay(journal_cur);
1951
1952   /* it must have been a crash.  start a flush */
1953   if (had_journal && config_flush_at_shutdown)
1954     flush_old_values(-1);
1955
1956   pthread_mutex_unlock(&journal_lock);
1957   journal_rotate();
1958
1959   RRDD_LOG(LOG_INFO, "journal processing complete");
1960
1961 } /* }}} static void journal_init */
1962
1963 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1964 {
1965   assert(sock != NULL);
1966
1967   free(sock->rbuf);  sock->rbuf = NULL;
1968   free(sock->wbuf);  sock->wbuf = NULL;
1969   free(sock);
1970 } /* }}} void free_listen_socket */
1971
1972 static void close_connection(listen_socket_t *sock) /* {{{ */
1973 {
1974   if (sock->fd >= 0)
1975   {
1976     close(sock->fd);
1977     sock->fd = -1;
1978   }
1979
1980   free_listen_socket(sock);
1981
1982 } /* }}} void close_connection */
1983
1984 static void *connection_thread_main (void *args) /* {{{ */
1985 {
1986   listen_socket_t *sock;
1987   int fd;
1988
1989   sock = (listen_socket_t *) args;
1990   fd = sock->fd;
1991
1992   /* init read buffers */
1993   sock->next_read = sock->next_cmd = 0;
1994   sock->rbuf = malloc(RBUF_SIZE);
1995   if (sock->rbuf == NULL)
1996   {
1997     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1998     close_connection(sock);
1999     return NULL;
2000   }
2001
2002   pthread_mutex_lock (&connection_threads_lock);
2003   connection_threads_num++;
2004   pthread_mutex_unlock (&connection_threads_lock);
2005
2006   while (do_shutdown == 0)
2007   {
2008     char *cmd;
2009     ssize_t cmd_len;
2010     ssize_t rbytes;
2011     time_t now;
2012
2013     struct pollfd pollfd;
2014     int status;
2015
2016     pollfd.fd = fd;
2017     pollfd.events = POLLIN | POLLPRI;
2018     pollfd.revents = 0;
2019
2020     status = poll (&pollfd, 1, /* timeout = */ 500);
2021     if (do_shutdown)
2022       break;
2023     else if (status == 0) /* timeout */
2024       continue;
2025     else if (status < 0) /* error */
2026     {
2027       status = errno;
2028       if (status != EINTR)
2029         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2030       continue;
2031     }
2032
2033     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2034       break;
2035     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2036     {
2037       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2038           "poll(2) returned something unexpected: %#04hx",
2039           pollfd.revents);
2040       break;
2041     }
2042
2043     rbytes = read(fd, sock->rbuf + sock->next_read,
2044                   RBUF_SIZE - sock->next_read);
2045     if (rbytes < 0)
2046     {
2047       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2048       break;
2049     }
2050     else if (rbytes == 0)
2051       break; /* eof */
2052
2053     sock->next_read += rbytes;
2054
2055     if (sock->batch_start)
2056       now = sock->batch_start;
2057     else
2058       now = time(NULL);
2059
2060     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2061     {
2062       status = handle_request (sock, now, cmd, cmd_len+1);
2063       if (status != 0)
2064         goto out_close;
2065     }
2066   }
2067
2068 out_close:
2069   close_connection(sock);
2070
2071   /* Remove this thread from the connection threads list */
2072   pthread_mutex_lock (&connection_threads_lock);
2073   connection_threads_num--;
2074   if (connection_threads_num <= 0)
2075     pthread_cond_broadcast(&connection_threads_done);
2076   pthread_mutex_unlock (&connection_threads_lock);
2077
2078   return (NULL);
2079 } /* }}} void *connection_thread_main */
2080
2081 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2082 {
2083   int fd;
2084   struct sockaddr_un sa;
2085   listen_socket_t *temp;
2086   int status;
2087   const char *path;
2088
2089   path = sock->addr;
2090   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2091     path += strlen("unix:");
2092
2093   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2094       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2095   if (temp == NULL)
2096   {
2097     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2098     return (-1);
2099   }
2100   listen_fds = temp;
2101   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2102
2103   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2104   if (fd < 0)
2105   {
2106     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2107              rrd_strerror(errno));
2108     return (-1);
2109   }
2110
2111   memset (&sa, 0, sizeof (sa));
2112   sa.sun_family = AF_UNIX;
2113   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2114
2115   /* if we've gotten this far, we own the pid file.  any daemon started
2116    * with the same args must not be alive.  therefore, ensure that we can
2117    * create the socket...
2118    */
2119   unlink(path);
2120
2121   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2122   if (status != 0)
2123   {
2124     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2125              path, rrd_strerror(errno));
2126     close (fd);
2127     return (-1);
2128   }
2129
2130   status = listen (fd, /* backlog = */ 10);
2131   if (status != 0)
2132   {
2133     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2134              path, rrd_strerror(errno));
2135     close (fd);
2136     unlink (path);
2137     return (-1);
2138   }
2139
2140   listen_fds[listen_fds_num].fd = fd;
2141   listen_fds[listen_fds_num].family = PF_UNIX;
2142   strncpy(listen_fds[listen_fds_num].addr, path,
2143           sizeof (listen_fds[listen_fds_num].addr) - 1);
2144   listen_fds_num++;
2145
2146   return (0);
2147 } /* }}} int open_listen_socket_unix */
2148
2149 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2150 {
2151   struct addrinfo ai_hints;
2152   struct addrinfo *ai_res;
2153   struct addrinfo *ai_ptr;
2154   char addr_copy[NI_MAXHOST];
2155   char *addr;
2156   char *port;
2157   int status;
2158
2159   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2160   addr_copy[sizeof (addr_copy) - 1] = 0;
2161   addr = addr_copy;
2162
2163   memset (&ai_hints, 0, sizeof (ai_hints));
2164   ai_hints.ai_flags = 0;
2165 #ifdef AI_ADDRCONFIG
2166   ai_hints.ai_flags |= AI_ADDRCONFIG;
2167 #endif
2168   ai_hints.ai_family = AF_UNSPEC;
2169   ai_hints.ai_socktype = SOCK_STREAM;
2170
2171   port = NULL;
2172   if (*addr == '[') /* IPv6+port format */
2173   {
2174     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2175     addr++;
2176
2177     port = strchr (addr, ']');
2178     if (port == NULL)
2179     {
2180       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2181       return (-1);
2182     }
2183     *port = 0;
2184     port++;
2185
2186     if (*port == ':')
2187       port++;
2188     else if (*port == 0)
2189       port = NULL;
2190     else
2191     {
2192       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2193       return (-1);
2194     }
2195   } /* if (*addr = ']') */
2196   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2197   {
2198     port = rindex(addr, ':');
2199     if (port != NULL)
2200     {
2201       *port = 0;
2202       port++;
2203     }
2204   }
2205   ai_res = NULL;
2206   status = getaddrinfo (addr,
2207                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2208                         &ai_hints, &ai_res);
2209   if (status != 0)
2210   {
2211     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2212              addr, gai_strerror (status));
2213     return (-1);
2214   }
2215
2216   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2217   {
2218     int fd;
2219     listen_socket_t *temp;
2220     int one = 1;
2221
2222     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2223         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2224     if (temp == NULL)
2225     {
2226       fprintf (stderr,
2227                "rrdcached: open_listen_socket_network: realloc failed.\n");
2228       continue;
2229     }
2230     listen_fds = temp;
2231     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2232
2233     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2234     if (fd < 0)
2235     {
2236       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2237                rrd_strerror(errno));
2238       continue;
2239     }
2240
2241     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2242
2243     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2244     if (status != 0)
2245     {
2246       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2247                sock->addr, rrd_strerror(errno));
2248       close (fd);
2249       continue;
2250     }
2251
2252     status = listen (fd, /* backlog = */ 10);
2253     if (status != 0)
2254     {
2255       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2256                sock->addr, rrd_strerror(errno));
2257       close (fd);
2258       freeaddrinfo(ai_res);
2259       return (-1);
2260     }
2261
2262     listen_fds[listen_fds_num].fd = fd;
2263     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2264     listen_fds_num++;
2265   } /* for (ai_ptr) */
2266
2267   freeaddrinfo(ai_res);
2268   return (0);
2269 } /* }}} static int open_listen_socket_network */
2270
2271 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2272 {
2273   assert(sock != NULL);
2274   assert(sock->addr != NULL);
2275
2276   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2277       || sock->addr[0] == '/')
2278     return (open_listen_socket_unix(sock));
2279   else
2280     return (open_listen_socket_network(sock));
2281 } /* }}} int open_listen_socket */
2282
2283 static int close_listen_sockets (void) /* {{{ */
2284 {
2285   size_t i;
2286
2287   for (i = 0; i < listen_fds_num; i++)
2288   {
2289     close (listen_fds[i].fd);
2290
2291     if (listen_fds[i].family == PF_UNIX)
2292       unlink(listen_fds[i].addr);
2293   }
2294
2295   free (listen_fds);
2296   listen_fds = NULL;
2297   listen_fds_num = 0;
2298
2299   return (0);
2300 } /* }}} int close_listen_sockets */
2301
2302 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2303 {
2304   struct pollfd *pollfds;
2305   int pollfds_num;
2306   int status;
2307   int i;
2308
2309   if (listen_fds_num < 1)
2310   {
2311     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2312     return (NULL);
2313   }
2314
2315   pollfds_num = listen_fds_num;
2316   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2317   if (pollfds == NULL)
2318   {
2319     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2320     return (NULL);
2321   }
2322   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2323
2324   RRDD_LOG(LOG_INFO, "listening for connections");
2325
2326   while (do_shutdown == 0)
2327   {
2328     for (i = 0; i < pollfds_num; i++)
2329     {
2330       pollfds[i].fd = listen_fds[i].fd;
2331       pollfds[i].events = POLLIN | POLLPRI;
2332       pollfds[i].revents = 0;
2333     }
2334
2335     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2336     if (do_shutdown)
2337       break;
2338     else if (status == 0) /* timeout */
2339       continue;
2340     else if (status < 0) /* error */
2341     {
2342       status = errno;
2343       if (status != EINTR)
2344       {
2345         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2346       }
2347       continue;
2348     }
2349
2350     for (i = 0; i < pollfds_num; i++)
2351     {
2352       listen_socket_t *client_sock;
2353       struct sockaddr_storage client_sa;
2354       socklen_t client_sa_size;
2355       pthread_t tid;
2356       pthread_attr_t attr;
2357
2358       if (pollfds[i].revents == 0)
2359         continue;
2360
2361       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2362       {
2363         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2364             "poll(2) returned something unexpected for listen FD #%i.",
2365             pollfds[i].fd);
2366         continue;
2367       }
2368
2369       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2370       if (client_sock == NULL)
2371       {
2372         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2373         continue;
2374       }
2375       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2376
2377       client_sa_size = sizeof (client_sa);
2378       client_sock->fd = accept (pollfds[i].fd,
2379           (struct sockaddr *) &client_sa, &client_sa_size);
2380       if (client_sock->fd < 0)
2381       {
2382         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2383         free(client_sock);
2384         continue;
2385       }
2386
2387       pthread_attr_init (&attr);
2388       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2389
2390       status = pthread_create (&tid, &attr, connection_thread_main,
2391                                client_sock);
2392       if (status != 0)
2393       {
2394         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2395         close_connection(client_sock);
2396         continue;
2397       }
2398     } /* for (pollfds_num) */
2399   } /* while (do_shutdown == 0) */
2400
2401   RRDD_LOG(LOG_INFO, "starting shutdown");
2402
2403   close_listen_sockets ();
2404
2405   pthread_mutex_lock (&connection_threads_lock);
2406   while (connection_threads_num > 0)
2407     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2408   pthread_mutex_unlock (&connection_threads_lock);
2409
2410   free(pollfds);
2411
2412   return (NULL);
2413 } /* }}} void *listen_thread_main */
2414
2415 static int daemonize (void) /* {{{ */
2416 {
2417   int pid_fd;
2418   char *base_dir;
2419
2420   daemon_uid = geteuid();
2421
2422   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2423   if (pid_fd < 0)
2424     pid_fd = check_pidfile();
2425   if (pid_fd < 0)
2426     return pid_fd;
2427
2428   /* open all the listen sockets */
2429   if (config_listen_address_list_len > 0)
2430   {
2431     for (size_t i = 0; i < config_listen_address_list_len; i++)
2432       open_listen_socket (config_listen_address_list[i]);
2433
2434     rrd_free_ptrs((void ***) &config_listen_address_list,
2435                   &config_listen_address_list_len);
2436   }
2437   else
2438   {
2439     listen_socket_t sock;
2440     memset(&sock, 0, sizeof(sock));
2441     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2442     open_listen_socket (&sock);
2443   }
2444
2445   if (listen_fds_num < 1)
2446   {
2447     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2448     goto error;
2449   }
2450
2451   if (!stay_foreground)
2452   {
2453     pid_t child;
2454
2455     child = fork ();
2456     if (child < 0)
2457     {
2458       fprintf (stderr, "daemonize: fork(2) failed.\n");
2459       goto error;
2460     }
2461     else if (child > 0)
2462       exit(0);
2463
2464     /* Become session leader */
2465     setsid ();
2466
2467     /* Open the first three file descriptors to /dev/null */
2468     close (2);
2469     close (1);
2470     close (0);
2471
2472     open ("/dev/null", O_RDWR);
2473     if (dup(0) == -1 || dup(0) == -1){
2474         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2475     }
2476   } /* if (!stay_foreground) */
2477
2478   /* Change into the /tmp directory. */
2479   base_dir = (config_base_dir != NULL)
2480     ? config_base_dir
2481     : "/tmp";
2482
2483   if (chdir (base_dir) != 0)
2484   {
2485     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2486     goto error;
2487   }
2488
2489   install_signal_handlers();
2490
2491   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2492   RRDD_LOG(LOG_INFO, "starting up");
2493
2494   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2495                                 (GDestroyNotify) free_cache_item);
2496   if (cache_tree == NULL)
2497   {
2498     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2499     goto error;
2500   }
2501
2502   return write_pidfile (pid_fd);
2503
2504 error:
2505   remove_pidfile();
2506   return -1;
2507 } /* }}} int daemonize */
2508
2509 static int cleanup (void) /* {{{ */
2510 {
2511   do_shutdown++;
2512
2513   pthread_cond_broadcast (&flush_cond);
2514   pthread_join (flush_thread, NULL);
2515
2516   pthread_cond_broadcast (&queue_cond);
2517   for (int i = 0; i < config_queue_threads; i++)
2518     pthread_join (queue_threads[i], NULL);
2519
2520   if (config_flush_at_shutdown)
2521   {
2522     assert(cache_queue_head == NULL);
2523     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2524   }
2525
2526   journal_done();
2527   remove_pidfile ();
2528
2529   free(queue_threads);
2530   free(config_base_dir);
2531   free(config_pid_file);
2532   free(journal_cur);
2533   free(journal_old);
2534
2535   pthread_mutex_lock(&cache_lock);
2536   g_tree_destroy(cache_tree);
2537
2538   RRDD_LOG(LOG_INFO, "goodbye");
2539   closelog ();
2540
2541   return (0);
2542 } /* }}} int cleanup */
2543
2544 static int read_options (int argc, char **argv) /* {{{ */
2545 {
2546   int option;
2547   int status = 0;
2548
2549   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2550   {
2551     switch (option)
2552     {
2553       case 'g':
2554         stay_foreground=1;
2555         break;
2556
2557       case 'L':
2558       case 'l':
2559       {
2560         listen_socket_t *new;
2561
2562         new = malloc(sizeof(listen_socket_t));
2563         if (new == NULL)
2564         {
2565           fprintf(stderr, "read_options: malloc failed.\n");
2566           return(2);
2567         }
2568         memset(new, 0, sizeof(listen_socket_t));
2569
2570         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2571         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2572
2573         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2574                          &config_listen_address_list_len, new))
2575         {
2576           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2577           return (2);
2578         }
2579       }
2580       break;
2581
2582       case 'f':
2583       {
2584         int temp;
2585
2586         temp = atoi (optarg);
2587         if (temp > 0)
2588           config_flush_interval = temp;
2589         else
2590         {
2591           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2592           status = 3;
2593         }
2594       }
2595       break;
2596
2597       case 'w':
2598       {
2599         int temp;
2600
2601         temp = atoi (optarg);
2602         if (temp > 0)
2603           config_write_interval = temp;
2604         else
2605         {
2606           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2607           status = 2;
2608         }
2609       }
2610       break;
2611
2612       case 'z':
2613       {
2614         int temp;
2615
2616         temp = atoi(optarg);
2617         if (temp > 0)
2618           config_write_jitter = temp;
2619         else
2620         {
2621           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2622           status = 2;
2623         }
2624
2625         break;
2626       }
2627
2628       case 't':
2629       {
2630         int threads;
2631         threads = atoi(optarg);
2632         if (threads >= 1)
2633           config_queue_threads = threads;
2634         else
2635         {
2636           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2637           return 1;
2638         }
2639       }
2640       break;
2641
2642       case 'B':
2643         config_write_base_only = 1;
2644         break;
2645
2646       case 'b':
2647       {
2648         size_t len;
2649         char base_realpath[PATH_MAX];
2650
2651         if (config_base_dir != NULL)
2652           free (config_base_dir);
2653         config_base_dir = strdup (optarg);
2654         if (config_base_dir == NULL)
2655         {
2656           fprintf (stderr, "read_options: strdup failed.\n");
2657           return (3);
2658         }
2659
2660         /* make sure that the base directory is not resolved via
2661          * symbolic links.  this makes some performance-enhancing
2662          * assumptions possible (we don't have to resolve paths
2663          * that start with a "/")
2664          */
2665         if (realpath(config_base_dir, base_realpath) == NULL)
2666         {
2667           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2668           return 5;
2669         }
2670         else if (strncmp(config_base_dir,
2671                          base_realpath, sizeof(base_realpath)) != 0)
2672         {
2673           fprintf(stderr,
2674                   "Base directory (-b) resolved via file system links!\n"
2675                   "Please consult rrdcached '-b' documentation!\n"
2676                   "Consider specifying the real directory (%s)\n",
2677                   base_realpath);
2678           return 5;
2679         }
2680
2681         len = strlen (config_base_dir);
2682         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2683         {
2684           config_base_dir[len - 1] = 0;
2685           len--;
2686         }
2687
2688         if (len < 1)
2689         {
2690           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2691           return (4);
2692         }
2693
2694         _config_base_dir_len = len;
2695       }
2696       break;
2697
2698       case 'p':
2699       {
2700         if (config_pid_file != NULL)
2701           free (config_pid_file);
2702         config_pid_file = strdup (optarg);
2703         if (config_pid_file == NULL)
2704         {
2705           fprintf (stderr, "read_options: strdup failed.\n");
2706           return (3);
2707         }
2708       }
2709       break;
2710
2711       case 'F':
2712         config_flush_at_shutdown = 1;
2713         break;
2714
2715       case 'j':
2716       {
2717         struct stat statbuf;
2718         const char *dir = optarg;
2719
2720         status = stat(dir, &statbuf);
2721         if (status != 0)
2722         {
2723           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2724           return 6;
2725         }
2726
2727         if (!S_ISDIR(statbuf.st_mode)
2728             || access(dir, R_OK|W_OK|X_OK) != 0)
2729         {
2730           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2731                   errno ? rrd_strerror(errno) : "");
2732           return 6;
2733         }
2734
2735         journal_cur = malloc(PATH_MAX + 1);
2736         journal_old = malloc(PATH_MAX + 1);
2737         if (journal_cur == NULL || journal_old == NULL)
2738         {
2739           fprintf(stderr, "malloc failure for journal files\n");
2740           return 6;
2741         }
2742         else 
2743         {
2744           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2745           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2746         }
2747       }
2748       break;
2749
2750       case 'h':
2751       case '?':
2752         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2753             "\n"
2754             "Usage: rrdcached [options]\n"
2755             "\n"
2756             "Valid options are:\n"
2757             "  -l <address>  Socket address to listen to.\n"
2758             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2759             "  -w <seconds>  Interval in which to write data.\n"
2760             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2761             "  -t <threads>  Number of write threads.\n"
2762             "  -f <seconds>  Interval in which to flush dead data.\n"
2763             "  -p <file>     Location of the PID-file.\n"
2764             "  -b <dir>      Base directory to change to.\n"
2765             "  -B            Restrict file access to paths within -b <dir>\n"
2766             "  -g            Do not fork and run in the foreground.\n"
2767             "  -j <dir>      Directory in which to create the journal files.\n"
2768             "  -F            Always flush all updates at shutdown\n"
2769             "\n"
2770             "For more information and a detailed description of all options "
2771             "please refer\n"
2772             "to the rrdcached(1) manual page.\n",
2773             VERSION);
2774         status = -1;
2775         break;
2776     } /* switch (option) */
2777   } /* while (getopt) */
2778
2779   /* advise the user when values are not sane */
2780   if (config_flush_interval < 2 * config_write_interval)
2781     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2782             " 2x write interval (-w) !\n");
2783   if (config_write_jitter > config_write_interval)
2784     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2785             " write interval (-w) !\n");
2786
2787   if (config_write_base_only && config_base_dir == NULL)
2788     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2789             "  Consult the rrdcached documentation\n");
2790
2791   if (journal_cur == NULL)
2792     config_flush_at_shutdown = 1;
2793
2794   return (status);
2795 } /* }}} int read_options */
2796
2797 int main (int argc, char **argv)
2798 {
2799   int status;
2800
2801   status = read_options (argc, argv);
2802   if (status != 0)
2803   {
2804     if (status < 0)
2805       status = 0;
2806     return (status);
2807   }
2808
2809   status = daemonize ();
2810   if (status != 0)
2811   {
2812     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2813     return (1);
2814   }
2815
2816   journal_init();
2817
2818   /* start the queue threads */
2819   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2820   if (queue_threads == NULL)
2821   {
2822     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2823     cleanup();
2824     return (1);
2825   }
2826   for (int i = 0; i < config_queue_threads; i++)
2827   {
2828     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2829     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2830     if (status != 0)
2831     {
2832       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2833       cleanup();
2834       return (1);
2835     }
2836   }
2837
2838   /* start the flush thread */
2839   memset(&flush_thread, 0, sizeof(flush_thread));
2840   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2841   if (status != 0)
2842   {
2843     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2844     cleanup();
2845     return (1);
2846   }
2847
2848   listen_thread_main (NULL);
2849   cleanup ();
2850
2851   return (0);
2852 } /* int main */
2853
2854 /*
2855  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2856  */