Increment the DataSetsWritten counter before freeing the RRD values. --kevin
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
161
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
167
168   char *syntax;
169   char *help;
170 };
171
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
188
189 struct callback_flush_data_s
190 {
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
197
198 enum queue_side_e
199 {
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
204
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
208
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
214
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
217
218 enum {
219   RUNNING,              /* normal operation */
220   FLUSHING,             /* flushing remaining values */
221   SHUTDOWN              /* shutting down */
222 } state = RUNNING;
223
224 static pthread_t *queue_threads;
225 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
226 static int config_queue_threads = 4;
227
228 static pthread_t flush_thread;
229 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
230
231 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
232 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
233 static int connection_threads_num = 0;
234
235 /* Cache stuff */
236 static GTree          *cache_tree = NULL;
237 static cache_item_t   *cache_queue_head = NULL;
238 static cache_item_t   *cache_queue_tail = NULL;
239 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
240
241 static int config_write_interval = 300;
242 static int config_write_jitter   = 0;
243 static int config_flush_interval = 3600;
244 static int config_flush_at_shutdown = 0;
245 static char *config_pid_file = NULL;
246 static char *config_base_dir = NULL;
247 static size_t _config_base_dir_len = 0;
248 static int config_write_base_only = 0;
249
250 static listen_socket_t **config_listen_address_list = NULL;
251 static size_t config_listen_address_list_len = 0;
252
253 static uint64_t stats_queue_length = 0;
254 static uint64_t stats_updates_received = 0;
255 static uint64_t stats_flush_received = 0;
256 static uint64_t stats_updates_written = 0;
257 static uint64_t stats_data_sets_written = 0;
258 static uint64_t stats_journal_bytes = 0;
259 static uint64_t stats_journal_rotate = 0;
260 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
261
262 /* Journaled updates */
263 static char *journal_cur = NULL;
264 static char *journal_old = NULL;
265 static FILE *journal_fh = NULL;
266 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
267 static int journal_write(char *cmd, char *args);
268 static void journal_done(void);
269 static void journal_rotate(void);
270
271 /* prototypes for forward refernces */
272 static int handle_request_help (HANDLER_PROTO);
273
274 /* 
275  * Functions
276  */
277 static void sig_common (const char *sig) /* {{{ */
278 {
279   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
280   state = FLUSHING;
281   pthread_cond_broadcast(&flush_cond);
282   pthread_cond_broadcast(&queue_cond);
283 } /* }}} void sig_common */
284
285 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
286 {
287   sig_common("INT");
288 } /* }}} void sig_int_handler */
289
290 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
291 {
292   sig_common("TERM");
293 } /* }}} void sig_term_handler */
294
295 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
296 {
297   config_flush_at_shutdown = 1;
298   sig_common("USR1");
299 } /* }}} void sig_usr1_handler */
300
301 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
302 {
303   config_flush_at_shutdown = 0;
304   sig_common("USR2");
305 } /* }}} void sig_usr2_handler */
306
307 static void install_signal_handlers(void) /* {{{ */
308 {
309   /* These structures are static, because `sigaction' behaves weird if the are
310    * overwritten.. */
311   static struct sigaction sa_int;
312   static struct sigaction sa_term;
313   static struct sigaction sa_pipe;
314   static struct sigaction sa_usr1;
315   static struct sigaction sa_usr2;
316
317   /* Install signal handlers */
318   memset (&sa_int, 0, sizeof (sa_int));
319   sa_int.sa_handler = sig_int_handler;
320   sigaction (SIGINT, &sa_int, NULL);
321
322   memset (&sa_term, 0, sizeof (sa_term));
323   sa_term.sa_handler = sig_term_handler;
324   sigaction (SIGTERM, &sa_term, NULL);
325
326   memset (&sa_pipe, 0, sizeof (sa_pipe));
327   sa_pipe.sa_handler = SIG_IGN;
328   sigaction (SIGPIPE, &sa_pipe, NULL);
329
330   memset (&sa_pipe, 0, sizeof (sa_usr1));
331   sa_usr1.sa_handler = sig_usr1_handler;
332   sigaction (SIGUSR1, &sa_usr1, NULL);
333
334   memset (&sa_usr2, 0, sizeof (sa_usr2));
335   sa_usr2.sa_handler = sig_usr2_handler;
336   sigaction (SIGUSR2, &sa_usr2, NULL);
337
338 } /* }}} void install_signal_handlers */
339
340 static int open_pidfile(char *action, int oflag) /* {{{ */
341 {
342   int fd;
343   char *file;
344
345   file = (config_pid_file != NULL)
346     ? config_pid_file
347     : LOCALSTATEDIR "/run/rrdcached.pid";
348
349   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
350   if (fd < 0)
351     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
352             action, file, rrd_strerror(errno));
353
354   return(fd);
355 } /* }}} static int open_pidfile */
356
357 /* check existing pid file to see whether a daemon is running */
358 static int check_pidfile(void)
359 {
360   int pid_fd;
361   pid_t pid;
362   char pid_str[16];
363
364   pid_fd = open_pidfile("open", O_RDWR);
365   if (pid_fd < 0)
366     return pid_fd;
367
368   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
369     return -1;
370
371   pid = atoi(pid_str);
372   if (pid <= 0)
373     return -1;
374
375   /* another running process that we can signal COULD be
376    * a competing rrdcached */
377   if (pid != getpid() && kill(pid, 0) == 0)
378   {
379     fprintf(stderr,
380             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
381     close(pid_fd);
382     return -1;
383   }
384
385   lseek(pid_fd, 0, SEEK_SET);
386   if (ftruncate(pid_fd, 0) == -1)
387   {
388     fprintf(stderr,
389             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
390     close(pid_fd);
391     return -1;
392   }
393
394   fprintf(stderr,
395           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
396           "rrdcached: starting normally.\n", pid);
397
398   return pid_fd;
399 } /* }}} static int check_pidfile */
400
401 static int write_pidfile (int fd) /* {{{ */
402 {
403   pid_t pid;
404   FILE *fh;
405
406   pid = getpid ();
407
408   fh = fdopen (fd, "w");
409   if (fh == NULL)
410   {
411     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
412     close(fd);
413     return (-1);
414   }
415
416   fprintf (fh, "%i\n", (int) pid);
417   fclose (fh);
418
419   return (0);
420 } /* }}} int write_pidfile */
421
422 static int remove_pidfile (void) /* {{{ */
423 {
424   char *file;
425   int status;
426
427   file = (config_pid_file != NULL)
428     ? config_pid_file
429     : LOCALSTATEDIR "/run/rrdcached.pid";
430
431   status = unlink (file);
432   if (status == 0)
433     return (0);
434   return (errno);
435 } /* }}} int remove_pidfile */
436
437 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
438 {
439   char *eol;
440
441   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
442                sock->next_read - sock->next_cmd);
443
444   if (eol == NULL)
445   {
446     /* no commands left, move remainder back to front of rbuf */
447     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
448             sock->next_read - sock->next_cmd);
449     sock->next_read -= sock->next_cmd;
450     sock->next_cmd = 0;
451     *len = 0;
452     return NULL;
453   }
454   else
455   {
456     char *cmd = sock->rbuf + sock->next_cmd;
457     *eol = '\0';
458
459     sock->next_cmd = eol - sock->rbuf + 1;
460
461     if (eol > sock->rbuf && *(eol-1) == '\r')
462       *(--eol) = '\0'; /* handle "\r\n" EOL */
463
464     *len = eol - cmd;
465
466     return cmd;
467   }
468
469   /* NOTREACHED */
470   assert(1==0);
471 }
472
473 /* add the characters directly to the write buffer */
474 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
475 {
476   char *new_buf;
477
478   assert(sock != NULL);
479
480   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
481   if (new_buf == NULL)
482   {
483     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
484     return -1;
485   }
486
487   strncpy(new_buf + sock->wbuf_len, str, len + 1);
488
489   sock->wbuf = new_buf;
490   sock->wbuf_len += len;
491
492   return 0;
493 } /* }}} static int add_to_wbuf */
494
495 /* add the text to the "extra" info that's sent after the status line */
496 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
497 {
498   va_list argp;
499   char buffer[CMD_MAX];
500   int len;
501
502   if (sock == NULL) return 0; /* journal replay mode */
503   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
504
505   va_start(argp, fmt);
506 #ifdef HAVE_VSNPRINTF
507   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
508 #else
509   len = vsprintf(buffer, fmt, argp);
510 #endif
511   va_end(argp);
512   if (len < 0)
513   {
514     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
515     return -1;
516   }
517
518   return add_to_wbuf(sock, buffer, len);
519 } /* }}} static int add_response_info */
520
521 static int count_lines(char *str) /* {{{ */
522 {
523   int lines = 0;
524
525   if (str != NULL)
526   {
527     while ((str = strchr(str, '\n')) != NULL)
528     {
529       ++lines;
530       ++str;
531     }
532   }
533
534   return lines;
535 } /* }}} static int count_lines */
536
537 /* send the response back to the user.
538  * returns 0 on success, -1 on error
539  * write buffer is always zeroed after this call */
540 static int send_response (listen_socket_t *sock, response_code rc,
541                           char *fmt, ...) /* {{{ */
542 {
543   va_list argp;
544   char buffer[CMD_MAX];
545   int lines;
546   ssize_t wrote;
547   int rclen, len;
548
549   if (sock == NULL) return rc;  /* journal replay mode */
550
551   if (sock->batch_start)
552   {
553     if (rc == RESP_OK)
554       return rc; /* no response on success during BATCH */
555     lines = sock->batch_cmd;
556   }
557   else if (rc == RESP_OK)
558     lines = count_lines(sock->wbuf);
559   else
560     lines = -1;
561
562   rclen = sprintf(buffer, "%d ", lines);
563   va_start(argp, fmt);
564 #ifdef HAVE_VSNPRINTF
565   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
566 #else
567   len = vsprintf(buffer+rclen, fmt, argp);
568 #endif
569   va_end(argp);
570   if (len < 0)
571     return -1;
572
573   len += rclen;
574
575   /* append the result to the wbuf, don't write to the user */
576   if (sock->batch_start)
577     return add_to_wbuf(sock, buffer, len);
578
579   /* first write must be complete */
580   if (len != write(sock->fd, buffer, len))
581   {
582     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
583     return -1;
584   }
585
586   if (sock->wbuf != NULL && rc == RESP_OK)
587   {
588     wrote = 0;
589     while (wrote < sock->wbuf_len)
590     {
591       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
592       if (wb <= 0)
593       {
594         RRDD_LOG(LOG_INFO, "send_response: could not write results");
595         return -1;
596       }
597       wrote += wb;
598     }
599   }
600
601   free(sock->wbuf); sock->wbuf = NULL;
602   sock->wbuf_len = 0;
603
604   return 0;
605 } /* }}} */
606
607 static void wipe_ci_values(cache_item_t *ci, time_t when)
608 {
609   ci->values = NULL;
610   ci->values_num = 0;
611
612   ci->last_flush_time = when;
613   if (config_write_jitter > 0)
614     ci->last_flush_time += (rrd_random() % config_write_jitter);
615 }
616
617 /* remove_from_queue
618  * remove a "cache_item_t" item from the queue.
619  * must hold 'cache_lock' when calling this
620  */
621 static void remove_from_queue(cache_item_t *ci) /* {{{ */
622 {
623   if (ci == NULL) return;
624   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
625
626   if (ci->prev == NULL)
627     cache_queue_head = ci->next; /* reset head */
628   else
629     ci->prev->next = ci->next;
630
631   if (ci->next == NULL)
632     cache_queue_tail = ci->prev; /* reset the tail */
633   else
634     ci->next->prev = ci->prev;
635
636   ci->next = ci->prev = NULL;
637   ci->flags &= ~CI_FLAGS_IN_QUEUE;
638
639   pthread_mutex_lock (&stats_lock);
640   assert (stats_queue_length > 0);
641   stats_queue_length--;
642   pthread_mutex_unlock (&stats_lock);
643
644 } /* }}} static void remove_from_queue */
645
646 /* free the resources associated with the cache_item_t
647  * must hold cache_lock when calling this function
648  */
649 static void *free_cache_item(cache_item_t *ci) /* {{{ */
650 {
651   if (ci == NULL) return NULL;
652
653   remove_from_queue(ci);
654
655   for (size_t i=0; i < ci->values_num; i++)
656     free(ci->values[i]);
657
658   free (ci->values);
659   free (ci->file);
660
661   /* in case anyone is waiting */
662   pthread_cond_broadcast(&ci->flushed);
663   pthread_cond_destroy(&ci->flushed);
664
665   free (ci);
666
667   return NULL;
668 } /* }}} static void *free_cache_item */
669
670 /*
671  * enqueue_cache_item:
672  * `cache_lock' must be acquired before calling this function!
673  */
674 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
675     queue_side_t side)
676 {
677   if (ci == NULL)
678     return (-1);
679
680   if (ci->values_num == 0)
681     return (0);
682
683   if (side == HEAD)
684   {
685     if (cache_queue_head == ci)
686       return 0;
687
688     /* remove if further down in queue */
689     remove_from_queue(ci);
690
691     ci->prev = NULL;
692     ci->next = cache_queue_head;
693     if (ci->next != NULL)
694       ci->next->prev = ci;
695     cache_queue_head = ci;
696
697     if (cache_queue_tail == NULL)
698       cache_queue_tail = cache_queue_head;
699   }
700   else /* (side == TAIL) */
701   {
702     /* We don't move values back in the list.. */
703     if (ci->flags & CI_FLAGS_IN_QUEUE)
704       return (0);
705
706     assert (ci->next == NULL);
707     assert (ci->prev == NULL);
708
709     ci->prev = cache_queue_tail;
710
711     if (cache_queue_tail == NULL)
712       cache_queue_head = ci;
713     else
714       cache_queue_tail->next = ci;
715
716     cache_queue_tail = ci;
717   }
718
719   ci->flags |= CI_FLAGS_IN_QUEUE;
720
721   pthread_cond_signal(&queue_cond);
722   pthread_mutex_lock (&stats_lock);
723   stats_queue_length++;
724   pthread_mutex_unlock (&stats_lock);
725
726   return (0);
727 } /* }}} int enqueue_cache_item */
728
729 /*
730  * tree_callback_flush:
731  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
732  * while this is in progress.
733  */
734 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
735     gpointer data)
736 {
737   cache_item_t *ci;
738   callback_flush_data_t *cfd;
739
740   ci = (cache_item_t *) value;
741   cfd = (callback_flush_data_t *) data;
742
743   if (ci->flags & CI_FLAGS_IN_QUEUE)
744     return FALSE;
745
746   if (ci->values_num > 0
747       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
748   {
749     enqueue_cache_item (ci, TAIL);
750   }
751   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
752       && (ci->values_num <= 0))
753   {
754     assert ((char *) key == ci->file);
755     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
756     {
757       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
758       return (FALSE);
759     }
760   }
761
762   return (FALSE);
763 } /* }}} gboolean tree_callback_flush */
764
765 static int flush_old_values (int max_age)
766 {
767   callback_flush_data_t cfd;
768   size_t k;
769
770   memset (&cfd, 0, sizeof (cfd));
771   /* Pass the current time as user data so that we don't need to call
772    * `time' for each node. */
773   cfd.now = time (NULL);
774   cfd.keys = NULL;
775   cfd.keys_num = 0;
776
777   if (max_age > 0)
778     cfd.abs_timeout = cfd.now - max_age;
779   else
780     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
781
782   /* `tree_callback_flush' will return the keys of all values that haven't
783    * been touched in the last `config_flush_interval' seconds in `cfd'.
784    * The char*'s in this array point to the same memory as ci->file, so we
785    * don't need to free them separately. */
786   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
787
788   for (k = 0; k < cfd.keys_num; k++)
789   {
790     /* should never fail, since we have held the cache_lock
791      * the entire time */
792     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
793   }
794
795   if (cfd.keys != NULL)
796   {
797     free (cfd.keys);
798     cfd.keys = NULL;
799   }
800
801   return (0);
802 } /* int flush_old_values */
803
804 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
805 {
806   struct timeval now;
807   struct timespec next_flush;
808   int status;
809
810   gettimeofday (&now, NULL);
811   next_flush.tv_sec = now.tv_sec + config_flush_interval;
812   next_flush.tv_nsec = 1000 * now.tv_usec;
813
814   pthread_mutex_lock(&cache_lock);
815
816   while (state == RUNNING)
817   {
818     gettimeofday (&now, NULL);
819     if ((now.tv_sec > next_flush.tv_sec)
820         || ((now.tv_sec == next_flush.tv_sec)
821           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
822     {
823       RRDD_LOG(LOG_DEBUG, "flushing old values");
824
825       /* Determine the time of the next cache flush. */
826       next_flush.tv_sec = now.tv_sec + config_flush_interval;
827
828       /* Flush all values that haven't been written in the last
829        * `config_write_interval' seconds. */
830       flush_old_values (config_write_interval);
831
832       /* unlock the cache while we rotate so we don't block incoming
833        * updates if the fsync() blocks on disk I/O */
834       pthread_mutex_unlock(&cache_lock);
835       journal_rotate();
836       pthread_mutex_lock(&cache_lock);
837     }
838
839     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
840     if (status != 0 && status != ETIMEDOUT)
841     {
842       RRDD_LOG (LOG_ERR, "flush_thread_main: "
843                 "pthread_cond_timedwait returned %i.", status);
844     }
845   }
846
847   if (config_flush_at_shutdown)
848     flush_old_values (-1); /* flush everything */
849
850   state = SHUTDOWN;
851
852   pthread_mutex_unlock(&cache_lock);
853
854   return NULL;
855 } /* void *flush_thread_main */
856
857 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
858 {
859   pthread_mutex_lock (&cache_lock);
860
861   while (state != SHUTDOWN
862          || (cache_queue_head != NULL && config_flush_at_shutdown))
863   {
864     cache_item_t *ci;
865     char *file;
866     char **values;
867     size_t values_num;
868     int status;
869
870     /* Now, check if there's something to store away. If not, wait until
871      * something comes in. */
872     if (cache_queue_head == NULL)
873     {
874       status = pthread_cond_wait (&queue_cond, &cache_lock);
875       if ((status != 0) && (status != ETIMEDOUT))
876       {
877         RRDD_LOG (LOG_ERR, "queue_thread_main: "
878             "pthread_cond_wait returned %i.", status);
879       }
880     }
881
882     /* Check if a value has arrived. This may be NULL if we timed out or there
883      * was an interrupt such as a signal. */
884     if (cache_queue_head == NULL)
885       continue;
886
887     ci = cache_queue_head;
888
889     /* copy the relevant parts */
890     file = strdup (ci->file);
891     if (file == NULL)
892     {
893       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
894       continue;
895     }
896
897     assert(ci->values != NULL);
898     assert(ci->values_num > 0);
899
900     values = ci->values;
901     values_num = ci->values_num;
902
903     wipe_ci_values(ci, time(NULL));
904     remove_from_queue(ci);
905
906     pthread_mutex_unlock (&cache_lock);
907
908     rrd_clear_error ();
909     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
910     if (status != 0)
911     {
912       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
913           "rrd_update_r (%s) failed with status %i. (%s)",
914           file, status, rrd_get_error());
915     }
916
917     journal_write("wrote", file);
918
919     /* Search again in the tree.  It's possible someone issued a "FORGET"
920      * while we were writing the update values. */
921     pthread_mutex_lock(&cache_lock);
922     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
923     if (ci)
924       pthread_cond_broadcast(&ci->flushed);
925     pthread_mutex_unlock(&cache_lock);
926
927     if (status == 0)
928     {
929       pthread_mutex_lock (&stats_lock);
930       stats_updates_written++;
931       stats_data_sets_written += values_num;
932       pthread_mutex_unlock (&stats_lock);
933     }
934
935     rrd_free_ptrs((void ***) &values, &values_num);
936     free(file);
937
938     pthread_mutex_lock (&cache_lock);
939   }
940   pthread_mutex_unlock (&cache_lock);
941
942   return (NULL);
943 } /* }}} void *queue_thread_main */
944
945 static int buffer_get_field (char **buffer_ret, /* {{{ */
946     size_t *buffer_size_ret, char **field_ret)
947 {
948   char *buffer;
949   size_t buffer_pos;
950   size_t buffer_size;
951   char *field;
952   size_t field_size;
953   int status;
954
955   buffer = *buffer_ret;
956   buffer_pos = 0;
957   buffer_size = *buffer_size_ret;
958   field = *buffer_ret;
959   field_size = 0;
960
961   if (buffer_size <= 0)
962     return (-1);
963
964   /* This is ensured by `handle_request'. */
965   assert (buffer[buffer_size - 1] == '\0');
966
967   status = -1;
968   while (buffer_pos < buffer_size)
969   {
970     /* Check for end-of-field or end-of-buffer */
971     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
972     {
973       field[field_size] = 0;
974       field_size++;
975       buffer_pos++;
976       status = 0;
977       break;
978     }
979     /* Handle escaped characters. */
980     else if (buffer[buffer_pos] == '\\')
981     {
982       if (buffer_pos >= (buffer_size - 1))
983         break;
984       buffer_pos++;
985       field[field_size] = buffer[buffer_pos];
986       field_size++;
987       buffer_pos++;
988     }
989     /* Normal operation */ 
990     else
991     {
992       field[field_size] = buffer[buffer_pos];
993       field_size++;
994       buffer_pos++;
995     }
996   } /* while (buffer_pos < buffer_size) */
997
998   if (status != 0)
999     return (status);
1000
1001   *buffer_ret = buffer + buffer_pos;
1002   *buffer_size_ret = buffer_size - buffer_pos;
1003   *field_ret = field;
1004
1005   return (0);
1006 } /* }}} int buffer_get_field */
1007
1008 /* if we're restricting writes to the base directory,
1009  * check whether the file falls within the dir
1010  * returns 1 if OK, otherwise 0
1011  */
1012 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1013 {
1014   assert(file != NULL);
1015
1016   if (!config_write_base_only
1017       || sock == NULL /* journal replay */
1018       || config_base_dir == NULL)
1019     return 1;
1020
1021   if (strstr(file, "../") != NULL) goto err;
1022
1023   /* relative paths without "../" are ok */
1024   if (*file != '/') return 1;
1025
1026   /* file must be of the format base + "/" + <1+ char filename> */
1027   if (strlen(file) < _config_base_dir_len + 2) goto err;
1028   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1029   if (*(file + _config_base_dir_len) != '/') goto err;
1030
1031   return 1;
1032
1033 err:
1034   if (sock != NULL && sock->fd >= 0)
1035     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1036
1037   return 0;
1038 } /* }}} static int check_file_access */
1039
1040 /* when using a base dir, convert relative paths to absolute paths.
1041  * if necessary, modifies the "filename" pointer to point
1042  * to the new path created in "tmp".  "tmp" is provided
1043  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1044  *
1045  * this allows us to optimize for the expected case (absolute path)
1046  * with a no-op.
1047  */
1048 static void get_abs_path(char **filename, char *tmp)
1049 {
1050   assert(tmp != NULL);
1051   assert(filename != NULL && *filename != NULL);
1052
1053   if (config_base_dir == NULL || **filename == '/')
1054     return;
1055
1056   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1057   *filename = tmp;
1058 } /* }}} static int get_abs_path */
1059
1060 /* returns 1 if we have the required privilege level,
1061  * otherwise issue an error to the user on sock */
1062 static int has_privilege (listen_socket_t *sock, /* {{{ */
1063                           socket_privilege priv)
1064 {
1065   if (sock == NULL) /* journal replay */
1066     return 1;
1067
1068   if (sock->privilege >= priv)
1069     return 1;
1070
1071   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1072 } /* }}} static int has_privilege */
1073
1074 static int flush_file (const char *filename) /* {{{ */
1075 {
1076   cache_item_t *ci;
1077
1078   pthread_mutex_lock (&cache_lock);
1079
1080   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1081   if (ci == NULL)
1082   {
1083     pthread_mutex_unlock (&cache_lock);
1084     return (ENOENT);
1085   }
1086
1087   if (ci->values_num > 0)
1088   {
1089     /* Enqueue at head */
1090     enqueue_cache_item (ci, HEAD);
1091     pthread_cond_wait(&ci->flushed, &cache_lock);
1092   }
1093
1094   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1095    * may have been purged during our cond_wait() */
1096
1097   pthread_mutex_unlock(&cache_lock);
1098
1099   return (0);
1100 } /* }}} int flush_file */
1101
1102 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1103 {
1104   char *err = "Syntax error.\n";
1105
1106   if (cmd && cmd->syntax)
1107     err = cmd->syntax;
1108
1109   return send_response(sock, RESP_ERR, "Usage: %s", err);
1110 } /* }}} static int syntax_error() */
1111
1112 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1113 {
1114   uint64_t copy_queue_length;
1115   uint64_t copy_updates_received;
1116   uint64_t copy_flush_received;
1117   uint64_t copy_updates_written;
1118   uint64_t copy_data_sets_written;
1119   uint64_t copy_journal_bytes;
1120   uint64_t copy_journal_rotate;
1121
1122   uint64_t tree_nodes_number;
1123   uint64_t tree_depth;
1124
1125   pthread_mutex_lock (&stats_lock);
1126   copy_queue_length       = stats_queue_length;
1127   copy_updates_received   = stats_updates_received;
1128   copy_flush_received     = stats_flush_received;
1129   copy_updates_written    = stats_updates_written;
1130   copy_data_sets_written  = stats_data_sets_written;
1131   copy_journal_bytes      = stats_journal_bytes;
1132   copy_journal_rotate     = stats_journal_rotate;
1133   pthread_mutex_unlock (&stats_lock);
1134
1135   pthread_mutex_lock (&cache_lock);
1136   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1137   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1138   pthread_mutex_unlock (&cache_lock);
1139
1140   add_response_info(sock,
1141                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1142   add_response_info(sock,
1143                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1144   add_response_info(sock,
1145                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1146   add_response_info(sock,
1147                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1148   add_response_info(sock,
1149                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1150   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1151   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1152   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1153   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1154
1155   send_response(sock, RESP_OK, "Statistics follow\n");
1156
1157   return (0);
1158 } /* }}} int handle_request_stats */
1159
1160 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1161 {
1162   char *file, file_tmp[PATH_MAX];
1163   int status;
1164
1165   status = buffer_get_field (&buffer, &buffer_size, &file);
1166   if (status != 0)
1167   {
1168     return syntax_error(sock,cmd);
1169   }
1170   else
1171   {
1172     pthread_mutex_lock(&stats_lock);
1173     stats_flush_received++;
1174     pthread_mutex_unlock(&stats_lock);
1175
1176     get_abs_path(&file, file_tmp);
1177     if (!check_file_access(file, sock)) return 0;
1178
1179     status = flush_file (file);
1180     if (status == 0)
1181       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1182     else if (status == ENOENT)
1183     {
1184       /* no file in our tree; see whether it exists at all */
1185       struct stat statbuf;
1186
1187       memset(&statbuf, 0, sizeof(statbuf));
1188       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1189         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1190       else
1191         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1192     }
1193     else if (status < 0)
1194       return send_response(sock, RESP_ERR, "Internal error.\n");
1195     else
1196       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1197   }
1198
1199   /* NOTREACHED */
1200   assert(1==0);
1201 } /* }}} int handle_request_flush */
1202
1203 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1204 {
1205   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1206
1207   pthread_mutex_lock(&cache_lock);
1208   flush_old_values(-1);
1209   pthread_mutex_unlock(&cache_lock);
1210
1211   return send_response(sock, RESP_OK, "Started flush.\n");
1212 } /* }}} static int handle_request_flushall */
1213
1214 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1215 {
1216   int status;
1217   char *file, file_tmp[PATH_MAX];
1218   cache_item_t *ci;
1219
1220   status = buffer_get_field(&buffer, &buffer_size, &file);
1221   if (status != 0)
1222     return syntax_error(sock,cmd);
1223
1224   get_abs_path(&file, file_tmp);
1225
1226   pthread_mutex_lock(&cache_lock);
1227   ci = g_tree_lookup(cache_tree, file);
1228   if (ci == NULL)
1229   {
1230     pthread_mutex_unlock(&cache_lock);
1231     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1232   }
1233
1234   for (size_t i=0; i < ci->values_num; i++)
1235     add_response_info(sock, "%s\n", ci->values[i]);
1236
1237   pthread_mutex_unlock(&cache_lock);
1238   return send_response(sock, RESP_OK, "updates pending\n");
1239 } /* }}} static int handle_request_pending */
1240
1241 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1242 {
1243   int status;
1244   gboolean found;
1245   char *file, file_tmp[PATH_MAX];
1246
1247   status = buffer_get_field(&buffer, &buffer_size, &file);
1248   if (status != 0)
1249     return syntax_error(sock,cmd);
1250
1251   get_abs_path(&file, file_tmp);
1252   if (!check_file_access(file, sock)) return 0;
1253
1254   pthread_mutex_lock(&cache_lock);
1255   found = g_tree_remove(cache_tree, file);
1256   pthread_mutex_unlock(&cache_lock);
1257
1258   if (found == TRUE)
1259   {
1260     if (sock != NULL)
1261       journal_write("forget", file);
1262
1263     return send_response(sock, RESP_OK, "Gone!\n");
1264   }
1265   else
1266     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267
1268   /* NOTREACHED */
1269   assert(1==0);
1270 } /* }}} static int handle_request_forget */
1271
1272 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1273 {
1274   cache_item_t *ci;
1275
1276   pthread_mutex_lock(&cache_lock);
1277
1278   ci = cache_queue_head;
1279   while (ci != NULL)
1280   {
1281     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1282     ci = ci->next;
1283   }
1284
1285   pthread_mutex_unlock(&cache_lock);
1286
1287   return send_response(sock, RESP_OK, "in queue.\n");
1288 } /* }}} int handle_request_queue */
1289
1290 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1291 {
1292   char *file, file_tmp[PATH_MAX];
1293   int values_num = 0;
1294   int status;
1295   char orig_buf[CMD_MAX];
1296
1297   cache_item_t *ci;
1298
1299   /* save it for the journal later */
1300   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1301
1302   status = buffer_get_field (&buffer, &buffer_size, &file);
1303   if (status != 0)
1304     return syntax_error(sock,cmd);
1305
1306   pthread_mutex_lock(&stats_lock);
1307   stats_updates_received++;
1308   pthread_mutex_unlock(&stats_lock);
1309
1310   get_abs_path(&file, file_tmp);
1311   if (!check_file_access(file, sock)) return 0;
1312
1313   pthread_mutex_lock (&cache_lock);
1314   ci = g_tree_lookup (cache_tree, file);
1315
1316   if (ci == NULL) /* {{{ */
1317   {
1318     struct stat statbuf;
1319     cache_item_t *tmp;
1320
1321     /* don't hold the lock while we setup; stat(2) might block */
1322     pthread_mutex_unlock(&cache_lock);
1323
1324     memset (&statbuf, 0, sizeof (statbuf));
1325     status = stat (file, &statbuf);
1326     if (status != 0)
1327     {
1328       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1329
1330       status = errno;
1331       if (status == ENOENT)
1332         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1333       else
1334         return send_response(sock, RESP_ERR,
1335                              "stat failed with error %i.\n", status);
1336     }
1337     if (!S_ISREG (statbuf.st_mode))
1338       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1339
1340     if (access(file, R_OK|W_OK) != 0)
1341       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1342                            file, rrd_strerror(errno));
1343
1344     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1345     if (ci == NULL)
1346     {
1347       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1348
1349       return send_response(sock, RESP_ERR, "malloc failed.\n");
1350     }
1351     memset (ci, 0, sizeof (cache_item_t));
1352
1353     ci->file = strdup (file);
1354     if (ci->file == NULL)
1355     {
1356       free (ci);
1357       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1358
1359       return send_response(sock, RESP_ERR, "strdup failed.\n");
1360     }
1361
1362     wipe_ci_values(ci, now);
1363     ci->flags = CI_FLAGS_IN_TREE;
1364     pthread_cond_init(&ci->flushed, NULL);
1365
1366     pthread_mutex_lock(&cache_lock);
1367
1368     /* another UPDATE might have added this entry in the meantime */
1369     tmp = g_tree_lookup (cache_tree, file);
1370     if (tmp == NULL)
1371       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1372     else
1373     {
1374       free_cache_item (ci);
1375       ci = tmp;
1376     }
1377
1378     /* state may have changed while we were unlocked */
1379     if (state == SHUTDOWN)
1380       return -1;
1381   } /* }}} */
1382   assert (ci != NULL);
1383
1384   /* don't re-write updates in replay mode */
1385   if (sock != NULL)
1386     journal_write("update", orig_buf);
1387
1388   while (buffer_size > 0)
1389   {
1390     char *value;
1391     time_t stamp;
1392     char *eostamp;
1393
1394     status = buffer_get_field (&buffer, &buffer_size, &value);
1395     if (status != 0)
1396     {
1397       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1398       break;
1399     }
1400
1401     /* make sure update time is always moving forward */
1402     stamp = strtol(value, &eostamp, 10);
1403     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1404     {
1405       pthread_mutex_unlock(&cache_lock);
1406       return send_response(sock, RESP_ERR,
1407                            "Cannot find timestamp in '%s'!\n", value);
1408     }
1409     else if (stamp <= ci->last_update_stamp)
1410     {
1411       pthread_mutex_unlock(&cache_lock);
1412       return send_response(sock, RESP_ERR,
1413                            "illegal attempt to update using time %ld when last"
1414                            " update time is %ld (minimum one second step)\n",
1415                            stamp, ci->last_update_stamp);
1416     }
1417     else
1418       ci->last_update_stamp = stamp;
1419
1420     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1421     {
1422       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1423       continue;
1424     }
1425
1426     values_num++;
1427   }
1428
1429   if (((now - ci->last_flush_time) >= config_write_interval)
1430       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1431       && (ci->values_num > 0))
1432   {
1433     enqueue_cache_item (ci, TAIL);
1434   }
1435
1436   pthread_mutex_unlock (&cache_lock);
1437
1438   if (values_num < 1)
1439     return send_response(sock, RESP_ERR, "No values updated.\n");
1440   else
1441     return send_response(sock, RESP_OK,
1442                          "errors, enqueued %i value(s).\n", values_num);
1443
1444   /* NOTREACHED */
1445   assert(1==0);
1446
1447 } /* }}} int handle_request_update */
1448
1449 /* we came across a "WROTE" entry during journal replay.
1450  * throw away any values that we have accumulated for this file
1451  */
1452 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1453 {
1454   cache_item_t *ci;
1455   const char *file = buffer;
1456
1457   pthread_mutex_lock(&cache_lock);
1458
1459   ci = g_tree_lookup(cache_tree, file);
1460   if (ci == NULL)
1461   {
1462     pthread_mutex_unlock(&cache_lock);
1463     return (0);
1464   }
1465
1466   if (ci->values)
1467     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1468
1469   wipe_ci_values(ci, now);
1470   remove_from_queue(ci);
1471
1472   pthread_mutex_unlock(&cache_lock);
1473   return (0);
1474 } /* }}} int handle_request_wrote */
1475
1476 /* start "BATCH" processing */
1477 static int batch_start (HANDLER_PROTO) /* {{{ */
1478 {
1479   int status;
1480   if (sock->batch_start)
1481     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1482
1483   status = send_response(sock, RESP_OK,
1484                          "Go ahead.  End with dot '.' on its own line.\n");
1485   sock->batch_start = time(NULL);
1486   sock->batch_cmd = 0;
1487
1488   return status;
1489 } /* }}} static int batch_start */
1490
1491 /* finish "BATCH" processing and return results to the client */
1492 static int batch_done (HANDLER_PROTO) /* {{{ */
1493 {
1494   assert(sock->batch_start);
1495   sock->batch_start = 0;
1496   sock->batch_cmd  = 0;
1497   return send_response(sock, RESP_OK, "errors\n");
1498 } /* }}} static int batch_done */
1499
1500 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1501 {
1502   return -1;
1503 } /* }}} static int handle_request_quit */
1504
1505 struct command COMMANDS[] = {
1506   {
1507     "UPDATE",
1508     handle_request_update,
1509     PRIV_HIGH,
1510     CMD_CONTEXT_ANY,
1511     "UPDATE <filename> <values> [<values> ...]\n"
1512     ,
1513     "Adds the given file to the internal cache if it is not yet known and\n"
1514     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1515     "for details.\n"
1516     "\n"
1517     "Each <values> has the following form:\n"
1518     "  <values> = <time>:<value>[:<value>[...]]\n"
1519     "See the rrdupdate(1) manpage for details.\n"
1520   },
1521   {
1522     "WROTE",
1523     handle_request_wrote,
1524     PRIV_HIGH,
1525     CMD_CONTEXT_JOURNAL,
1526     NULL,
1527     NULL
1528   },
1529   {
1530     "FLUSH",
1531     handle_request_flush,
1532     PRIV_LOW,
1533     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1534     "FLUSH <filename>\n"
1535     ,
1536     "Adds the given filename to the head of the update queue and returns\n"
1537     "after it has been dequeued.\n"
1538   },
1539   {
1540     "FLUSHALL",
1541     handle_request_flushall,
1542     PRIV_HIGH,
1543     CMD_CONTEXT_CLIENT,
1544     "FLUSHALL\n"
1545     ,
1546     "Triggers writing of all pending updates.  Returns immediately.\n"
1547   },
1548   {
1549     "PENDING",
1550     handle_request_pending,
1551     PRIV_HIGH,
1552     CMD_CONTEXT_CLIENT,
1553     "PENDING <filename>\n"
1554     ,
1555     "Shows any 'pending' updates for a file, in order.\n"
1556     "The updates shown have not yet been written to the underlying RRD file.\n"
1557   },
1558   {
1559     "FORGET",
1560     handle_request_forget,
1561     PRIV_HIGH,
1562     CMD_CONTEXT_ANY,
1563     "FORGET <filename>\n"
1564     ,
1565     "Removes the file completely from the cache.\n"
1566     "Any pending updates for the file will be lost.\n"
1567   },
1568   {
1569     "QUEUE",
1570     handle_request_queue,
1571     PRIV_LOW,
1572     CMD_CONTEXT_CLIENT,
1573     "QUEUE\n"
1574     ,
1575         "Shows all files in the output queue.\n"
1576     "The output is zero or more lines in the following format:\n"
1577     "(where <num_vals> is the number of values to be written)\n"
1578     "\n"
1579     "<num_vals> <filename>\n"
1580   },
1581   {
1582     "STATS",
1583     handle_request_stats,
1584     PRIV_LOW,
1585     CMD_CONTEXT_CLIENT,
1586     "STATS\n"
1587     ,
1588     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1589     "a description of the values.\n"
1590   },
1591   {
1592     "HELP",
1593     handle_request_help,
1594     PRIV_LOW,
1595     CMD_CONTEXT_CLIENT,
1596     "HELP [<command>]\n",
1597     NULL, /* special! */
1598   },
1599   {
1600     "BATCH",
1601     batch_start,
1602     PRIV_LOW,
1603     CMD_CONTEXT_CLIENT,
1604     "BATCH\n"
1605     ,
1606     "The 'BATCH' command permits the client to initiate a bulk load\n"
1607     "   of commands to rrdcached.\n"
1608     "\n"
1609     "Usage:\n"
1610     "\n"
1611     "    client: BATCH\n"
1612     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1613     "    client: command #1\n"
1614     "    client: command #2\n"
1615     "    client: ... and so on\n"
1616     "    client: .\n"
1617     "    server: 2 errors\n"
1618     "    server: 7 message for command #7\n"
1619     "    server: 9 message for command #9\n"
1620     "\n"
1621     "For more information, consult the rrdcached(1) documentation.\n"
1622   },
1623   {
1624     ".",   /* BATCH terminator */
1625     batch_done,
1626     PRIV_LOW,
1627     CMD_CONTEXT_BATCH,
1628     NULL,
1629     NULL
1630   },
1631   {
1632     "QUIT",
1633     handle_request_quit,
1634     PRIV_LOW,
1635     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1636     "QUIT\n"
1637     ,
1638     "Disconnect from rrdcached.\n"
1639   },
1640   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1641 };
1642
1643 static struct command *find_command(char *cmd)
1644 {
1645   struct command *c = COMMANDS;
1646
1647   while (c->cmd != NULL)
1648   {
1649     if (strcasecmp(cmd, c->cmd) == 0)
1650       break;
1651     c++;
1652   }
1653
1654   if (c->cmd == NULL)
1655     return NULL;
1656   else
1657     return c;
1658 }
1659
1660 /* check whether commands are received in the expected context */
1661 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1662 {
1663   if (sock == NULL)
1664     return (cmd->context & CMD_CONTEXT_JOURNAL);
1665   else if (sock->batch_start)
1666     return (cmd->context & CMD_CONTEXT_BATCH);
1667   else
1668     return (cmd->context & CMD_CONTEXT_CLIENT);
1669
1670   /* NOTREACHED */
1671   assert(1==0);
1672 }
1673
1674 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1675 {
1676   int status;
1677   char *cmd_str;
1678   char *resp_txt;
1679   struct command *help = NULL;
1680
1681   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1682   if (status == 0)
1683     help = find_command(cmd_str);
1684
1685   if (help && (help->syntax || help->help))
1686   {
1687     char tmp[CMD_MAX];
1688
1689     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1690     resp_txt = tmp;
1691
1692     if (help->syntax)
1693       add_response_info(sock, "Usage: %s\n", help->syntax);
1694
1695     if (help->help)
1696       add_response_info(sock, "%s\n", help->help);
1697   }
1698   else
1699   {
1700     help = COMMANDS;
1701     resp_txt = "Command overview\n";
1702
1703     while (help->cmd)
1704     {
1705       if (help->syntax)
1706         add_response_info(sock, "%s", help->syntax);
1707       help++;
1708     }
1709   }
1710
1711   return send_response(sock, RESP_OK, resp_txt);
1712 } /* }}} int handle_request_help */
1713
1714 /* if sock==NULL, we are in journal replay mode */
1715 static int handle_request (DISPATCH_PROTO) /* {{{ */
1716 {
1717   char *buffer_ptr = buffer;
1718   char *cmd_str = NULL;
1719   struct command *cmd = NULL;
1720   int status;
1721
1722   assert (buffer[buffer_size - 1] == '\0');
1723
1724   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1725   if (status != 0)
1726   {
1727     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1728     return (-1);
1729   }
1730
1731   if (sock != NULL && sock->batch_start)
1732     sock->batch_cmd++;
1733
1734   cmd = find_command(cmd_str);
1735   if (!cmd)
1736     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1737
1738   status = has_privilege(sock, cmd->min_priv);
1739   if (status <= 0)
1740     return status;
1741
1742   if (!command_check_context(sock, cmd))
1743     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1744
1745   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1746 } /* }}} int handle_request */
1747
1748 /* MUST NOT hold journal_lock before calling this */
1749 static void journal_rotate(void) /* {{{ */
1750 {
1751   FILE *old_fh = NULL;
1752   int new_fd;
1753
1754   if (journal_cur == NULL || journal_old == NULL)
1755     return;
1756
1757   pthread_mutex_lock(&journal_lock);
1758
1759   /* we rotate this way (rename before close) so that the we can release
1760    * the journal lock as fast as possible.  Journal writes to the new
1761    * journal can proceed immediately after the new file is opened.  The
1762    * fclose can then block without affecting new updates.
1763    */
1764   if (journal_fh != NULL)
1765   {
1766     old_fh = journal_fh;
1767     journal_fh = NULL;
1768     rename(journal_cur, journal_old);
1769     ++stats_journal_rotate;
1770   }
1771
1772   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1773                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1774   if (new_fd >= 0)
1775   {
1776     journal_fh = fdopen(new_fd, "a");
1777     if (journal_fh == NULL)
1778       close(new_fd);
1779   }
1780
1781   pthread_mutex_unlock(&journal_lock);
1782
1783   if (old_fh != NULL)
1784     fclose(old_fh);
1785
1786   if (journal_fh == NULL)
1787   {
1788     RRDD_LOG(LOG_CRIT,
1789              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1790              journal_cur, rrd_strerror(errno));
1791
1792     RRDD_LOG(LOG_ERR,
1793              "JOURNALING DISABLED: All values will be flushed at shutdown");
1794     config_flush_at_shutdown = 1;
1795   }
1796
1797 } /* }}} static void journal_rotate */
1798
1799 static void journal_done(void) /* {{{ */
1800 {
1801   if (journal_cur == NULL)
1802     return;
1803
1804   pthread_mutex_lock(&journal_lock);
1805   if (journal_fh != NULL)
1806   {
1807     fclose(journal_fh);
1808     journal_fh = NULL;
1809   }
1810
1811   if (config_flush_at_shutdown)
1812   {
1813     RRDD_LOG(LOG_INFO, "removing journals");
1814     unlink(journal_old);
1815     unlink(journal_cur);
1816   }
1817   else
1818   {
1819     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1820              "journals will be used at next startup");
1821   }
1822
1823   pthread_mutex_unlock(&journal_lock);
1824
1825 } /* }}} static void journal_done */
1826
1827 static int journal_write(char *cmd, char *args) /* {{{ */
1828 {
1829   int chars;
1830
1831   if (journal_fh == NULL)
1832     return 0;
1833
1834   pthread_mutex_lock(&journal_lock);
1835   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1836   pthread_mutex_unlock(&journal_lock);
1837
1838   if (chars > 0)
1839   {
1840     pthread_mutex_lock(&stats_lock);
1841     stats_journal_bytes += chars;
1842     pthread_mutex_unlock(&stats_lock);
1843   }
1844
1845   return chars;
1846 } /* }}} static int journal_write */
1847
1848 static int journal_replay (const char *file) /* {{{ */
1849 {
1850   FILE *fh;
1851   int entry_cnt = 0;
1852   int fail_cnt = 0;
1853   uint64_t line = 0;
1854   char entry[CMD_MAX];
1855   time_t now;
1856
1857   if (file == NULL) return 0;
1858
1859   {
1860     char *reason = "unknown error";
1861     int status = 0;
1862     struct stat statbuf;
1863
1864     memset(&statbuf, 0, sizeof(statbuf));
1865     if (stat(file, &statbuf) != 0)
1866     {
1867       if (errno == ENOENT)
1868         return 0;
1869
1870       reason = "stat error";
1871       status = errno;
1872     }
1873     else if (!S_ISREG(statbuf.st_mode))
1874     {
1875       reason = "not a regular file";
1876       status = EPERM;
1877     }
1878     if (statbuf.st_uid != daemon_uid)
1879     {
1880       reason = "not owned by daemon user";
1881       status = EACCES;
1882     }
1883     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1884     {
1885       reason = "must not be user/group writable";
1886       status = EACCES;
1887     }
1888
1889     if (status != 0)
1890     {
1891       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1892                file, rrd_strerror(status), reason);
1893       return 0;
1894     }
1895   }
1896
1897   fh = fopen(file, "r");
1898   if (fh == NULL)
1899   {
1900     if (errno != ENOENT)
1901       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1902                file, rrd_strerror(errno));
1903     return 0;
1904   }
1905   else
1906     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1907
1908   now = time(NULL);
1909
1910   while(!feof(fh))
1911   {
1912     size_t entry_len;
1913
1914     ++line;
1915     if (fgets(entry, sizeof(entry), fh) == NULL)
1916       break;
1917     entry_len = strlen(entry);
1918
1919     /* check \n termination in case journal writing crashed mid-line */
1920     if (entry_len == 0)
1921       continue;
1922     else if (entry[entry_len - 1] != '\n')
1923     {
1924       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1925       ++fail_cnt;
1926       continue;
1927     }
1928
1929     entry[entry_len - 1] = '\0';
1930
1931     if (handle_request(NULL, now, entry, entry_len) == 0)
1932       ++entry_cnt;
1933     else
1934       ++fail_cnt;
1935   }
1936
1937   fclose(fh);
1938
1939   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1940            entry_cnt, fail_cnt);
1941
1942   return entry_cnt > 0 ? 1 : 0;
1943 } /* }}} static int journal_replay */
1944
1945 static void journal_init(void) /* {{{ */
1946 {
1947   int had_journal = 0;
1948
1949   if (journal_cur == NULL) return;
1950
1951   pthread_mutex_lock(&journal_lock);
1952
1953   RRDD_LOG(LOG_INFO, "checking for journal files");
1954
1955   had_journal += journal_replay(journal_old);
1956   had_journal += journal_replay(journal_cur);
1957
1958   /* it must have been a crash.  start a flush */
1959   if (had_journal && config_flush_at_shutdown)
1960     flush_old_values(-1);
1961
1962   pthread_mutex_unlock(&journal_lock);
1963   journal_rotate();
1964
1965   RRDD_LOG(LOG_INFO, "journal processing complete");
1966
1967 } /* }}} static void journal_init */
1968
1969 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1970 {
1971   assert(sock != NULL);
1972
1973   free(sock->rbuf);  sock->rbuf = NULL;
1974   free(sock->wbuf);  sock->wbuf = NULL;
1975   free(sock);
1976 } /* }}} void free_listen_socket */
1977
1978 static void close_connection(listen_socket_t *sock) /* {{{ */
1979 {
1980   if (sock->fd >= 0)
1981   {
1982     close(sock->fd);
1983     sock->fd = -1;
1984   }
1985
1986   free_listen_socket(sock);
1987
1988 } /* }}} void close_connection */
1989
1990 static void *connection_thread_main (void *args) /* {{{ */
1991 {
1992   listen_socket_t *sock;
1993   int fd;
1994
1995   sock = (listen_socket_t *) args;
1996   fd = sock->fd;
1997
1998   /* init read buffers */
1999   sock->next_read = sock->next_cmd = 0;
2000   sock->rbuf = malloc(RBUF_SIZE);
2001   if (sock->rbuf == NULL)
2002   {
2003     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2004     close_connection(sock);
2005     return NULL;
2006   }
2007
2008   pthread_mutex_lock (&connection_threads_lock);
2009   connection_threads_num++;
2010   pthread_mutex_unlock (&connection_threads_lock);
2011
2012   while (state == RUNNING)
2013   {
2014     char *cmd;
2015     ssize_t cmd_len;
2016     ssize_t rbytes;
2017     time_t now;
2018
2019     struct pollfd pollfd;
2020     int status;
2021
2022     pollfd.fd = fd;
2023     pollfd.events = POLLIN | POLLPRI;
2024     pollfd.revents = 0;
2025
2026     status = poll (&pollfd, 1, /* timeout = */ 500);
2027     if (state != RUNNING)
2028       break;
2029     else if (status == 0) /* timeout */
2030       continue;
2031     else if (status < 0) /* error */
2032     {
2033       status = errno;
2034       if (status != EINTR)
2035         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2036       continue;
2037     }
2038
2039     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2040       break;
2041     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2042     {
2043       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2044           "poll(2) returned something unexpected: %#04hx",
2045           pollfd.revents);
2046       break;
2047     }
2048
2049     rbytes = read(fd, sock->rbuf + sock->next_read,
2050                   RBUF_SIZE - sock->next_read);
2051     if (rbytes < 0)
2052     {
2053       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2054       break;
2055     }
2056     else if (rbytes == 0)
2057       break; /* eof */
2058
2059     sock->next_read += rbytes;
2060
2061     if (sock->batch_start)
2062       now = sock->batch_start;
2063     else
2064       now = time(NULL);
2065
2066     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2067     {
2068       status = handle_request (sock, now, cmd, cmd_len+1);
2069       if (status != 0)
2070         goto out_close;
2071     }
2072   }
2073
2074 out_close:
2075   close_connection(sock);
2076
2077   /* Remove this thread from the connection threads list */
2078   pthread_mutex_lock (&connection_threads_lock);
2079   connection_threads_num--;
2080   if (connection_threads_num <= 0)
2081     pthread_cond_broadcast(&connection_threads_done);
2082   pthread_mutex_unlock (&connection_threads_lock);
2083
2084   return (NULL);
2085 } /* }}} void *connection_thread_main */
2086
2087 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2088 {
2089   int fd;
2090   struct sockaddr_un sa;
2091   listen_socket_t *temp;
2092   int status;
2093   const char *path;
2094
2095   path = sock->addr;
2096   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2097     path += strlen("unix:");
2098
2099   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2100       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2101   if (temp == NULL)
2102   {
2103     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2104     return (-1);
2105   }
2106   listen_fds = temp;
2107   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2108
2109   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2110   if (fd < 0)
2111   {
2112     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2113              rrd_strerror(errno));
2114     return (-1);
2115   }
2116
2117   memset (&sa, 0, sizeof (sa));
2118   sa.sun_family = AF_UNIX;
2119   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2120
2121   /* if we've gotten this far, we own the pid file.  any daemon started
2122    * with the same args must not be alive.  therefore, ensure that we can
2123    * create the socket...
2124    */
2125   unlink(path);
2126
2127   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2128   if (status != 0)
2129   {
2130     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2131              path, rrd_strerror(errno));
2132     close (fd);
2133     return (-1);
2134   }
2135
2136   status = listen (fd, /* backlog = */ 10);
2137   if (status != 0)
2138   {
2139     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2140              path, rrd_strerror(errno));
2141     close (fd);
2142     unlink (path);
2143     return (-1);
2144   }
2145
2146   listen_fds[listen_fds_num].fd = fd;
2147   listen_fds[listen_fds_num].family = PF_UNIX;
2148   strncpy(listen_fds[listen_fds_num].addr, path,
2149           sizeof (listen_fds[listen_fds_num].addr) - 1);
2150   listen_fds_num++;
2151
2152   return (0);
2153 } /* }}} int open_listen_socket_unix */
2154
2155 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2156 {
2157   struct addrinfo ai_hints;
2158   struct addrinfo *ai_res;
2159   struct addrinfo *ai_ptr;
2160   char addr_copy[NI_MAXHOST];
2161   char *addr;
2162   char *port;
2163   int status;
2164
2165   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2166   addr_copy[sizeof (addr_copy) - 1] = 0;
2167   addr = addr_copy;
2168
2169   memset (&ai_hints, 0, sizeof (ai_hints));
2170   ai_hints.ai_flags = 0;
2171 #ifdef AI_ADDRCONFIG
2172   ai_hints.ai_flags |= AI_ADDRCONFIG;
2173 #endif
2174   ai_hints.ai_family = AF_UNSPEC;
2175   ai_hints.ai_socktype = SOCK_STREAM;
2176
2177   port = NULL;
2178   if (*addr == '[') /* IPv6+port format */
2179   {
2180     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2181     addr++;
2182
2183     port = strchr (addr, ']');
2184     if (port == NULL)
2185     {
2186       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2187       return (-1);
2188     }
2189     *port = 0;
2190     port++;
2191
2192     if (*port == ':')
2193       port++;
2194     else if (*port == 0)
2195       port = NULL;
2196     else
2197     {
2198       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2199       return (-1);
2200     }
2201   } /* if (*addr = ']') */
2202   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2203   {
2204     port = rindex(addr, ':');
2205     if (port != NULL)
2206     {
2207       *port = 0;
2208       port++;
2209     }
2210   }
2211   ai_res = NULL;
2212   status = getaddrinfo (addr,
2213                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2214                         &ai_hints, &ai_res);
2215   if (status != 0)
2216   {
2217     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2218              addr, gai_strerror (status));
2219     return (-1);
2220   }
2221
2222   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2223   {
2224     int fd;
2225     listen_socket_t *temp;
2226     int one = 1;
2227
2228     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2229         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2230     if (temp == NULL)
2231     {
2232       fprintf (stderr,
2233                "rrdcached: open_listen_socket_network: realloc failed.\n");
2234       continue;
2235     }
2236     listen_fds = temp;
2237     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2238
2239     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2240     if (fd < 0)
2241     {
2242       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2243                rrd_strerror(errno));
2244       continue;
2245     }
2246
2247     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2248
2249     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2250     if (status != 0)
2251     {
2252       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2253                sock->addr, rrd_strerror(errno));
2254       close (fd);
2255       continue;
2256     }
2257
2258     status = listen (fd, /* backlog = */ 10);
2259     if (status != 0)
2260     {
2261       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2262                sock->addr, rrd_strerror(errno));
2263       close (fd);
2264       freeaddrinfo(ai_res);
2265       return (-1);
2266     }
2267
2268     listen_fds[listen_fds_num].fd = fd;
2269     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2270     listen_fds_num++;
2271   } /* for (ai_ptr) */
2272
2273   freeaddrinfo(ai_res);
2274   return (0);
2275 } /* }}} static int open_listen_socket_network */
2276
2277 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2278 {
2279   assert(sock != NULL);
2280   assert(sock->addr != NULL);
2281
2282   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2283       || sock->addr[0] == '/')
2284     return (open_listen_socket_unix(sock));
2285   else
2286     return (open_listen_socket_network(sock));
2287 } /* }}} int open_listen_socket */
2288
2289 static int close_listen_sockets (void) /* {{{ */
2290 {
2291   size_t i;
2292
2293   for (i = 0; i < listen_fds_num; i++)
2294   {
2295     close (listen_fds[i].fd);
2296
2297     if (listen_fds[i].family == PF_UNIX)
2298       unlink(listen_fds[i].addr);
2299   }
2300
2301   free (listen_fds);
2302   listen_fds = NULL;
2303   listen_fds_num = 0;
2304
2305   return (0);
2306 } /* }}} int close_listen_sockets */
2307
2308 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2309 {
2310   struct pollfd *pollfds;
2311   int pollfds_num;
2312   int status;
2313   int i;
2314
2315   if (listen_fds_num < 1)
2316   {
2317     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2318     return (NULL);
2319   }
2320
2321   pollfds_num = listen_fds_num;
2322   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2323   if (pollfds == NULL)
2324   {
2325     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2326     return (NULL);
2327   }
2328   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2329
2330   RRDD_LOG(LOG_INFO, "listening for connections");
2331
2332   while (state == RUNNING)
2333   {
2334     for (i = 0; i < pollfds_num; i++)
2335     {
2336       pollfds[i].fd = listen_fds[i].fd;
2337       pollfds[i].events = POLLIN | POLLPRI;
2338       pollfds[i].revents = 0;
2339     }
2340
2341     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2342     if (state != RUNNING)
2343       break;
2344     else if (status == 0) /* timeout */
2345       continue;
2346     else if (status < 0) /* error */
2347     {
2348       status = errno;
2349       if (status != EINTR)
2350       {
2351         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2352       }
2353       continue;
2354     }
2355
2356     for (i = 0; i < pollfds_num; i++)
2357     {
2358       listen_socket_t *client_sock;
2359       struct sockaddr_storage client_sa;
2360       socklen_t client_sa_size;
2361       pthread_t tid;
2362       pthread_attr_t attr;
2363
2364       if (pollfds[i].revents == 0)
2365         continue;
2366
2367       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2368       {
2369         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2370             "poll(2) returned something unexpected for listen FD #%i.",
2371             pollfds[i].fd);
2372         continue;
2373       }
2374
2375       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2376       if (client_sock == NULL)
2377       {
2378         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2379         continue;
2380       }
2381       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2382
2383       client_sa_size = sizeof (client_sa);
2384       client_sock->fd = accept (pollfds[i].fd,
2385           (struct sockaddr *) &client_sa, &client_sa_size);
2386       if (client_sock->fd < 0)
2387       {
2388         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2389         free(client_sock);
2390         continue;
2391       }
2392
2393       pthread_attr_init (&attr);
2394       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2395
2396       status = pthread_create (&tid, &attr, connection_thread_main,
2397                                client_sock);
2398       if (status != 0)
2399       {
2400         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2401         close_connection(client_sock);
2402         continue;
2403       }
2404     } /* for (pollfds_num) */
2405   } /* while (state == RUNNING) */
2406
2407   RRDD_LOG(LOG_INFO, "starting shutdown");
2408
2409   close_listen_sockets ();
2410
2411   pthread_mutex_lock (&connection_threads_lock);
2412   while (connection_threads_num > 0)
2413     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2414   pthread_mutex_unlock (&connection_threads_lock);
2415
2416   free(pollfds);
2417
2418   return (NULL);
2419 } /* }}} void *listen_thread_main */
2420
2421 static int daemonize (void) /* {{{ */
2422 {
2423   int pid_fd;
2424   char *base_dir;
2425
2426   daemon_uid = geteuid();
2427
2428   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2429   if (pid_fd < 0)
2430     pid_fd = check_pidfile();
2431   if (pid_fd < 0)
2432     return pid_fd;
2433
2434   /* open all the listen sockets */
2435   if (config_listen_address_list_len > 0)
2436   {
2437     for (size_t i = 0; i < config_listen_address_list_len; i++)
2438       open_listen_socket (config_listen_address_list[i]);
2439
2440     rrd_free_ptrs((void ***) &config_listen_address_list,
2441                   &config_listen_address_list_len);
2442   }
2443   else
2444   {
2445     listen_socket_t sock;
2446     memset(&sock, 0, sizeof(sock));
2447     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2448     open_listen_socket (&sock);
2449   }
2450
2451   if (listen_fds_num < 1)
2452   {
2453     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2454     goto error;
2455   }
2456
2457   if (!stay_foreground)
2458   {
2459     pid_t child;
2460
2461     child = fork ();
2462     if (child < 0)
2463     {
2464       fprintf (stderr, "daemonize: fork(2) failed.\n");
2465       goto error;
2466     }
2467     else if (child > 0)
2468       exit(0);
2469
2470     /* Become session leader */
2471     setsid ();
2472
2473     /* Open the first three file descriptors to /dev/null */
2474     close (2);
2475     close (1);
2476     close (0);
2477
2478     open ("/dev/null", O_RDWR);
2479     if (dup(0) == -1 || dup(0) == -1){
2480         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2481     }
2482   } /* if (!stay_foreground) */
2483
2484   /* Change into the /tmp directory. */
2485   base_dir = (config_base_dir != NULL)
2486     ? config_base_dir
2487     : "/tmp";
2488
2489   if (chdir (base_dir) != 0)
2490   {
2491     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2492     goto error;
2493   }
2494
2495   install_signal_handlers();
2496
2497   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2498   RRDD_LOG(LOG_INFO, "starting up");
2499
2500   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2501                                 (GDestroyNotify) free_cache_item);
2502   if (cache_tree == NULL)
2503   {
2504     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2505     goto error;
2506   }
2507
2508   return write_pidfile (pid_fd);
2509
2510 error:
2511   remove_pidfile();
2512   return -1;
2513 } /* }}} int daemonize */
2514
2515 static int cleanup (void) /* {{{ */
2516 {
2517   pthread_cond_broadcast (&flush_cond);
2518   pthread_join (flush_thread, NULL);
2519
2520   pthread_cond_broadcast (&queue_cond);
2521   for (int i = 0; i < config_queue_threads; i++)
2522     pthread_join (queue_threads[i], NULL);
2523
2524   if (config_flush_at_shutdown)
2525   {
2526     assert(cache_queue_head == NULL);
2527     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2528   }
2529
2530   journal_done();
2531   remove_pidfile ();
2532
2533   free(queue_threads);
2534   free(config_base_dir);
2535   free(config_pid_file);
2536   free(journal_cur);
2537   free(journal_old);
2538
2539   pthread_mutex_lock(&cache_lock);
2540   g_tree_destroy(cache_tree);
2541
2542   RRDD_LOG(LOG_INFO, "goodbye");
2543   closelog ();
2544
2545   return (0);
2546 } /* }}} int cleanup */
2547
2548 static int read_options (int argc, char **argv) /* {{{ */
2549 {
2550   int option;
2551   int status = 0;
2552
2553   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2554   {
2555     switch (option)
2556     {
2557       case 'g':
2558         stay_foreground=1;
2559         break;
2560
2561       case 'L':
2562       case 'l':
2563       {
2564         listen_socket_t *new;
2565
2566         new = malloc(sizeof(listen_socket_t));
2567         if (new == NULL)
2568         {
2569           fprintf(stderr, "read_options: malloc failed.\n");
2570           return(2);
2571         }
2572         memset(new, 0, sizeof(listen_socket_t));
2573
2574         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2575         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2576
2577         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2578                          &config_listen_address_list_len, new))
2579         {
2580           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2581           return (2);
2582         }
2583       }
2584       break;
2585
2586       case 'f':
2587       {
2588         int temp;
2589
2590         temp = atoi (optarg);
2591         if (temp > 0)
2592           config_flush_interval = temp;
2593         else
2594         {
2595           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2596           status = 3;
2597         }
2598       }
2599       break;
2600
2601       case 'w':
2602       {
2603         int temp;
2604
2605         temp = atoi (optarg);
2606         if (temp > 0)
2607           config_write_interval = temp;
2608         else
2609         {
2610           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2611           status = 2;
2612         }
2613       }
2614       break;
2615
2616       case 'z':
2617       {
2618         int temp;
2619
2620         temp = atoi(optarg);
2621         if (temp > 0)
2622           config_write_jitter = temp;
2623         else
2624         {
2625           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2626           status = 2;
2627         }
2628
2629         break;
2630       }
2631
2632       case 't':
2633       {
2634         int threads;
2635         threads = atoi(optarg);
2636         if (threads >= 1)
2637           config_queue_threads = threads;
2638         else
2639         {
2640           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2641           return 1;
2642         }
2643       }
2644       break;
2645
2646       case 'B':
2647         config_write_base_only = 1;
2648         break;
2649
2650       case 'b':
2651       {
2652         size_t len;
2653         char base_realpath[PATH_MAX];
2654
2655         if (config_base_dir != NULL)
2656           free (config_base_dir);
2657         config_base_dir = strdup (optarg);
2658         if (config_base_dir == NULL)
2659         {
2660           fprintf (stderr, "read_options: strdup failed.\n");
2661           return (3);
2662         }
2663
2664         /* make sure that the base directory is not resolved via
2665          * symbolic links.  this makes some performance-enhancing
2666          * assumptions possible (we don't have to resolve paths
2667          * that start with a "/")
2668          */
2669         if (realpath(config_base_dir, base_realpath) == NULL)
2670         {
2671           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2672           return 5;
2673         }
2674         else if (strncmp(config_base_dir,
2675                          base_realpath, sizeof(base_realpath)) != 0)
2676         {
2677           fprintf(stderr,
2678                   "Base directory (-b) resolved via file system links!\n"
2679                   "Please consult rrdcached '-b' documentation!\n"
2680                   "Consider specifying the real directory (%s)\n",
2681                   base_realpath);
2682           return 5;
2683         }
2684
2685         len = strlen (config_base_dir);
2686         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2687         {
2688           config_base_dir[len - 1] = 0;
2689           len--;
2690         }
2691
2692         if (len < 1)
2693         {
2694           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2695           return (4);
2696         }
2697
2698         _config_base_dir_len = len;
2699       }
2700       break;
2701
2702       case 'p':
2703       {
2704         if (config_pid_file != NULL)
2705           free (config_pid_file);
2706         config_pid_file = strdup (optarg);
2707         if (config_pid_file == NULL)
2708         {
2709           fprintf (stderr, "read_options: strdup failed.\n");
2710           return (3);
2711         }
2712       }
2713       break;
2714
2715       case 'F':
2716         config_flush_at_shutdown = 1;
2717         break;
2718
2719       case 'j':
2720       {
2721         struct stat statbuf;
2722         const char *dir = optarg;
2723
2724         status = stat(dir, &statbuf);
2725         if (status != 0)
2726         {
2727           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2728           return 6;
2729         }
2730
2731         if (!S_ISDIR(statbuf.st_mode)
2732             || access(dir, R_OK|W_OK|X_OK) != 0)
2733         {
2734           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2735                   errno ? rrd_strerror(errno) : "");
2736           return 6;
2737         }
2738
2739         journal_cur = malloc(PATH_MAX + 1);
2740         journal_old = malloc(PATH_MAX + 1);
2741         if (journal_cur == NULL || journal_old == NULL)
2742         {
2743           fprintf(stderr, "malloc failure for journal files\n");
2744           return 6;
2745         }
2746         else 
2747         {
2748           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2749           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2750         }
2751       }
2752       break;
2753
2754       case 'h':
2755       case '?':
2756         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2757             "\n"
2758             "Usage: rrdcached [options]\n"
2759             "\n"
2760             "Valid options are:\n"
2761             "  -l <address>  Socket address to listen to.\n"
2762             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2763             "  -w <seconds>  Interval in which to write data.\n"
2764             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2765             "  -t <threads>  Number of write threads.\n"
2766             "  -f <seconds>  Interval in which to flush dead data.\n"
2767             "  -p <file>     Location of the PID-file.\n"
2768             "  -b <dir>      Base directory to change to.\n"
2769             "  -B            Restrict file access to paths within -b <dir>\n"
2770             "  -g            Do not fork and run in the foreground.\n"
2771             "  -j <dir>      Directory in which to create the journal files.\n"
2772             "  -F            Always flush all updates at shutdown\n"
2773             "\n"
2774             "For more information and a detailed description of all options "
2775             "please refer\n"
2776             "to the rrdcached(1) manual page.\n",
2777             VERSION);
2778         status = -1;
2779         break;
2780     } /* switch (option) */
2781   } /* while (getopt) */
2782
2783   /* advise the user when values are not sane */
2784   if (config_flush_interval < 2 * config_write_interval)
2785     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2786             " 2x write interval (-w) !\n");
2787   if (config_write_jitter > config_write_interval)
2788     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2789             " write interval (-w) !\n");
2790
2791   if (config_write_base_only && config_base_dir == NULL)
2792     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2793             "  Consult the rrdcached documentation\n");
2794
2795   if (journal_cur == NULL)
2796     config_flush_at_shutdown = 1;
2797
2798   return (status);
2799 } /* }}} int read_options */
2800
2801 int main (int argc, char **argv)
2802 {
2803   int status;
2804
2805   status = read_options (argc, argv);
2806   if (status != 0)
2807   {
2808     if (status < 0)
2809       status = 0;
2810     return (status);
2811   }
2812
2813   status = daemonize ();
2814   if (status != 0)
2815   {
2816     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2817     return (1);
2818   }
2819
2820   journal_init();
2821
2822   /* start the queue threads */
2823   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2824   if (queue_threads == NULL)
2825   {
2826     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2827     cleanup();
2828     return (1);
2829   }
2830   for (int i = 0; i < config_queue_threads; i++)
2831   {
2832     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2833     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2834     if (status != 0)
2835     {
2836       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2837       cleanup();
2838       return (1);
2839     }
2840   }
2841
2842   /* start the flush thread */
2843   memset(&flush_thread, 0, sizeof(flush_thread));
2844   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2845   if (status != 0)
2846   {
2847     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2848     cleanup();
2849     return (1);
2850   }
2851
2852   listen_thread_main (NULL);
2853   cleanup ();
2854
2855   return (0);
2856 } /* int main */
2857
2858 /*
2859  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2860  */