fix off by 1 error
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66
67 #include "rrd_tool.h"
68 #include "rrd_client.h"
69 #include "unused.h"
70
71 #include <stdlib.h>
72
73 #ifndef WIN32
74 #ifdef HAVE_STDINT_H
75 #  include <stdint.h>
76 #endif
77 #include <unistd.h>
78 #include <strings.h>
79 #include <inttypes.h>
80 #include <sys/socket.h>
81
82 #else
83
84 #endif
85 #include <stdio.h>
86 #include <string.h>
87
88 #include <sys/types.h>
89 #include <sys/stat.h>
90 #include <dirent.h>
91 #include <fcntl.h>
92 #include <signal.h>
93 #include <sys/un.h>
94 #include <netdb.h>
95 #include <poll.h>
96 #include <syslog.h>
97 #include <pthread.h>
98 #include <errno.h>
99 #include <assert.h>
100 #include <sys/time.h>
101 #include <time.h>
102 #include <libgen.h>
103 #include <grp.h>
104
105 #ifdef HAVE_LIBWRAP
106 #include <tcpd.h>
107 #endif /* HAVE_LIBWRAP */
108
109 #include <glib-2.0/glib.h>
110 /* }}} */
111
112 #define RRDD_LOG(severity, ...) \
113   do { \
114     if (stay_foreground) { \
115       fprintf(stderr, __VA_ARGS__); \
116       fprintf(stderr, "\n"); } \
117     syslog ((severity), __VA_ARGS__); \
118   } while (0)
119
120 /*
121  * Types
122  */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124
125 struct listen_socket_s
126 {
127   int fd;
128   char addr[PATH_MAX + 1];
129   int family;
130
131   /* state for BATCH processing */
132   time_t batch_start;
133   int batch_cmd;
134
135   /* buffered IO */
136   char *rbuf;
137   off_t next_cmd;
138   off_t next_read;
139
140   char *wbuf;
141   ssize_t wbuf_len;
142
143   uint32_t permissions;
144
145   gid_t  socket_group;
146   mode_t socket_permissions;
147 };
148 typedef struct listen_socket_s listen_socket_t;
149
150 struct command_s;
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
154                         time_t UNUSED(now),\
155                         char  UNUSED(*buffer),\
156                         size_t UNUSED(buffer_size)
157
158 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
159                         DISPATCH_PROTO
160
161 struct command_s {
162   char   *cmd;
163   int (*handler)(HANDLER_PROTO);
164
165   char  context;                /* where we expect to see it */
166 #define CMD_CONTEXT_CLIENT      (1<<0)
167 #define CMD_CONTEXT_BATCH       (1<<1)
168 #define CMD_CONTEXT_JOURNAL     (1<<2)
169 #define CMD_CONTEXT_ANY         (0x7f)
170
171   char *syntax;
172   char *help;
173 };
174
175 struct cache_item_s;
176 typedef struct cache_item_s cache_item_t;
177 struct cache_item_s
178 {
179   char *file;
180   char **values;
181   size_t values_num;            /* number of valid pointers */
182   size_t values_alloc;          /* number of allocated pointers */
183   time_t last_flush_time;
184   double last_update_stamp;
185 #define CI_FLAGS_IN_TREE  (1<<0)
186 #define CI_FLAGS_IN_QUEUE (1<<1)
187   int flags;
188   pthread_cond_t  flushed;
189   cache_item_t *prev;
190   cache_item_t *next;
191 };
192
193 struct callback_flush_data_s
194 {
195   time_t now;
196   time_t abs_timeout;
197   char **keys;
198   size_t keys_num;
199 };
200 typedef struct callback_flush_data_s callback_flush_data_t;
201
202 enum queue_side_e
203 {
204   HEAD,
205   TAIL
206 };
207 typedef enum queue_side_e queue_side_t;
208
209 /* describe a set of journal files */
210 typedef struct {
211   char **files;
212   size_t files_num;
213 } journal_set;
214
215 #define RBUF_SIZE (RRD_CMD_MAX*2)
216
217 /*
218  * Variables
219  */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
222
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
225
226 static listen_socket_t default_socket;
227
228 enum {
229   RUNNING,              /* normal operation */
230   FLUSHING,             /* flushing remaining values */
231   SHUTDOWN              /* shutting down */
232 } state = RUNNING;
233
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
237
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
240
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
244
245 /* Cache stuff */
246 static GTree          *cache_tree = NULL;
247 static cache_item_t   *cache_queue_head = NULL;
248 static cache_item_t   *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
250
251 static int config_write_interval = 300;
252 static int config_write_jitter   = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
259 static size_t config_alloc_chunk = 1;
260
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
263
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
272
273 static int opt_no_overwrite = 0; /* default for the daemon */
274
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL;         /* current journal file handle */
282 static long  journal_size = 0;          /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
288
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
291
292 /* 
293  * Functions
294  */
295 static void sig_common (const char *sig) /* {{{ */
296 {
297   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298   if (state == RUNNING) {
299       state = FLUSHING;
300   }
301   pthread_cond_broadcast(&flush_cond);
302   pthread_cond_broadcast(&queue_cond);
303 } /* }}} void sig_common */
304
305 static void sig_int_handler (int UNUSED(s)) /* {{{ */
306 {
307   sig_common("INT");
308 } /* }}} void sig_int_handler */
309
310 static void sig_term_handler (int UNUSED(s)) /* {{{ */
311 {
312   sig_common("TERM");
313 } /* }}} void sig_term_handler */
314
315 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
316 {
317   config_flush_at_shutdown = 1;
318   sig_common("USR1");
319 } /* }}} void sig_usr1_handler */
320
321 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
322 {
323   config_flush_at_shutdown = 0;
324   sig_common("USR2");
325 } /* }}} void sig_usr2_handler */
326
327 static void install_signal_handlers(void) /* {{{ */
328 {
329   /* These structures are static, because `sigaction' behaves weird if the are
330    * overwritten.. */
331   static struct sigaction sa_int;
332   static struct sigaction sa_term;
333   static struct sigaction sa_pipe;
334   static struct sigaction sa_usr1;
335   static struct sigaction sa_usr2;
336
337   /* Install signal handlers */
338   memset (&sa_int, 0, sizeof (sa_int));
339   sa_int.sa_handler = sig_int_handler;
340   sigaction (SIGINT, &sa_int, NULL);
341
342   memset (&sa_term, 0, sizeof (sa_term));
343   sa_term.sa_handler = sig_term_handler;
344   sigaction (SIGTERM, &sa_term, NULL);
345
346   memset (&sa_pipe, 0, sizeof (sa_pipe));
347   sa_pipe.sa_handler = SIG_IGN;
348   sigaction (SIGPIPE, &sa_pipe, NULL);
349
350   memset (&sa_pipe, 0, sizeof (sa_usr1));
351   sa_usr1.sa_handler = sig_usr1_handler;
352   sigaction (SIGUSR1, &sa_usr1, NULL);
353
354   memset (&sa_usr2, 0, sizeof (sa_usr2));
355   sa_usr2.sa_handler = sig_usr2_handler;
356   sigaction (SIGUSR2, &sa_usr2, NULL);
357
358 } /* }}} void install_signal_handlers */
359
360 static int open_pidfile(char *action, int oflag) /* {{{ */
361 {
362   int fd;
363   const char *file;
364   char *file_copy, *dir;
365
366   file = (config_pid_file != NULL)
367     ? config_pid_file
368     : LOCALSTATEDIR "/run/rrdcached.pid";
369
370   /* dirname may modify its argument */
371   file_copy = strdup(file);
372   if (file_copy == NULL)
373   {
374     fprintf(stderr, "rrdcached: strdup(): %s\n",
375         rrd_strerror(errno));
376     return -1;
377   }
378
379   dir = dirname(file_copy);
380   if (rrd_mkdir_p(dir, 0777) != 0)
381   {
382     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
383         dir, rrd_strerror(errno));
384     return -1;
385   }
386
387   free(file_copy);
388
389   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
390   if (fd < 0)
391     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
392             action, file, rrd_strerror(errno));
393
394   return(fd);
395 } /* }}} static int open_pidfile */
396
397 /* check existing pid file to see whether a daemon is running */
398 static int check_pidfile(void)
399 {
400   int pid_fd;
401   pid_t pid;
402   char pid_str[16];
403
404   pid_fd = open_pidfile("open", O_RDWR);
405   if (pid_fd < 0)
406     return pid_fd;
407
408   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
409     return -1;
410
411   pid = atoi(pid_str);
412   if (pid <= 0)
413     return -1;
414
415   /* another running process that we can signal COULD be
416    * a competing rrdcached */
417   if (pid != getpid() && kill(pid, 0) == 0)
418   {
419     fprintf(stderr,
420             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
421     close(pid_fd);
422     return -1;
423   }
424
425   lseek(pid_fd, 0, SEEK_SET);
426   if (ftruncate(pid_fd, 0) == -1)
427   {
428     fprintf(stderr,
429             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
430     close(pid_fd);
431     return -1;
432   }
433
434   fprintf(stderr,
435           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
436           "rrdcached: starting normally.\n", pid);
437
438   return pid_fd;
439 } /* }}} static int check_pidfile */
440
441 static int write_pidfile (int fd) /* {{{ */
442 {
443   pid_t pid;
444   FILE *fh;
445
446   pid = getpid ();
447
448   fh = fdopen (fd, "w");
449   if (fh == NULL)
450   {
451     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
452     close(fd);
453     return (-1);
454   }
455
456   fprintf (fh, "%i\n", (int) pid);
457   fclose (fh);
458
459   return (0);
460 } /* }}} int write_pidfile */
461
462 static int remove_pidfile (void) /* {{{ */
463 {
464   char *file;
465   int status;
466
467   file = (config_pid_file != NULL)
468     ? config_pid_file
469     : LOCALSTATEDIR "/run/rrdcached.pid";
470
471   status = unlink (file);
472   if (status == 0)
473     return (0);
474   return (errno);
475 } /* }}} int remove_pidfile */
476
477 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
478 {
479   char *eol;
480
481   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
482                sock->next_read - sock->next_cmd);
483
484   if (eol == NULL)
485   {
486     /* no commands left, move remainder back to front of rbuf */
487     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
488             sock->next_read - sock->next_cmd);
489     sock->next_read -= sock->next_cmd;
490     sock->next_cmd = 0;
491     *len = 0;
492     return NULL;
493   }
494   else
495   {
496     char *cmd = sock->rbuf + sock->next_cmd;
497     *eol = '\0';
498
499     sock->next_cmd = eol - sock->rbuf + 1;
500
501     if (eol > sock->rbuf && *(eol-1) == '\r')
502       *(--eol) = '\0'; /* handle "\r\n" EOL */
503
504     *len = eol - cmd;
505
506     return cmd;
507   }
508
509   /* NOTREACHED */
510   assert(1==0);
511 } /* }}} char *next_cmd */
512
513 /* add the characters directly to the write buffer */
514 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
515 {
516   char *new_buf;
517
518   assert(sock != NULL);
519
520   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
521   if (new_buf == NULL)
522   {
523     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
524     return -1;
525   }
526
527   strncpy(new_buf + sock->wbuf_len, str, len + 1);
528
529   sock->wbuf = new_buf;
530   sock->wbuf_len += len;
531
532   return 0;
533 } /* }}} static int add_to_wbuf */
534
535 /* add the text to the "extra" info that's sent after the status line */
536 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
537 {
538   va_list argp;
539   char buffer[RRD_CMD_MAX];
540   int len;
541
542   if (JOURNAL_REPLAY(sock)) return 0;
543   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
544
545   va_start(argp, fmt);
546 #ifdef HAVE_VSNPRINTF
547   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
548 #else
549   len = vsprintf(buffer, fmt, argp);
550 #endif
551   va_end(argp);
552   if (len < 0)
553   {
554     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
555     return -1;
556   }
557
558   return add_to_wbuf(sock, buffer, len);
559 } /* }}} static int add_response_info */
560
561 static int count_lines(char *str) /* {{{ */
562 {
563   int lines = 0;
564
565   if (str != NULL)
566   {
567     while ((str = strchr(str, '\n')) != NULL)
568     {
569       ++lines;
570       ++str;
571     }
572   }
573
574   return lines;
575 } /* }}} static int count_lines */
576
577 /* send the response back to the user.
578  * returns 0 on success, -1 on error
579  * write buffer is always zeroed after this call */
580 static int send_response (listen_socket_t *sock, response_code rc,
581                           char *fmt, ...) /* {{{ */
582 {
583   va_list argp;
584   char buffer[RRD_CMD_MAX];
585   int lines;
586   ssize_t wrote;
587   int rclen, len;
588
589   if (JOURNAL_REPLAY(sock)) return rc;
590
591   if (sock->batch_start)
592   {
593     if (rc == RESP_OK)
594       return rc; /* no response on success during BATCH */
595     lines = sock->batch_cmd;
596   }
597   else if (rc == RESP_OK)
598     lines = count_lines(sock->wbuf);
599   else
600     lines = -1;
601
602   rclen = snprintf(buffer, sizeof buffer, "%d ", lines);
603   va_start(argp, fmt);
604 #ifdef HAVE_VSNPRINTF
605   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
606 #else
607   len = vsprintf(buffer+rclen, fmt, argp);
608 #endif
609   va_end(argp);
610   if (len < 0)
611     return -1;
612
613   len += rclen;
614
615   /* append the result to the wbuf, don't write to the user */
616   if (sock->batch_start)
617     return add_to_wbuf(sock, buffer, len);
618
619   /* first write must be complete */
620   if (len != write(sock->fd, buffer, len))
621   {
622     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
623     return -1;
624   }
625
626   if (sock->wbuf != NULL && rc == RESP_OK)
627   {
628     wrote = 0;
629     while (wrote < sock->wbuf_len)
630     {
631       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
632       if (wb <= 0)
633       {
634         RRDD_LOG(LOG_INFO, "send_response: could not write results");
635         return -1;
636       }
637       wrote += wb;
638     }
639   }
640
641   free(sock->wbuf); sock->wbuf = NULL;
642   sock->wbuf_len = 0;
643
644   return 0;
645 } /* }}} */
646
647 static void wipe_ci_values(cache_item_t *ci, time_t when)
648 {
649   ci->values = NULL;
650   ci->values_num = 0;
651   ci->values_alloc = 0;
652
653   ci->last_flush_time = when;
654   if (config_write_jitter > 0)
655     ci->last_flush_time += (rrd_random() % config_write_jitter);
656 }
657
658 /* remove_from_queue
659  * remove a "cache_item_t" item from the queue.
660  * must hold 'cache_lock' when calling this
661  */
662 static void remove_from_queue(cache_item_t *ci) /* {{{ */
663 {
664   if (ci == NULL) return;
665   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
666
667   if (ci->prev == NULL)
668     cache_queue_head = ci->next; /* reset head */
669   else
670     ci->prev->next = ci->next;
671
672   if (ci->next == NULL)
673     cache_queue_tail = ci->prev; /* reset the tail */
674   else
675     ci->next->prev = ci->prev;
676
677   ci->next = ci->prev = NULL;
678   ci->flags &= ~CI_FLAGS_IN_QUEUE;
679
680   pthread_mutex_lock (&stats_lock);
681   assert (stats_queue_length > 0);
682   stats_queue_length--;
683   pthread_mutex_unlock (&stats_lock);
684
685 } /* }}} static void remove_from_queue */
686
687 /* free the resources associated with the cache_item_t
688  * must hold cache_lock when calling this function
689  */
690 static void *free_cache_item(cache_item_t *ci) /* {{{ */
691 {
692   if (ci == NULL) return NULL;
693
694   remove_from_queue(ci);
695
696   for (size_t i=0; i < ci->values_num; i++)
697     free(ci->values[i]);
698
699   free (ci->values);
700   free (ci->file);
701
702   /* in case anyone is waiting */
703   pthread_cond_broadcast(&ci->flushed);
704   pthread_cond_destroy(&ci->flushed);
705
706   free (ci);
707
708   return NULL;
709 } /* }}} static void *free_cache_item */
710
711 /*
712  * enqueue_cache_item:
713  * `cache_lock' must be acquired before calling this function!
714  */
715 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
716     queue_side_t side)
717 {
718   if (ci == NULL)
719     return (-1);
720
721   if (ci->values_num == 0)
722     return (0);
723
724   if (side == HEAD)
725   {
726     if (cache_queue_head == ci)
727       return 0;
728
729     /* remove if further down in queue */
730     remove_from_queue(ci);
731
732     ci->prev = NULL;
733     ci->next = cache_queue_head;
734     if (ci->next != NULL)
735       ci->next->prev = ci;
736     cache_queue_head = ci;
737
738     if (cache_queue_tail == NULL)
739       cache_queue_tail = cache_queue_head;
740   }
741   else /* (side == TAIL) */
742   {
743     /* We don't move values back in the list.. */
744     if (ci->flags & CI_FLAGS_IN_QUEUE)
745       return (0);
746
747     assert (ci->next == NULL);
748     assert (ci->prev == NULL);
749
750     ci->prev = cache_queue_tail;
751
752     if (cache_queue_tail == NULL)
753       cache_queue_head = ci;
754     else
755       cache_queue_tail->next = ci;
756
757     cache_queue_tail = ci;
758   }
759
760   ci->flags |= CI_FLAGS_IN_QUEUE;
761
762   pthread_cond_signal(&queue_cond);
763   pthread_mutex_lock (&stats_lock);
764   stats_queue_length++;
765   pthread_mutex_unlock (&stats_lock);
766
767   return (0);
768 } /* }}} int enqueue_cache_item */
769
770 /*
771  * tree_callback_flush:
772  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
773  * while this is in progress.
774  */
775 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
776     gpointer data)
777 {
778   cache_item_t *ci;
779   callback_flush_data_t *cfd;
780
781   ci = (cache_item_t *) value;
782   cfd = (callback_flush_data_t *) data;
783
784   if (ci->flags & CI_FLAGS_IN_QUEUE)
785     return FALSE;
786
787   if (ci->values_num > 0
788       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
789   {
790     enqueue_cache_item (ci, TAIL);
791   }
792   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
793       && (ci->values_num <= 0))
794   {
795     assert ((char *) key == ci->file);
796     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
797     {
798       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
799       return (FALSE);
800     }
801   }
802
803   return (FALSE);
804 } /* }}} gboolean tree_callback_flush */
805
806 static int flush_old_values (int max_age)
807 {
808   callback_flush_data_t cfd;
809   size_t k;
810
811   memset (&cfd, 0, sizeof (cfd));
812   /* Pass the current time as user data so that we don't need to call
813    * `time' for each node. */
814   cfd.now = time (NULL);
815   cfd.keys = NULL;
816   cfd.keys_num = 0;
817
818   if (max_age > 0)
819     cfd.abs_timeout = cfd.now - max_age;
820   else
821     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
822
823   /* `tree_callback_flush' will return the keys of all values that haven't
824    * been touched in the last `config_flush_interval' seconds in `cfd'.
825    * The char*'s in this array point to the same memory as ci->file, so we
826    * don't need to free them separately. */
827   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
828
829   for (k = 0; k < cfd.keys_num; k++)
830   {
831     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
832     /* should never fail, since we have held the cache_lock
833      * the entire time */
834     assert(status == TRUE);
835   }
836
837   if (cfd.keys != NULL)
838   {
839     free (cfd.keys);
840     cfd.keys = NULL;
841   }
842
843   return (0);
844 } /* int flush_old_values */
845
846 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
847 {
848   struct timeval now;
849   struct timespec next_flush;
850   int status;
851
852   gettimeofday (&now, NULL);
853   next_flush.tv_sec = now.tv_sec + config_flush_interval;
854   next_flush.tv_nsec = 1000 * now.tv_usec;
855
856   pthread_mutex_lock(&cache_lock);
857
858   while (state == RUNNING)
859   {
860     gettimeofday (&now, NULL);
861     if ((now.tv_sec > next_flush.tv_sec)
862         || ((now.tv_sec == next_flush.tv_sec)
863           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
864     {
865       RRDD_LOG(LOG_DEBUG, "flushing old values");
866
867       /* Determine the time of the next cache flush. */
868       next_flush.tv_sec = now.tv_sec + config_flush_interval;
869
870       /* Flush all values that haven't been written in the last
871        * `config_write_interval' seconds. */
872       flush_old_values (config_write_interval);
873
874       /* unlock the cache while we rotate so we don't block incoming
875        * updates if the fsync() blocks on disk I/O */
876       pthread_mutex_unlock(&cache_lock);
877       journal_rotate();
878       pthread_mutex_lock(&cache_lock);
879     }
880
881     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
882     if (status != 0 && status != ETIMEDOUT)
883     {
884       RRDD_LOG (LOG_ERR, "flush_thread_main: "
885                 "pthread_cond_timedwait returned %i.", status);
886     }
887   }
888
889   if (config_flush_at_shutdown)
890     flush_old_values (-1); /* flush everything */
891
892   state = SHUTDOWN;
893
894   pthread_mutex_unlock(&cache_lock);
895
896   return NULL;
897 } /* void *flush_thread_main */
898
899 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
900 {
901   pthread_mutex_lock (&cache_lock);
902
903   while (state != SHUTDOWN
904          || (cache_queue_head != NULL && config_flush_at_shutdown))
905   {
906     cache_item_t *ci;
907     char *file;
908     char **values;
909     size_t values_num;
910     int status;
911
912     /* Now, check if there's something to store away. If not, wait until
913      * something comes in. */
914     if (cache_queue_head == NULL)
915     {
916       status = pthread_cond_wait (&queue_cond, &cache_lock);
917       if ((status != 0) && (status != ETIMEDOUT))
918       {
919         RRDD_LOG (LOG_ERR, "queue_thread_main: "
920             "pthread_cond_wait returned %i.", status);
921       }
922     }
923
924     /* Check if a value has arrived. This may be NULL if we timed out or there
925      * was an interrupt such as a signal. */
926     if (cache_queue_head == NULL)
927       continue;
928
929     ci = cache_queue_head;
930
931     /* copy the relevant parts */
932     file = strdup (ci->file);
933     if (file == NULL)
934     {
935       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
936       continue;
937     }
938
939     assert(ci->values != NULL);
940     assert(ci->values_num > 0);
941
942     values = ci->values;
943     values_num = ci->values_num;
944
945     wipe_ci_values(ci, time(NULL));
946     remove_from_queue(ci);
947
948     pthread_mutex_unlock (&cache_lock);
949
950     rrd_clear_error ();
951     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
952     if (status != 0)
953     {
954       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
955           "rrd_update_r (%s) failed with status %i. (%s)",
956           file, status, rrd_get_error());
957     }
958
959     journal_write("wrote", file);
960
961     /* Search again in the tree.  It's possible someone issued a "FORGET"
962      * while we were writing the update values. */
963     pthread_mutex_lock(&cache_lock);
964     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
965     if (ci)
966       pthread_cond_broadcast(&ci->flushed);
967     pthread_mutex_unlock(&cache_lock);
968
969     if (status == 0)
970     {
971       pthread_mutex_lock (&stats_lock);
972       stats_updates_written++;
973       stats_data_sets_written += values_num;
974       pthread_mutex_unlock (&stats_lock);
975     }
976
977     rrd_free_ptrs((void ***) &values, &values_num);
978     free(file);
979
980     pthread_mutex_lock (&cache_lock);
981   }
982   pthread_mutex_unlock (&cache_lock);
983
984   return (NULL);
985 } /* }}} void *queue_thread_main */
986
987 static int buffer_get_field (char **buffer_ret, /* {{{ */
988     size_t *buffer_size_ret, char **field_ret)
989 {
990   char *buffer;
991   size_t buffer_pos;
992   size_t buffer_size;
993   char *field;
994   size_t field_size;
995   int status;
996
997   buffer = *buffer_ret;
998   buffer_pos = 0;
999   buffer_size = *buffer_size_ret;
1000   field = *buffer_ret;
1001   field_size = 0;
1002
1003   if (buffer_size <= 0)
1004     return (-1);
1005
1006   /* This is ensured by `handle_request'. */
1007   assert (buffer[buffer_size - 1] == '\0');
1008
1009   status = -1;
1010   while (buffer_pos < buffer_size)
1011   {
1012     /* Check for end-of-field or end-of-buffer */
1013     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1014     {
1015       field[field_size] = 0;
1016       field_size++;
1017       buffer_pos++;
1018       status = 0;
1019       break;
1020     }
1021     /* Handle escaped characters. */
1022     else if (buffer[buffer_pos] == '\\')
1023     {
1024       if (buffer_pos >= (buffer_size - 1))
1025         break;
1026       buffer_pos++;
1027       field[field_size] = buffer[buffer_pos];
1028       field_size++;
1029       buffer_pos++;
1030     }
1031     /* Normal operation */ 
1032     else
1033     {
1034       field[field_size] = buffer[buffer_pos];
1035       field_size++;
1036       buffer_pos++;
1037     }
1038   } /* while (buffer_pos < buffer_size) */
1039
1040   if (status != 0)
1041     return (status);
1042
1043   *buffer_ret = buffer + buffer_pos;
1044   *buffer_size_ret = buffer_size - buffer_pos;
1045   *field_ret = field;
1046
1047   return (0);
1048 } /* }}} int buffer_get_field */
1049
1050 /* if we're restricting writes to the base directory,
1051  * check whether the file falls within the dir
1052  * returns 1 if OK, otherwise 0
1053  */
1054 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1055 {
1056   assert(file != NULL);
1057
1058   if (!config_write_base_only
1059       || JOURNAL_REPLAY(sock)
1060       || config_base_dir == NULL)
1061     return 1;
1062
1063   if (strstr(file, "../") != NULL) goto err;
1064
1065   /* relative paths without "../" are ok */
1066   if (*file != '/') return 1;
1067
1068   /* file must be of the format base + "/" + <1+ char filename> */
1069   if (strlen(file) < _config_base_dir_len + 2) goto err;
1070   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1071   if (*(file + _config_base_dir_len) != '/') goto err;
1072
1073   return 1;
1074
1075 err:
1076   if (sock != NULL && sock->fd >= 0)
1077     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1078
1079   return 0;
1080 } /* }}} static int check_file_access */
1081
1082 /* when using a base dir, convert relative paths to absolute paths.
1083  * if necessary, modifies the "filename" pointer to point
1084  * to the new path created in "tmp".  "tmp" is provided
1085  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1086  *
1087  * this allows us to optimize for the expected case (absolute path)
1088  * with a no-op.
1089  */
1090 static void get_abs_path(char **filename, char *tmp)
1091 {
1092   assert(tmp != NULL);
1093   assert(filename != NULL && *filename != NULL);
1094
1095   if (config_base_dir == NULL || **filename == '/')
1096     return;
1097
1098   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1099   *filename = tmp;
1100 } /* }}} static int get_abs_path */
1101
1102 static int flush_file (const char *filename) /* {{{ */
1103 {
1104   cache_item_t *ci;
1105
1106   pthread_mutex_lock (&cache_lock);
1107
1108   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1109   if (ci == NULL)
1110   {
1111     pthread_mutex_unlock (&cache_lock);
1112     return (ENOENT);
1113   }
1114
1115   if (ci->values_num > 0)
1116   {
1117     /* Enqueue at head */
1118     enqueue_cache_item (ci, HEAD);
1119     pthread_cond_wait(&ci->flushed, &cache_lock);
1120   }
1121
1122   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1123    * may have been purged during our cond_wait() */
1124
1125   pthread_mutex_unlock(&cache_lock);
1126
1127   return (0);
1128 } /* }}} int flush_file */
1129
1130 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1131 {
1132   char *err = "Syntax error.\n";
1133
1134   if (cmd && cmd->syntax)
1135     err = cmd->syntax;
1136
1137   return send_response(sock, RESP_ERR, "Usage: %s", err);
1138 } /* }}} static int syntax_error() */
1139
1140 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1141 {
1142   uint64_t copy_queue_length;
1143   uint64_t copy_updates_received;
1144   uint64_t copy_flush_received;
1145   uint64_t copy_updates_written;
1146   uint64_t copy_data_sets_written;
1147   uint64_t copy_journal_bytes;
1148   uint64_t copy_journal_rotate;
1149
1150   uint64_t tree_nodes_number;
1151   uint64_t tree_depth;
1152
1153   pthread_mutex_lock (&stats_lock);
1154   copy_queue_length       = stats_queue_length;
1155   copy_updates_received   = stats_updates_received;
1156   copy_flush_received     = stats_flush_received;
1157   copy_updates_written    = stats_updates_written;
1158   copy_data_sets_written  = stats_data_sets_written;
1159   copy_journal_bytes      = stats_journal_bytes;
1160   copy_journal_rotate     = stats_journal_rotate;
1161   pthread_mutex_unlock (&stats_lock);
1162
1163   pthread_mutex_lock (&cache_lock);
1164   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1165   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1166   pthread_mutex_unlock (&cache_lock);
1167
1168   add_response_info(sock,
1169                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1170   add_response_info(sock,
1171                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1172   add_response_info(sock,
1173                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1174   add_response_info(sock,
1175                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1176   add_response_info(sock,
1177                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1178   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1179   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1180   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1181   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1182
1183   send_response(sock, RESP_OK, "Statistics follow\n");
1184
1185   return (0);
1186 } /* }}} int handle_request_stats */
1187
1188 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1189 {
1190   char *file, file_tmp[PATH_MAX];
1191   int status;
1192
1193   status = buffer_get_field (&buffer, &buffer_size, &file);
1194   if (status != 0)
1195   {
1196     return syntax_error(sock,cmd);
1197   }
1198   else
1199   {
1200     pthread_mutex_lock(&stats_lock);
1201     stats_flush_received++;
1202     pthread_mutex_unlock(&stats_lock);
1203
1204     get_abs_path(&file, file_tmp);
1205     if (!check_file_access(file, sock)) return 0;
1206
1207     status = flush_file (file);
1208     if (status == 0)
1209       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1210     else if (status == ENOENT)
1211     {
1212       /* no file in our tree; see whether it exists at all */
1213       struct stat statbuf;
1214
1215       memset(&statbuf, 0, sizeof(statbuf));
1216       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1217         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1218       else
1219         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1220     }
1221     else if (status < 0)
1222       return send_response(sock, RESP_ERR, "Internal error.\n");
1223     else
1224       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1225   }
1226
1227   /* NOTREACHED */
1228   assert(1==0);
1229 } /* }}} int handle_request_flush */
1230
1231 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1232 {
1233   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1234
1235   pthread_mutex_lock(&cache_lock);
1236   flush_old_values(-1);
1237   pthread_mutex_unlock(&cache_lock);
1238
1239   return send_response(sock, RESP_OK, "Started flush.\n");
1240 } /* }}} static int handle_request_flushall */
1241
1242 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1243 {
1244   int status;
1245   char *file, file_tmp[PATH_MAX];
1246   cache_item_t *ci;
1247
1248   status = buffer_get_field(&buffer, &buffer_size, &file);
1249   if (status != 0)
1250     return syntax_error(sock,cmd);
1251
1252   get_abs_path(&file, file_tmp);
1253
1254   pthread_mutex_lock(&cache_lock);
1255   ci = g_tree_lookup(cache_tree, file);
1256   if (ci == NULL)
1257   {
1258     pthread_mutex_unlock(&cache_lock);
1259     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1260   }
1261
1262   for (size_t i=0; i < ci->values_num; i++)
1263     add_response_info(sock, "%s\n", ci->values[i]);
1264
1265   pthread_mutex_unlock(&cache_lock);
1266   return send_response(sock, RESP_OK, "updates pending\n");
1267 } /* }}} static int handle_request_pending */
1268
1269 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1270 {
1271   int status;
1272   gboolean found;
1273   char *file, file_tmp[PATH_MAX];
1274
1275   status = buffer_get_field(&buffer, &buffer_size, &file);
1276   if (status != 0)
1277     return syntax_error(sock,cmd);
1278
1279   get_abs_path(&file, file_tmp);
1280   if (!check_file_access(file, sock)) return 0;
1281
1282   pthread_mutex_lock(&cache_lock);
1283   found = g_tree_remove(cache_tree, file);
1284   pthread_mutex_unlock(&cache_lock);
1285
1286   if (found == TRUE)
1287   {
1288     if (!JOURNAL_REPLAY(sock))
1289       journal_write("forget", file);
1290
1291     return send_response(sock, RESP_OK, "Gone!\n");
1292   }
1293   else
1294     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1295
1296   /* NOTREACHED */
1297   assert(1==0);
1298 } /* }}} static int handle_request_forget */
1299
1300 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1301 {
1302   cache_item_t *ci;
1303
1304   pthread_mutex_lock(&cache_lock);
1305
1306   ci = cache_queue_head;
1307   while (ci != NULL)
1308   {
1309     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1310     ci = ci->next;
1311   }
1312
1313   pthread_mutex_unlock(&cache_lock);
1314
1315   return send_response(sock, RESP_OK, "in queue.\n");
1316 } /* }}} int handle_request_queue */
1317
1318 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1319 {
1320   char *file, file_tmp[PATH_MAX];
1321   int values_num = 0;
1322   int status;
1323   char orig_buf[RRD_CMD_MAX];
1324
1325   cache_item_t *ci;
1326
1327   /* save it for the journal later */
1328   if (!JOURNAL_REPLAY(sock))
1329     strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1330
1331   status = buffer_get_field (&buffer, &buffer_size, &file);
1332   if (status != 0)
1333     return syntax_error(sock,cmd);
1334
1335   pthread_mutex_lock(&stats_lock);
1336   stats_updates_received++;
1337   pthread_mutex_unlock(&stats_lock);
1338
1339   get_abs_path(&file, file_tmp);
1340   if (!check_file_access(file, sock)) return 0;
1341
1342   pthread_mutex_lock (&cache_lock);
1343   ci = g_tree_lookup (cache_tree, file);
1344
1345   if (ci == NULL) /* {{{ */
1346   {
1347     struct stat statbuf;
1348     cache_item_t *tmp;
1349
1350     /* don't hold the lock while we setup; stat(2) might block */
1351     pthread_mutex_unlock(&cache_lock);
1352
1353     memset (&statbuf, 0, sizeof (statbuf));
1354     status = stat (file, &statbuf);
1355     if (status != 0)
1356     {
1357       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1358
1359       status = errno;
1360       if (status == ENOENT)
1361         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1362       else
1363         return send_response(sock, RESP_ERR,
1364                              "stat failed with error %i.\n", status);
1365     }
1366     if (!S_ISREG (statbuf.st_mode))
1367       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1368
1369     if (access(file, R_OK|W_OK) != 0)
1370       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1371                            file, rrd_strerror(errno));
1372
1373     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1374     if (ci == NULL)
1375     {
1376       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1377
1378       return send_response(sock, RESP_ERR, "malloc failed.\n");
1379     }
1380     memset (ci, 0, sizeof (cache_item_t));
1381
1382     ci->file = strdup (file);
1383     if (ci->file == NULL)
1384     {
1385       free (ci);
1386       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1387
1388       return send_response(sock, RESP_ERR, "strdup failed.\n");
1389     }
1390
1391     wipe_ci_values(ci, now);
1392     ci->flags = CI_FLAGS_IN_TREE;
1393     pthread_cond_init(&ci->flushed, NULL);
1394
1395     pthread_mutex_lock(&cache_lock);
1396
1397     /* another UPDATE might have added this entry in the meantime */
1398     tmp = g_tree_lookup (cache_tree, file);
1399     if (tmp == NULL)
1400       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1401     else
1402     {
1403       free_cache_item (ci);
1404       ci = tmp;
1405     }
1406
1407     /* state may have changed while we were unlocked */
1408     if (state == SHUTDOWN)
1409       return -1;
1410   } /* }}} */
1411   assert (ci != NULL);
1412
1413   /* don't re-write updates in replay mode */
1414   if (!JOURNAL_REPLAY(sock))
1415     journal_write("update", orig_buf);
1416
1417   while (buffer_size > 0)
1418   {
1419     char *value;
1420     double stamp;
1421     char *eostamp;
1422
1423     status = buffer_get_field (&buffer, &buffer_size, &value);
1424     if (status != 0)
1425     {
1426       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1427       break;
1428     }
1429
1430     /* make sure update time is always moving forward. We use double here since
1431        update does support subsecond precision for timestamps ... */
1432     stamp = strtod(value, &eostamp);
1433     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "Cannot find timestamp in '%s'!\n", value);
1438     }
1439     else if (stamp <= ci->last_update_stamp)
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "illegal attempt to update using time %lf when last"
1444                            " update time is %lf (minimum one second step)\n",
1445                            stamp, ci->last_update_stamp);
1446     }
1447     else
1448       ci->last_update_stamp = stamp;
1449
1450     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451                               &ci->values_alloc, config_alloc_chunk))
1452     {
1453       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454       continue;
1455     }
1456
1457     values_num++;
1458   }
1459
1460   if (((now - ci->last_flush_time) >= config_write_interval)
1461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462       && (ci->values_num > 0))
1463   {
1464     enqueue_cache_item (ci, TAIL);
1465   }
1466
1467   pthread_mutex_unlock (&cache_lock);
1468
1469   if (values_num < 1)
1470     return send_response(sock, RESP_ERR, "No values updated.\n");
1471   else
1472     return send_response(sock, RESP_OK,
1473                          "errors, enqueued %i value(s).\n", values_num);
1474
1475   /* NOTREACHED */
1476   assert(1==0);
1477
1478 } /* }}} int handle_request_update */
1479
1480 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1481 {
1482   char *file, file_tmp[PATH_MAX];
1483   char *cf;
1484
1485   char *start_str;
1486   char *end_str;
1487   time_t start_tm;
1488   time_t end_tm;
1489
1490   unsigned long step;
1491   unsigned long ds_cnt;
1492   char **ds_namv;
1493   rrd_value_t *data;
1494
1495   int status;
1496   unsigned long i;
1497   time_t t;
1498   rrd_value_t *data_ptr;
1499
1500   file = NULL;
1501   cf = NULL;
1502   start_str = NULL;
1503   end_str = NULL;
1504
1505   /* Read the arguments */
1506   do /* while (0) */
1507   {
1508     status = buffer_get_field (&buffer, &buffer_size, &file);
1509     if (status != 0)
1510       break;
1511
1512     status = buffer_get_field (&buffer, &buffer_size, &cf);
1513     if (status != 0)
1514       break;
1515
1516     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1517     if (status != 0)
1518     {
1519       start_str = NULL;
1520       status = 0;
1521       break;
1522     }
1523
1524     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1525     if (status != 0)
1526     {
1527       end_str = NULL;
1528       status = 0;
1529       break;
1530     }
1531   } while (0);
1532
1533   if (status != 0)
1534     return (syntax_error(sock,cmd));
1535
1536   get_abs_path(&file, file_tmp);
1537   if (!check_file_access(file, sock)) return 0;
1538
1539   status = flush_file (file);
1540   if ((status != 0) && (status != ENOENT))
1541     return (send_response (sock, RESP_ERR,
1542           "flush_file (%s) failed with status %i.\n", file, status));
1543
1544   t = time (NULL); /* "now" */
1545
1546   /* Parse start time */
1547   if (start_str != NULL)
1548   {
1549     char *endptr;
1550     long value;
1551
1552     endptr = NULL;
1553     errno = 0;
1554     value = strtol (start_str, &endptr, /* base = */ 0);
1555     if ((endptr == start_str) || (errno != 0))
1556       return (send_response(sock, RESP_ERR,
1557             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1558             start_str));
1559
1560     if (value > 0)
1561       start_tm = (time_t) value;
1562     else
1563       start_tm = (time_t) (t + value);
1564   }
1565   else
1566   {
1567     start_tm = t - 86400;
1568   }
1569
1570   /* Parse end time */
1571   if (end_str != NULL)
1572   {
1573     char *endptr;
1574     long value;
1575
1576     endptr = NULL;
1577     errno = 0;
1578     value = strtol (end_str, &endptr, /* base = */ 0);
1579     if ((endptr == end_str) || (errno != 0))
1580       return (send_response(sock, RESP_ERR,
1581             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1582             end_str));
1583
1584     if (value > 0)
1585       end_tm = (time_t) value;
1586     else
1587       end_tm = (time_t) (t + value);
1588   }
1589   else
1590   {
1591     end_tm = t;
1592   }
1593
1594   step = -1;
1595   ds_cnt = 0;
1596   ds_namv = NULL;
1597   data = NULL;
1598
1599   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1600       &ds_cnt, &ds_namv, &data);
1601   if (status != 0)
1602     return (send_response(sock, RESP_ERR,
1603           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1604
1605   add_response_info (sock, "FlushVersion: %lu\n", 1);
1606   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1607   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1608   add_response_info (sock, "Step: %lu\n", step);
1609   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1610
1611 #define SSTRCAT(buffer,str,buffer_fill) do { \
1612     size_t str_len = strlen (str); \
1613     if ((buffer_fill + str_len) > sizeof (buffer)) \
1614       str_len = sizeof (buffer) - buffer_fill; \
1615     if (str_len > 0) { \
1616       strncpy (buffer + buffer_fill, str, str_len); \
1617       buffer_fill += str_len; \
1618       assert (buffer_fill <= sizeof (buffer)); \
1619       if (buffer_fill == sizeof (buffer)) \
1620         buffer[buffer_fill - 1] = 0; \
1621       else \
1622         buffer[buffer_fill] = 0; \
1623     } \
1624   } while (0)
1625
1626   { /* Add list of DS names */
1627     char linebuf[1024];
1628     size_t linebuf_fill;
1629
1630     memset (linebuf, 0, sizeof (linebuf));
1631     linebuf_fill = 0;
1632     for (i = 0; i < ds_cnt; i++)
1633     {
1634       if (i > 0)
1635         SSTRCAT (linebuf, " ", linebuf_fill);
1636       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1637       rrd_freemem(ds_namv[i]);
1638     }
1639     rrd_freemem(ds_namv);
1640     add_response_info (sock, "DSName: %s\n", linebuf);
1641   }
1642
1643   /* Add the actual data */
1644   assert (step > 0);
1645   data_ptr = data;
1646   for (t = start_tm + step; t <= end_tm; t += step)
1647   {
1648     char linebuf[1024];
1649     size_t linebuf_fill;
1650     char tmp[128];
1651
1652     memset (linebuf, 0, sizeof (linebuf));
1653     linebuf_fill = 0;
1654     for (i = 0; i < ds_cnt; i++)
1655     {
1656       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1657       tmp[sizeof (tmp) - 1] = 0;
1658       SSTRCAT (linebuf, tmp, linebuf_fill);
1659
1660       data_ptr++;
1661     }
1662
1663     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1664   } /* for (t) */
1665   rrd_freemem(data);
1666
1667   return (send_response (sock, RESP_OK, "Success\n"));
1668 #undef SSTRCAT
1669 } /* }}} int handle_request_fetch */
1670
1671 /* we came across a "WROTE" entry during journal replay.
1672  * throw away any values that we have accumulated for this file
1673  */
1674 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1675 {
1676   cache_item_t *ci;
1677   const char *file = buffer;
1678
1679   pthread_mutex_lock(&cache_lock);
1680
1681   ci = g_tree_lookup(cache_tree, file);
1682   if (ci == NULL)
1683   {
1684     pthread_mutex_unlock(&cache_lock);
1685     return (0);
1686   }
1687
1688   if (ci->values)
1689     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1690
1691   wipe_ci_values(ci, now);
1692   remove_from_queue(ci);
1693
1694   pthread_mutex_unlock(&cache_lock);
1695   return (0);
1696 } /* }}} int handle_request_wrote */
1697
1698 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1699 {
1700   char *file, file_tmp[PATH_MAX];
1701   int status;
1702   rrd_info_t *info;
1703
1704   /* obtain filename */
1705   status = buffer_get_field(&buffer, &buffer_size, &file);
1706   if (status != 0)
1707     return syntax_error(sock,cmd);
1708   /* get full pathname */
1709   get_abs_path(&file, file_tmp);
1710   if (!check_file_access(file, sock)) {
1711     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1712   }
1713   /* get data */
1714   rrd_clear_error ();
1715   info = rrd_info_r(file);
1716   if(!info) {
1717     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1718   }
1719   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1720       switch (data->type) {
1721       case RD_I_VAL:
1722           if (isnan(data->value.u_val))
1723               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1724           else
1725               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1726           break;
1727       case RD_I_CNT:
1728           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1729           break;
1730       case RD_I_INT:
1731           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1732           break;
1733       case RD_I_STR:
1734           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1735           break;
1736       case RD_I_BLO:
1737           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1738           break;
1739       }
1740   }
1741
1742   rrd_info_free(info);
1743
1744   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1745 } /* }}} static int handle_request_info  */
1746
1747 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1748 {
1749   char *i, *file, file_tmp[PATH_MAX];
1750   int status;
1751   int idx;
1752   time_t t;
1753
1754   /* obtain filename */
1755   status = buffer_get_field(&buffer, &buffer_size, &file);
1756   if (status != 0)
1757     return syntax_error(sock,cmd);
1758   /* get full pathname */
1759   get_abs_path(&file, file_tmp);
1760   if (!check_file_access(file, sock)) {
1761     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1762   }
1763
1764   status = buffer_get_field(&buffer, &buffer_size, &i);
1765   if (status != 0)
1766     return syntax_error(sock,cmd);
1767   idx = atoi(i);
1768   if(idx<0) { 
1769     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1770   }
1771
1772   /* get data */
1773   rrd_clear_error ();
1774   t = rrd_first_r(file,idx);
1775   if(t<1) {
1776     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1777   }
1778   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1779 } /* }}} static int handle_request_first  */
1780
1781
1782 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1783 {
1784   char *file, file_tmp[PATH_MAX];
1785   int status;
1786   time_t t, from_file, step;
1787   rrd_file_t * rrd_file;
1788   cache_item_t * ci;
1789   rrd_t rrd; 
1790
1791   /* obtain filename */
1792   status = buffer_get_field(&buffer, &buffer_size, &file);
1793   if (status != 0)
1794     return syntax_error(sock,cmd);
1795   /* get full pathname */
1796   get_abs_path(&file, file_tmp);
1797   if (!check_file_access(file, sock)) {
1798     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1799   }
1800   rrd_clear_error();
1801   rrd_init(&rrd);
1802   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1803   if(!rrd_file) {
1804     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1805   }
1806   from_file = rrd.live_head->last_up;
1807   step = rrd.stat_head->pdp_step;
1808   rrd_close(rrd_file);
1809   pthread_mutex_lock(&cache_lock);
1810   ci = g_tree_lookup(cache_tree, file);
1811   if (ci)
1812     t = ci->last_update_stamp;
1813   else
1814     t = from_file;
1815   pthread_mutex_unlock(&cache_lock);
1816   t -= t % step;
1817   rrd_free(&rrd);
1818   if(t<1) {
1819     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1820   }
1821   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1822 } /* }}} static int handle_request_last  */
1823
1824 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1825 {
1826   char *file, file_tmp[PATH_MAX];
1827   char *tok;
1828   int ac = 0;
1829   char *av[128];
1830   int status;
1831   unsigned long step = 300;
1832   time_t last_up = time(NULL)-10;
1833   int no_overwrite = opt_no_overwrite;
1834
1835
1836   /* obtain filename */
1837   status = buffer_get_field(&buffer, &buffer_size, &file);
1838   if (status != 0)
1839     return syntax_error(sock,cmd);
1840   /* get full pathname */
1841   get_abs_path(&file, file_tmp);
1842   if (!check_file_access(file, sock)) {
1843     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1844   }
1845   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1846
1847   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1848     if( ! strncmp(tok,"-b",2) ) {
1849       status = buffer_get_field(&buffer, &buffer_size, &tok );
1850       if (status != 0) return syntax_error(sock,cmd);
1851       last_up = (time_t) atol(tok);
1852       continue;
1853     }
1854     if( ! strncmp(tok,"-s",2) ) {
1855       status = buffer_get_field(&buffer, &buffer_size, &tok );
1856       if (status != 0) return syntax_error(sock,cmd);
1857       step = atol(tok);
1858       continue;
1859     }
1860     if( ! strncmp(tok,"-O",2) ) {
1861       no_overwrite = 1;
1862       continue;
1863     }
1864     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1865     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1866     return syntax_error(sock,cmd);
1867   }
1868   if(step<1) {
1869     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1870   }
1871   if (last_up < 3600 * 24 * 365 * 10) {
1872     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1873   }
1874
1875   rrd_clear_error ();
1876   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1877
1878   if(!status) {
1879     return send_response(sock, RESP_OK, "RRD created OK\n");
1880   }
1881   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1882 } /* }}} static int handle_request_create  */
1883
1884 /* start "BATCH" processing */
1885 static int batch_start (HANDLER_PROTO) /* {{{ */
1886 {
1887   int status;
1888   if (sock->batch_start)
1889     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1890
1891   status = send_response(sock, RESP_OK,
1892                          "Go ahead.  End with dot '.' on its own line.\n");
1893   sock->batch_start = time(NULL);
1894   sock->batch_cmd = 0;
1895
1896   return status;
1897 } /* }}} static int batch_start */
1898
1899 /* finish "BATCH" processing and return results to the client */
1900 static int batch_done (HANDLER_PROTO) /* {{{ */
1901 {
1902   assert(sock->batch_start);
1903   sock->batch_start = 0;
1904   sock->batch_cmd  = 0;
1905   return send_response(sock, RESP_OK, "errors\n");
1906 } /* }}} static int batch_done */
1907
1908 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1909 {
1910   return -1;
1911 } /* }}} static int handle_request_quit */
1912
1913 static command_t list_of_commands[] = { /* {{{ */
1914   {
1915     "UPDATE",
1916     handle_request_update,
1917     CMD_CONTEXT_ANY,
1918     "UPDATE <filename> <values> [<values> ...]\n"
1919     ,
1920     "Adds the given file to the internal cache if it is not yet known and\n"
1921     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1922     "for details.\n"
1923     "\n"
1924     "Each <values> has the following form:\n"
1925     "  <values> = <time>:<value>[:<value>[...]]\n"
1926     "See the rrdupdate(1) manpage for details.\n"
1927   },
1928   {
1929     "WROTE",
1930     handle_request_wrote,
1931     CMD_CONTEXT_JOURNAL,
1932     NULL,
1933     NULL
1934   },
1935   {
1936     "FLUSH",
1937     handle_request_flush,
1938     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1939     "FLUSH <filename>\n"
1940     ,
1941     "Adds the given filename to the head of the update queue and returns\n"
1942     "after it has been dequeued.\n"
1943   },
1944   {
1945     "FLUSHALL",
1946     handle_request_flushall,
1947     CMD_CONTEXT_CLIENT,
1948     "FLUSHALL\n"
1949     ,
1950     "Triggers writing of all pending updates.  Returns immediately.\n"
1951   },
1952   {
1953     "PENDING",
1954     handle_request_pending,
1955     CMD_CONTEXT_CLIENT,
1956     "PENDING <filename>\n"
1957     ,
1958     "Shows any 'pending' updates for a file, in order.\n"
1959     "The updates shown have not yet been written to the underlying RRD file.\n"
1960   },
1961   {
1962     "FORGET",
1963     handle_request_forget,
1964     CMD_CONTEXT_ANY,
1965     "FORGET <filename>\n"
1966     ,
1967     "Removes the file completely from the cache.\n"
1968     "Any pending updates for the file will be lost.\n"
1969   },
1970   {
1971     "QUEUE",
1972     handle_request_queue,
1973     CMD_CONTEXT_CLIENT,
1974     "QUEUE\n"
1975     ,
1976         "Shows all files in the output queue.\n"
1977     "The output is zero or more lines in the following format:\n"
1978     "(where <num_vals> is the number of values to be written)\n"
1979     "\n"
1980     "<num_vals> <filename>\n"
1981   },
1982   {
1983     "STATS",
1984     handle_request_stats,
1985     CMD_CONTEXT_CLIENT,
1986     "STATS\n"
1987     ,
1988     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1989     "a description of the values.\n"
1990   },
1991   {
1992     "HELP",
1993     handle_request_help,
1994     CMD_CONTEXT_CLIENT,
1995     "HELP [<command>]\n",
1996     NULL, /* special! */
1997   },
1998   {
1999     "BATCH",
2000     batch_start,
2001     CMD_CONTEXT_CLIENT,
2002     "BATCH\n"
2003     ,
2004     "The 'BATCH' command permits the client to initiate a bulk load\n"
2005     "   of commands to rrdcached.\n"
2006     "\n"
2007     "Usage:\n"
2008     "\n"
2009     "    client: BATCH\n"
2010     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2011     "    client: command #1\n"
2012     "    client: command #2\n"
2013     "    client: ... and so on\n"
2014     "    client: .\n"
2015     "    server: 2 errors\n"
2016     "    server: 7 message for command #7\n"
2017     "    server: 9 message for command #9\n"
2018     "\n"
2019     "For more information, consult the rrdcached(1) documentation.\n"
2020   },
2021   {
2022     ".",   /* BATCH terminator */
2023     batch_done,
2024     CMD_CONTEXT_BATCH,
2025     NULL,
2026     NULL
2027   },
2028   {
2029     "FETCH",
2030     handle_request_fetch,
2031     CMD_CONTEXT_CLIENT,
2032     "FETCH <file> <CF> [<start> [<end>]]\n"
2033     ,
2034     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2035   },
2036   {
2037     "INFO",
2038     handle_request_info,
2039     CMD_CONTEXT_CLIENT,
2040     "INFO <filename>\n",
2041     "The INFO command retrieves information about a specified RRD file.\n"
2042     "This is returned in standard rrdinfo format, a sequence of lines\n"
2043     "with the format <keyname> = <value>\n"
2044     "Note that this is the data as of the last update of the RRD file itself,\n"
2045     "not the last time data was received via rrdcached, so there may be pending\n"
2046     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2047   },
2048   {
2049     "FIRST",
2050     handle_request_first,
2051     CMD_CONTEXT_CLIENT,
2052     "FIRST <filename> <rra index>\n",
2053     "The FIRST command retrieves the first data time for a specified RRA in\n"
2054     "an RRD file.\n"
2055   },
2056   {
2057     "LAST",
2058     handle_request_last,
2059     CMD_CONTEXT_CLIENT,
2060     "LAST <filename>\n",
2061     "The LAST command retrieves the last update time for a specified RRD file.\n"
2062     "Note that this is the time of the last update of the RRD file itself, not\n"
2063     "the last time data was received via rrdcached, so there may be pending\n"
2064     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2065   },
2066   {
2067     "CREATE",
2068     handle_request_create,
2069     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2070     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2071     "The CREATE command will create an RRD file, overwriting any existing file\n"
2072     "unless the -O option is given or rrdcached was started with the -O option.\n"
2073     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2074     "not acceptable) and the step is in seconds (default is 300).\n"
2075     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2076   },
2077   {
2078     "QUIT",
2079     handle_request_quit,
2080     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2081     "QUIT\n"
2082     ,
2083     "Disconnect from rrdcached.\n"
2084   }
2085 }; /* }}} command_t list_of_commands[] */
2086 static size_t list_of_commands_len = sizeof (list_of_commands)
2087   / sizeof (list_of_commands[0]);
2088
2089 static command_t *find_command(char *cmd)
2090 {
2091   size_t i;
2092
2093   for (i = 0; i < list_of_commands_len; i++)
2094     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2095       return (&list_of_commands[i]);
2096   return NULL;
2097 }
2098
2099 /* We currently use the index in the `list_of_commands' array as a bit position
2100  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2101  * outside these functions so that switching to a more elegant storage method
2102  * is easily possible. */
2103 static ssize_t find_command_index (const char *cmd) /* {{{ */
2104 {
2105   size_t i;
2106
2107   for (i = 0; i < list_of_commands_len; i++)
2108     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2109       return ((ssize_t) i);
2110   return (-1);
2111 } /* }}} ssize_t find_command_index */
2112
2113 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2114     const char *cmd)
2115 {
2116   ssize_t i;
2117
2118   if (JOURNAL_REPLAY(sock))
2119     return (1);
2120
2121   if (cmd == NULL)
2122     return (-1);
2123
2124   if ((strcasecmp ("QUIT", cmd) == 0)
2125       || (strcasecmp ("HELP", cmd) == 0))
2126     return (1);
2127   else if (strcmp (".", cmd) == 0)
2128     cmd = "BATCH";
2129
2130   i = find_command_index (cmd);
2131   if (i < 0)
2132     return (-1);
2133   assert (i < 32);
2134
2135   if ((sock->permissions & (1 << i)) != 0)
2136     return (1);
2137   return (0);
2138 } /* }}} int socket_permission_check */
2139
2140 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2141     const char *cmd)
2142 {
2143   ssize_t i;
2144
2145   i = find_command_index (cmd);
2146   if (i < 0)
2147     return (-1);
2148   assert (i < 32);
2149
2150   sock->permissions |= (1 << i);
2151   return (0);
2152 } /* }}} int socket_permission_add */
2153
2154 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2155 {
2156   sock->permissions = 0;
2157 } /* }}} socket_permission_clear */
2158
2159 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2160     listen_socket_t *src)
2161 {
2162   dest->permissions = src->permissions;
2163 } /* }}} socket_permission_copy */
2164
2165 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2166 {
2167   size_t i;
2168
2169   sock->permissions = 0;
2170   for (i = 0; i < list_of_commands_len; i++)
2171     sock->permissions |= (1 << i);
2172 } /* }}} void socket_permission_set_all */
2173
2174 /* check whether commands are received in the expected context */
2175 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2176 {
2177   if (JOURNAL_REPLAY(sock))
2178     return (cmd->context & CMD_CONTEXT_JOURNAL);
2179   else if (sock->batch_start)
2180     return (cmd->context & CMD_CONTEXT_BATCH);
2181   else
2182     return (cmd->context & CMD_CONTEXT_CLIENT);
2183
2184   /* NOTREACHED */
2185   assert(1==0);
2186 }
2187
2188 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2189 {
2190   int status;
2191   char *cmd_str;
2192   char *resp_txt;
2193   command_t *help = NULL;
2194
2195   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2196   if (status == 0)
2197     help = find_command(cmd_str);
2198
2199   if (help && (help->syntax || help->help))
2200   {
2201     char tmp[RRD_CMD_MAX];
2202
2203     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2204     resp_txt = tmp;
2205
2206     if (help->syntax)
2207       add_response_info(sock, "Usage: %s\n", help->syntax);
2208
2209     if (help->help)
2210       add_response_info(sock, "%s\n", help->help);
2211   }
2212   else
2213   {
2214     size_t i;
2215
2216     resp_txt = "Command overview\n";
2217
2218     for (i = 0; i < list_of_commands_len; i++)
2219     {
2220       if (list_of_commands[i].syntax == NULL)
2221         continue;
2222       add_response_info (sock, "%s", list_of_commands[i].syntax);
2223     }
2224   }
2225
2226   return send_response(sock, RESP_OK, resp_txt);
2227 } /* }}} int handle_request_help */
2228
2229 static int handle_request (DISPATCH_PROTO) /* {{{ */
2230 {
2231   char *buffer_ptr = buffer;
2232   char *cmd_str = NULL;
2233   command_t *cmd = NULL;
2234   int status;
2235
2236   assert (buffer[buffer_size - 1] == '\0');
2237
2238   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2239   if (status != 0)
2240   {
2241     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2242     return (-1);
2243   }
2244
2245   if (sock != NULL && sock->batch_start)
2246     sock->batch_cmd++;
2247
2248   cmd = find_command(cmd_str);
2249   if (!cmd)
2250     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2251
2252   if (!socket_permission_check (sock, cmd->cmd))
2253     return send_response(sock, RESP_ERR, "Permission denied.\n");
2254
2255   if (!command_check_context(sock, cmd))
2256     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2257
2258   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2259 } /* }}} int handle_request */
2260
2261 static void journal_set_free (journal_set *js) /* {{{ */
2262 {
2263   if (js == NULL)
2264     return;
2265
2266   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2267
2268   free(js);
2269 } /* }}} journal_set_free */
2270
2271 static void journal_set_remove (journal_set *js) /* {{{ */
2272 {
2273   if (js == NULL)
2274     return;
2275
2276   for (uint i=0; i < js->files_num; i++)
2277   {
2278     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2279     unlink(js->files[i]);
2280   }
2281 } /* }}} journal_set_remove */
2282
2283 /* close current journal file handle.
2284  * MUST hold journal_lock before calling */
2285 static void journal_close(void) /* {{{ */
2286 {
2287   if (journal_fh != NULL)
2288   {
2289     if (fclose(journal_fh) != 0)
2290       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2291   }
2292
2293   journal_fh = NULL;
2294   journal_size = 0;
2295 } /* }}} journal_close */
2296
2297 /* MUST hold journal_lock before calling */
2298 static void journal_new_file(void) /* {{{ */
2299 {
2300   struct timeval now;
2301   int  new_fd;
2302   char new_file[PATH_MAX + 1];
2303
2304   assert(journal_dir != NULL);
2305   assert(journal_cur != NULL);
2306
2307   journal_close();
2308
2309   gettimeofday(&now, NULL);
2310   /* this format assures that the files sort in strcmp() order */
2311   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2312            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2313
2314   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2315                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2316   if (new_fd < 0)
2317     goto error;
2318
2319   journal_fh = fdopen(new_fd, "a");
2320   if (journal_fh == NULL)
2321     goto error;
2322
2323   journal_size = ftell(journal_fh);
2324   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2325
2326   /* record the file in the journal set */
2327   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2328
2329   return;
2330
2331 error:
2332   RRDD_LOG(LOG_CRIT,
2333            "JOURNALING DISABLED: Error while trying to create %s : %s",
2334            new_file, rrd_strerror(errno));
2335   RRDD_LOG(LOG_CRIT,
2336            "JOURNALING DISABLED: All values will be flushed at shutdown");
2337
2338   close(new_fd);
2339   config_flush_at_shutdown = 1;
2340
2341 } /* }}} journal_new_file */
2342
2343 /* MUST NOT hold journal_lock before calling this */
2344 static void journal_rotate(void) /* {{{ */
2345 {
2346   journal_set *old_js = NULL;
2347
2348   if (journal_dir == NULL)
2349     return;
2350
2351   RRDD_LOG(LOG_DEBUG, "rotating journals");
2352
2353   pthread_mutex_lock(&stats_lock);
2354   ++stats_journal_rotate;
2355   pthread_mutex_unlock(&stats_lock);
2356
2357   pthread_mutex_lock(&journal_lock);
2358
2359   journal_close();
2360
2361   /* rotate the journal sets */
2362   old_js = journal_old;
2363   journal_old = journal_cur;
2364   journal_cur = calloc(1, sizeof(journal_set));
2365
2366   if (journal_cur != NULL)
2367     journal_new_file();
2368   else
2369     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2370
2371   pthread_mutex_unlock(&journal_lock);
2372
2373   journal_set_remove(old_js);
2374   journal_set_free  (old_js);
2375
2376 } /* }}} static void journal_rotate */
2377
2378 /* MUST hold journal_lock when calling */
2379 static void journal_done(void) /* {{{ */
2380 {
2381   if (journal_cur == NULL)
2382     return;
2383
2384   journal_close();
2385
2386   if (config_flush_at_shutdown)
2387   {
2388     RRDD_LOG(LOG_INFO, "removing journals");
2389     journal_set_remove(journal_old);
2390     journal_set_remove(journal_cur);
2391   }
2392   else
2393   {
2394     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2395              "journals will be used at next startup");
2396   }
2397
2398   journal_set_free(journal_cur);
2399   journal_set_free(journal_old);
2400   free(journal_dir);
2401
2402 } /* }}} static void journal_done */
2403
2404 static int journal_write(char *cmd, char *args) /* {{{ */
2405 {
2406   int chars;
2407
2408   if (journal_fh == NULL)
2409     return 0;
2410
2411   pthread_mutex_lock(&journal_lock);
2412   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2413   journal_size += chars;
2414
2415   if (journal_size > JOURNAL_MAX)
2416     journal_new_file();
2417
2418   pthread_mutex_unlock(&journal_lock);
2419
2420   if (chars > 0)
2421   {
2422     pthread_mutex_lock(&stats_lock);
2423     stats_journal_bytes += chars;
2424     pthread_mutex_unlock(&stats_lock);
2425   }
2426
2427   return chars;
2428 } /* }}} static int journal_write */
2429
2430 static int journal_replay (const char *file) /* {{{ */
2431 {
2432   FILE *fh;
2433   int entry_cnt = 0;
2434   int fail_cnt = 0;
2435   uint64_t line = 0;
2436   char entry[RRD_CMD_MAX];
2437   time_t now;
2438
2439   if (file == NULL) return 0;
2440
2441   {
2442     char *reason = "unknown error";
2443     int status = 0;
2444     struct stat statbuf;
2445
2446     memset(&statbuf, 0, sizeof(statbuf));
2447     if (stat(file, &statbuf) != 0)
2448     {
2449       reason = "stat error";
2450       status = errno;
2451     }
2452     else if (!S_ISREG(statbuf.st_mode))
2453     {
2454       reason = "not a regular file";
2455       status = EPERM;
2456     }
2457     if (statbuf.st_uid != daemon_uid)
2458     {
2459       reason = "not owned by daemon user";
2460       status = EACCES;
2461     }
2462     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2463     {
2464       reason = "must not be user/group writable";
2465       status = EACCES;
2466     }
2467
2468     if (status != 0)
2469     {
2470       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2471                file, rrd_strerror(status), reason);
2472       return 0;
2473     }
2474   }
2475
2476   fh = fopen(file, "r");
2477   if (fh == NULL)
2478   {
2479     if (errno != ENOENT)
2480       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2481                file, rrd_strerror(errno));
2482     return 0;
2483   }
2484   else
2485     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2486
2487   now = time(NULL);
2488
2489   while(!feof(fh))
2490   {
2491     size_t entry_len;
2492
2493     ++line;
2494     if (fgets(entry, sizeof(entry), fh) == NULL)
2495       break;
2496     entry_len = strlen(entry);
2497
2498     /* check \n termination in case journal writing crashed mid-line */
2499     if (entry_len == 0)
2500       continue;
2501     else if (entry[entry_len - 1] != '\n')
2502     {
2503       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2504       ++fail_cnt;
2505       continue;
2506     }
2507
2508     entry[entry_len - 1] = '\0';
2509
2510     if (handle_request(NULL, now, entry, entry_len) == 0)
2511       ++entry_cnt;
2512     else
2513       ++fail_cnt;
2514   }
2515
2516   fclose(fh);
2517
2518   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2519            entry_cnt, fail_cnt);
2520
2521   return entry_cnt > 0 ? 1 : 0;
2522 } /* }}} static int journal_replay */
2523
2524 static int journal_sort(const void *v1, const void *v2)
2525 {
2526   char **jn1 = (char **) v1;
2527   char **jn2 = (char **) v2;
2528
2529   return strcmp(*jn1,*jn2);
2530 }
2531
2532 static void journal_init(void) /* {{{ */
2533 {
2534   int had_journal = 0;
2535   DIR *dir;
2536   struct dirent *dent;
2537   char path[PATH_MAX+1];
2538
2539   if (journal_dir == NULL) return;
2540
2541   pthread_mutex_lock(&journal_lock);
2542
2543   journal_cur = calloc(1, sizeof(journal_set));
2544   if (journal_cur == NULL)
2545   {
2546     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2547     return;
2548   }
2549
2550   RRDD_LOG(LOG_INFO, "checking for journal files");
2551
2552   /* Handle old journal files during transition.  This gives them the
2553    * correct sort order.  TODO: remove after first release
2554    */
2555   {
2556     char old_path[PATH_MAX+1];
2557     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2558     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2559     rename(old_path, path);
2560
2561     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2562     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2563     rename(old_path, path);
2564   }
2565
2566   dir = opendir(journal_dir);
2567   if (!dir) {
2568     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2569     return;
2570   }
2571   while ((dent = readdir(dir)) != NULL)
2572   {
2573     /* looks like a journal file? */
2574     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2575       continue;
2576
2577     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2578
2579     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2580     {
2581       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2582                dent->d_name);
2583       break;
2584     }
2585   }
2586   closedir(dir);
2587
2588   qsort(journal_cur->files, journal_cur->files_num,
2589         sizeof(journal_cur->files[0]), journal_sort);
2590
2591   for (uint i=0; i < journal_cur->files_num; i++)
2592     had_journal += journal_replay(journal_cur->files[i]);
2593
2594   journal_new_file();
2595
2596   /* it must have been a crash.  start a flush */
2597   if (had_journal && config_flush_at_shutdown)
2598     flush_old_values(-1);
2599
2600   pthread_mutex_unlock(&journal_lock);
2601
2602   RRDD_LOG(LOG_INFO, "journal processing complete");
2603
2604 } /* }}} static void journal_init */
2605
2606 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2607 {
2608   assert(sock != NULL);
2609
2610   free(sock->rbuf);  sock->rbuf = NULL;
2611   free(sock->wbuf);  sock->wbuf = NULL;
2612   free(sock);
2613 } /* }}} void free_listen_socket */
2614
2615 static void close_connection(listen_socket_t *sock) /* {{{ */
2616 {
2617   if (sock->fd >= 0)
2618   {
2619     close(sock->fd);
2620     sock->fd = -1;
2621   }
2622
2623   free_listen_socket(sock);
2624
2625 } /* }}} void close_connection */
2626
2627 static void *connection_thread_main (void *args) /* {{{ */
2628 {
2629   listen_socket_t *sock;
2630   int fd;
2631
2632   sock = (listen_socket_t *) args;
2633   fd = sock->fd;
2634
2635   /* init read buffers */
2636   sock->next_read = sock->next_cmd = 0;
2637   sock->rbuf = malloc(RBUF_SIZE);
2638   if (sock->rbuf == NULL)
2639   {
2640     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2641     close_connection(sock);
2642     return NULL;
2643   }
2644
2645   pthread_mutex_lock (&connection_threads_lock);
2646 #ifdef HAVE_LIBWRAP
2647   /* LIBWRAP does not support multiple threads! By putting this code
2648      inside pthread_mutex_lock we do not have to worry about request_info
2649      getting overwritten by another thread.
2650   */
2651   struct request_info req;
2652   request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2653   fromhost(&req);
2654   if(!hosts_access(&req)) {
2655     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2656     pthread_mutex_unlock (&connection_threads_lock);
2657     close_connection(sock);
2658     return NULL;
2659   }
2660 #endif /* HAVE_LIBWRAP */
2661   connection_threads_num++;
2662   pthread_mutex_unlock (&connection_threads_lock);
2663
2664   while (state == RUNNING)
2665   {
2666     char *cmd;
2667     ssize_t cmd_len;
2668     ssize_t rbytes;
2669     time_t now;
2670
2671     struct pollfd pollfd;
2672     int status;
2673
2674     pollfd.fd = fd;
2675     pollfd.events = POLLIN | POLLPRI;
2676     pollfd.revents = 0;
2677
2678     status = poll (&pollfd, 1, /* timeout = */ 500);
2679     if (state != RUNNING)
2680       break;
2681     else if (status == 0) /* timeout */
2682       continue;
2683     else if (status < 0) /* error */
2684     {
2685       status = errno;
2686       if (status != EINTR)
2687         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2688       continue;
2689     }
2690
2691     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2692       break;
2693     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2694     {
2695       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2696           "poll(2) returned something unexpected: %#04hx",
2697           pollfd.revents);
2698       break;
2699     }
2700
2701     rbytes = read(fd, sock->rbuf + sock->next_read,
2702                   RBUF_SIZE - sock->next_read);
2703     if (rbytes < 0)
2704     {
2705       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2706       break;
2707     }
2708     else if (rbytes == 0)
2709       break; /* eof */
2710
2711     sock->next_read += rbytes;
2712
2713     if (sock->batch_start)
2714       now = sock->batch_start;
2715     else
2716       now = time(NULL);
2717
2718     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2719     {
2720       status = handle_request (sock, now, cmd, cmd_len+1);
2721       if (status != 0)
2722         goto out_close;
2723     }
2724   }
2725
2726 out_close:
2727   close_connection(sock);
2728
2729   /* Remove this thread from the connection threads list */
2730   pthread_mutex_lock (&connection_threads_lock);
2731   connection_threads_num--;
2732   if (connection_threads_num <= 0)
2733     pthread_cond_broadcast(&connection_threads_done);
2734   pthread_mutex_unlock (&connection_threads_lock);
2735
2736   return (NULL);
2737 } /* }}} void *connection_thread_main */
2738
2739 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2740 {
2741   int fd;
2742   struct sockaddr_un sa;
2743   listen_socket_t *temp;
2744   int status;
2745   const char *path;
2746   char *path_copy, *dir;
2747
2748   path = sock->addr;
2749   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2750     path += strlen("unix:");
2751
2752   /* dirname may modify its argument */
2753   path_copy = strdup(path);
2754   if (path_copy == NULL)
2755   {
2756     fprintf(stderr, "rrdcached: strdup(): %s\n",
2757         rrd_strerror(errno));
2758     return (-1);
2759   }
2760
2761   dir = dirname(path_copy);
2762   if (rrd_mkdir_p(dir, 0777) != 0)
2763   {
2764     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2765         dir, rrd_strerror(errno));
2766     return (-1);
2767   }
2768
2769   free(path_copy);
2770
2771   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2772       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2773   if (temp == NULL)
2774   {
2775     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2776     return (-1);
2777   }
2778   listen_fds = temp;
2779   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2780
2781   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2782   if (fd < 0)
2783   {
2784     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2785              rrd_strerror(errno));
2786     return (-1);
2787   }
2788
2789   memset (&sa, 0, sizeof (sa));
2790   sa.sun_family = AF_UNIX;
2791   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2792
2793   /* if we've gotten this far, we own the pid file.  any daemon started
2794    * with the same args must not be alive.  therefore, ensure that we can
2795    * create the socket...
2796    */
2797   unlink(path);
2798
2799   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2800   if (status != 0)
2801   {
2802     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2803              path, rrd_strerror(errno));
2804     close (fd);
2805     return (-1);
2806   }
2807
2808   /* tweak the sockets group ownership */
2809   if (sock->socket_group != (gid_t)-1)
2810   {
2811     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2812          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2813     {
2814       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2815     }
2816   }
2817
2818   if (sock->socket_permissions != (mode_t)-1)
2819   {
2820     if (chmod(path, sock->socket_permissions) != 0)
2821       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2822           (unsigned int)sock->socket_permissions, strerror(errno));
2823   }
2824
2825   status = listen (fd, /* backlog = */ 10);
2826   if (status != 0)
2827   {
2828     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2829              path, rrd_strerror(errno));
2830     close (fd);
2831     unlink (path);
2832     return (-1);
2833   }
2834
2835   listen_fds[listen_fds_num].fd = fd;
2836   listen_fds[listen_fds_num].family = PF_UNIX;
2837   strncpy(listen_fds[listen_fds_num].addr, path,
2838           sizeof (listen_fds[listen_fds_num].addr) - 1);
2839   listen_fds_num++;
2840
2841   return (0);
2842 } /* }}} int open_listen_socket_unix */
2843
2844 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2845 {
2846   struct addrinfo ai_hints;
2847   struct addrinfo *ai_res;
2848   struct addrinfo *ai_ptr;
2849   char addr_copy[NI_MAXHOST];
2850   char *addr;
2851   char *port;
2852   int status;
2853
2854   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2855   addr_copy[sizeof (addr_copy) - 1] = 0;
2856   addr = addr_copy;
2857
2858   memset (&ai_hints, 0, sizeof (ai_hints));
2859   ai_hints.ai_flags = 0;
2860 #ifdef AI_ADDRCONFIG
2861   ai_hints.ai_flags |= AI_ADDRCONFIG;
2862 #endif
2863   ai_hints.ai_family = AF_UNSPEC;
2864   ai_hints.ai_socktype = SOCK_STREAM;
2865
2866   port = NULL;
2867   if (*addr == '[') /* IPv6+port format */
2868   {
2869     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2870     addr++;
2871
2872     port = strchr (addr, ']');
2873     if (port == NULL)
2874     {
2875       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2876       return (-1);
2877     }
2878     *port = 0;
2879     port++;
2880
2881     if (*port == ':')
2882       port++;
2883     else if (*port == 0)
2884       port = NULL;
2885     else
2886     {
2887       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2888       return (-1);
2889     }
2890   } /* if (*addr == '[') */
2891   else
2892   {
2893     port = rindex(addr, ':');
2894     if (port != NULL)
2895     {
2896       *port = 0;
2897       port++;
2898     }
2899   }
2900   ai_res = NULL;
2901   status = getaddrinfo (addr,
2902                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2903                         &ai_hints, &ai_res);
2904   if (status != 0)
2905   {
2906     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2907              addr, gai_strerror (status));
2908     return (-1);
2909   }
2910
2911   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2912   {
2913     int fd;
2914     listen_socket_t *temp;
2915     int one = 1;
2916
2917     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2918         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2919     if (temp == NULL)
2920     {
2921       fprintf (stderr,
2922                "rrdcached: open_listen_socket_network: realloc failed.\n");
2923       continue;
2924     }
2925     listen_fds = temp;
2926     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2927
2928     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2929     if (fd < 0)
2930     {
2931       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2932                rrd_strerror(errno));
2933       continue;
2934     }
2935
2936     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2937
2938     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2939     if (status != 0)
2940     {
2941       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2942                sock->addr, rrd_strerror(errno));
2943       close (fd);
2944       continue;
2945     }
2946
2947     status = listen (fd, /* backlog = */ 10);
2948     if (status != 0)
2949     {
2950       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2951                sock->addr, rrd_strerror(errno));
2952       close (fd);
2953       freeaddrinfo(ai_res);
2954       return (-1);
2955     }
2956
2957     listen_fds[listen_fds_num].fd = fd;
2958     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2959     listen_fds_num++;
2960   } /* for (ai_ptr) */
2961
2962   freeaddrinfo(ai_res);
2963   return (0);
2964 } /* }}} static int open_listen_socket_network */
2965
2966 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2967 {
2968   assert(sock != NULL);
2969   assert(sock->addr != NULL);
2970
2971   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2972       || sock->addr[0] == '/')
2973     return (open_listen_socket_unix(sock));
2974   else
2975     return (open_listen_socket_network(sock));
2976 } /* }}} int open_listen_socket */
2977
2978 #ifndef SD_LISTEN_FDS_START
2979 #  define SD_LISTEN_FDS_START 3
2980 #endif
2981 /*
2982  * returns number of descriptors passed from systemd
2983  */
2984 static int open_listen_sockets_systemd(void) /* {{{ */
2985 {
2986   listen_socket_t *temp;
2987   struct sockaddr_un sa;
2988   socklen_t l;
2989   int sd_fd;
2990   const char *env;
2991   unsigned long n;
2992
2993   /* check if it for us */
2994   env = getenv("LISTEN_PID");
2995   if (!env)
2996     return 0;
2997
2998   n = strtoul(env, NULL, 10);
2999   if (!n || n == ULONG_MAX || (pid_t)n != getpid())
3000     return 0;
3001
3002   /* get the number of passed descriptors */
3003   env = getenv("LISTEN_FDS");
3004   if (!env)
3005     return 0;
3006
3007   n = strtoul(env, NULL, 10);
3008   if (!n || n == ULONG_MAX)
3009     return 0;
3010
3011   temp = (listen_socket_t *) rrd_realloc (listen_fds,
3012      sizeof (listen_fds[0]) * (listen_fds_num + n));
3013   if (temp == NULL)
3014   {
3015     fprintf (stderr, "rrdcached: open_listen_socket_systemd: realloc failed.\n");
3016     return 0;
3017   }
3018   listen_fds = temp;
3019
3020   for (unsigned int i = 0; i < n; i++)
3021   {
3022     sd_fd = SD_LISTEN_FDS_START + i;
3023
3024     l = sizeof(sa);
3025     memset(&sa, 0, l);
3026     if (getsockname(sd_fd, &sa, &l) < 0)
3027     {
3028       fprintf(stderr, "open_listen_sockets_systemd: problem getting fd %d: %s\n", sd_fd, rrd_strerror (errno));
3029       return i;
3030     }
3031
3032     listen_fds[listen_fds_num].fd = sd_fd;
3033     listen_fds[listen_fds_num].family = sa.sun_family;
3034     listen_fds_num++;
3035   }
3036   
3037   return n;
3038 } /* }}} open_listen_sockets_systemd */
3039
3040 static void open_listen_sockets_traditional(void) /* {{{ */
3041 {
3042  if (config_listen_address_list_len > 0)
3043   {
3044     for (size_t i = 0; i < config_listen_address_list_len; i++)
3045       open_listen_socket (config_listen_address_list[i]);
3046
3047     rrd_free_ptrs((void ***) &config_listen_address_list,
3048                   &config_listen_address_list_len);
3049   }
3050   else
3051   {
3052     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3053         sizeof(default_socket.addr) - 1);
3054     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3055
3056     if (default_socket.permissions == 0)
3057       socket_permission_set_all (&default_socket);
3058
3059     open_listen_socket (&default_socket);
3060   }
3061 } /* }}} open_list_sockets_traditional */
3062
3063 static int close_listen_sockets (void) /* {{{ */
3064 {
3065   size_t i;
3066
3067   for (i = 0; i < listen_fds_num; i++)
3068   {
3069     close (listen_fds[i].fd);
3070
3071     if (listen_fds[i].family == PF_UNIX)
3072       unlink(listen_fds[i].addr);
3073   }
3074
3075   free (listen_fds);
3076   listen_fds = NULL;
3077   listen_fds_num = 0;
3078
3079   return (0);
3080 } /* }}} int close_listen_sockets */
3081
3082 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3083 {
3084   struct pollfd *pollfds;
3085   int pollfds_num;
3086   int status;
3087   int i;
3088
3089   if (listen_fds_num < 1)
3090   {
3091     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3092     return (NULL);
3093   }
3094
3095   pollfds_num = listen_fds_num;
3096   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3097   if (pollfds == NULL)
3098   {
3099     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3100     return (NULL);
3101   }
3102   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3103
3104   RRDD_LOG(LOG_INFO, "listening for connections");
3105
3106   while (state == RUNNING)
3107   {
3108     for (i = 0; i < pollfds_num; i++)
3109     {
3110       pollfds[i].fd = listen_fds[i].fd;
3111       pollfds[i].events = POLLIN | POLLPRI;
3112       pollfds[i].revents = 0;
3113     }
3114
3115     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3116     if (state != RUNNING)
3117       break;
3118     else if (status == 0) /* timeout */
3119       continue;
3120     else if (status < 0) /* error */
3121     {
3122       status = errno;
3123       if (status != EINTR)
3124       {
3125         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3126       }
3127       continue;
3128     }
3129
3130     for (i = 0; i < pollfds_num; i++)
3131     {
3132       listen_socket_t *client_sock;
3133       struct sockaddr_storage client_sa;
3134       socklen_t client_sa_size;
3135       pthread_t tid;
3136       pthread_attr_t attr;
3137
3138       if (pollfds[i].revents == 0)
3139         continue;
3140
3141       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3142       {
3143         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3144             "poll(2) returned something unexpected for listen FD #%i.",
3145             pollfds[i].fd);
3146         continue;
3147       }
3148
3149       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3150       if (client_sock == NULL)
3151       {
3152         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3153         continue;
3154       }
3155       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3156
3157       client_sa_size = sizeof (client_sa);
3158       client_sock->fd = accept (pollfds[i].fd,
3159           (struct sockaddr *) &client_sa, &client_sa_size);
3160       if (client_sock->fd < 0)
3161       {
3162         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3163         free(client_sock);
3164         continue;
3165       }
3166
3167       pthread_attr_init (&attr);
3168       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3169
3170       status = pthread_create (&tid, &attr, connection_thread_main,
3171                                client_sock);
3172       if (status != 0)
3173       {
3174         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3175         close_connection(client_sock);
3176         continue;
3177       }
3178     } /* for (pollfds_num) */
3179   } /* while (state == RUNNING) */
3180
3181   RRDD_LOG(LOG_INFO, "starting shutdown");
3182
3183   close_listen_sockets ();
3184
3185   pthread_mutex_lock (&connection_threads_lock);
3186   while (connection_threads_num > 0)
3187     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3188   pthread_mutex_unlock (&connection_threads_lock);
3189
3190   free(pollfds);
3191
3192   return (NULL);
3193 } /* }}} void *listen_thread_main */
3194
3195 static int daemonize (void) /* {{{ */
3196 {
3197   int pid_fd;
3198   char *base_dir;
3199
3200   daemon_uid = geteuid();
3201
3202   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3203   if (pid_fd < 0)
3204     pid_fd = check_pidfile();
3205   if (pid_fd < 0)
3206     return pid_fd;
3207
3208   /* gather sockets passed from systemd; 
3209    * if none, open all the listen sockets from config or default  */ 
3210
3211   if (!(open_listen_sockets_systemd() > 0))
3212     open_listen_sockets_traditional();
3213
3214   if (listen_fds_num < 1)
3215   {
3216     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3217     goto error;
3218   }
3219
3220   if (!stay_foreground)
3221   {
3222     pid_t child;
3223
3224     child = fork ();
3225     if (child < 0)
3226     {
3227       fprintf (stderr, "daemonize: fork(2) failed.\n");
3228       goto error;
3229     }
3230     else if (child > 0)
3231       exit(0);
3232
3233     /* Become session leader */
3234     setsid ();
3235
3236     /* Open the first three file descriptors to /dev/null */
3237     close (2);
3238     close (1);
3239     close (0);
3240
3241     open ("/dev/null", O_RDWR);
3242     if (dup(0) == -1 || dup(0) == -1){
3243         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3244     }
3245   } /* if (!stay_foreground) */
3246
3247   /* Change into the /tmp directory. */
3248   base_dir = (config_base_dir != NULL)
3249     ? config_base_dir
3250     : "/tmp";
3251
3252   if (chdir (base_dir) != 0)
3253   {
3254     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3255     goto error;
3256   }
3257
3258   install_signal_handlers();
3259
3260   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3261   RRDD_LOG(LOG_INFO, "starting up");
3262
3263   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3264                                 (GDestroyNotify) free_cache_item);
3265   if (cache_tree == NULL)
3266   {
3267     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3268     goto error;
3269   }
3270
3271   return write_pidfile (pid_fd);
3272
3273 error:
3274   remove_pidfile();
3275   return -1;
3276 } /* }}} int daemonize */
3277
3278 static int cleanup (void) /* {{{ */
3279 {
3280   pthread_cond_broadcast (&flush_cond);
3281   pthread_join (flush_thread, NULL);
3282
3283   pthread_cond_broadcast (&queue_cond);
3284   for (int i = 0; i < config_queue_threads; i++)
3285     pthread_join (queue_threads[i], NULL);
3286
3287   if (config_flush_at_shutdown)
3288   {
3289     assert(cache_queue_head == NULL);
3290     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3291   }
3292
3293   free(queue_threads);
3294   free(config_base_dir);
3295
3296   pthread_mutex_lock(&cache_lock);
3297   g_tree_destroy(cache_tree);
3298
3299   pthread_mutex_lock(&journal_lock);
3300   journal_done();
3301
3302   RRDD_LOG(LOG_INFO, "goodbye");
3303   closelog ();
3304
3305   remove_pidfile ();
3306   free(config_pid_file);
3307
3308   return (0);
3309 } /* }}} int cleanup */
3310
3311 static int read_options (int argc, char **argv) /* {{{ */
3312 {
3313   int option;
3314   int status = 0;
3315
3316   socket_permission_clear (&default_socket);
3317
3318   default_socket.socket_group = (gid_t)-1;
3319   default_socket.socket_permissions = (mode_t)-1;
3320
3321   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3322   {
3323     switch (option)
3324     {
3325       case 'O':
3326         opt_no_overwrite = 1;
3327         break;
3328
3329       case 'g':
3330         stay_foreground=1;
3331         break;
3332
3333       case 'l':
3334       {
3335         listen_socket_t *new;
3336
3337         new = malloc(sizeof(listen_socket_t));
3338         if (new == NULL)
3339         {
3340           fprintf(stderr, "read_options: malloc failed.\n");
3341           return(2);
3342         }
3343         memset(new, 0, sizeof(listen_socket_t));
3344
3345         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3346
3347         /* Add permissions to the socket {{{ */
3348         if (default_socket.permissions != 0)
3349         {
3350           socket_permission_copy (new, &default_socket);
3351         }
3352         else /* if (default_socket.permissions == 0) */
3353         {
3354           /* Add permission for ALL commands to the socket. */
3355           socket_permission_set_all (new);
3356         }
3357         /* }}} Done adding permissions. */
3358
3359         new->socket_group = default_socket.socket_group;
3360         new->socket_permissions = default_socket.socket_permissions;
3361
3362         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3363                          &config_listen_address_list_len, new))
3364         {
3365           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3366           return (2);
3367         }
3368       }
3369       break;
3370
3371       /* set socket group permissions */
3372       case 's':
3373       {
3374         gid_t group_gid;
3375         struct group *grp;
3376
3377         group_gid = strtoul(optarg, NULL, 10);
3378         if (errno != EINVAL && group_gid>0)
3379         {
3380           /* we were passed a number */
3381           grp = getgrgid(group_gid);
3382         }
3383         else
3384         {
3385           grp = getgrnam(optarg);
3386         }
3387
3388         if (grp)
3389         {
3390           default_socket.socket_group = grp->gr_gid;
3391         }
3392         else
3393         {
3394           /* no idea what the user wanted... */
3395           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3396           return (5);
3397         }
3398       }
3399       break;
3400
3401       /* set socket file permissions */
3402       case 'm':
3403       {
3404         long  tmp;
3405         char *endptr = NULL;
3406
3407         tmp = strtol (optarg, &endptr, 8);
3408         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3409             || (tmp > 07777) || (tmp < 0)) {
3410           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3411               optarg);
3412           return (5);
3413         }
3414
3415         default_socket.socket_permissions = (mode_t)tmp;
3416       }
3417       break;
3418
3419       case 'P':
3420       {
3421         char *optcopy;
3422         char *saveptr;
3423         char *dummy;
3424         char *ptr;
3425
3426         socket_permission_clear (&default_socket);
3427
3428         optcopy = strdup (optarg);
3429         dummy = optcopy;
3430         saveptr = NULL;
3431         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3432         {
3433           dummy = NULL;
3434           status = socket_permission_add (&default_socket, ptr);
3435           if (status != 0)
3436           {
3437             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3438                 "socket failed. Most likely, this permission doesn't "
3439                 "exist. Check your command line.\n", ptr);
3440             status = 4;
3441           }
3442         }
3443
3444         free (optcopy);
3445       }
3446       break;
3447
3448       case 'f':
3449       {
3450         int temp;
3451
3452         temp = atoi (optarg);
3453         if (temp > 0)
3454           config_flush_interval = temp;
3455         else
3456         {
3457           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3458           status = 3;
3459         }
3460       }
3461       break;
3462
3463       case 'w':
3464       {
3465         int temp;
3466
3467         temp = atoi (optarg);
3468         if (temp > 0)
3469           config_write_interval = temp;
3470         else
3471         {
3472           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3473           status = 2;
3474         }
3475       }
3476       break;
3477
3478       case 'z':
3479       {
3480         int temp;
3481
3482         temp = atoi(optarg);
3483         if (temp > 0)
3484           config_write_jitter = temp;
3485         else
3486         {
3487           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3488           status = 2;
3489         }
3490
3491         break;
3492       }
3493
3494       case 't':
3495       {
3496         int threads;
3497         threads = atoi(optarg);
3498         if (threads >= 1)
3499           config_queue_threads = threads;
3500         else
3501         {
3502           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3503           return 1;
3504         }
3505       }
3506       break;
3507
3508       case 'B':
3509         config_write_base_only = 1;
3510         break;
3511
3512       case 'b':
3513       {
3514         size_t len;
3515         char base_realpath[PATH_MAX];
3516
3517         if (config_base_dir != NULL)
3518           free (config_base_dir);
3519         config_base_dir = strdup (optarg);
3520         if (config_base_dir == NULL)
3521         {
3522           fprintf (stderr, "read_options: strdup failed.\n");
3523           return (3);
3524         }
3525
3526         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3527         {
3528           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3529               config_base_dir, rrd_strerror (errno));
3530           return (3);
3531         }
3532
3533         /* make sure that the base directory is not resolved via
3534          * symbolic links.  this makes some performance-enhancing
3535          * assumptions possible (we don't have to resolve paths
3536          * that start with a "/")
3537          */
3538         if (realpath(config_base_dir, base_realpath) == NULL)
3539         {
3540           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3541               "%s\n", config_base_dir, rrd_strerror(errno));
3542           return 5;
3543         }
3544
3545         len = strlen (config_base_dir);
3546         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3547         {
3548           config_base_dir[len - 1] = 0;
3549           len--;
3550         }
3551
3552         if (len < 1)
3553         {
3554           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3555           return (4);
3556         }
3557
3558         _config_base_dir_len = len;
3559
3560         len = strlen (base_realpath);
3561         while ((len > 0) && (base_realpath[len - 1] == '/'))
3562         {
3563           base_realpath[len - 1] = '\0';
3564           len--;
3565         }
3566
3567         if (strncmp(config_base_dir,
3568                          base_realpath, sizeof(base_realpath)) != 0)
3569         {
3570           fprintf(stderr,
3571                   "Base directory (-b) resolved via file system links!\n"
3572                   "Please consult rrdcached '-b' documentation!\n"
3573                   "Consider specifying the real directory (%s)\n",
3574                   base_realpath);
3575           return 5;
3576         }
3577       }
3578       break;
3579
3580       case 'p':
3581       {
3582         if (config_pid_file != NULL)
3583           free (config_pid_file);
3584         config_pid_file = strdup (optarg);
3585         if (config_pid_file == NULL)
3586         {
3587           fprintf (stderr, "read_options: strdup failed.\n");
3588           return (3);
3589         }
3590       }
3591       break;
3592
3593       case 'F':
3594         config_flush_at_shutdown = 1;
3595         break;
3596
3597       case 'j':
3598       {
3599         char journal_dir_actual[PATH_MAX];
3600         journal_dir = realpath((const char *)optarg, journal_dir_actual);
3601         if (journal_dir)
3602         {
3603           // if we were able to properly resolve the path, lets have a copy
3604           // for use outside this block.
3605           journal_dir = strdup(journal_dir);           
3606           status = rrd_mkdir_p(journal_dir, 0777);
3607           if (status != 0)
3608           {
3609             fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3610                     journal_dir, rrd_strerror(errno));
3611             return 6;
3612           }
3613           if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3614           {
3615             fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3616                     errno ? rrd_strerror(errno) : "");
3617             return 6;
3618           }
3619         } else {
3620           fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3621                   errno ? rrd_strerror(errno) : "");
3622           return 6;
3623         }
3624       }
3625       break;
3626
3627       case 'a':
3628       {
3629         int temp = atoi(optarg);
3630         if (temp > 0)
3631           config_alloc_chunk = temp;
3632         else
3633         {
3634           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3635           return 10;
3636         }
3637       }
3638       break;
3639
3640       case 'h':
3641       case '?':
3642         printf ("RRDCacheD %s\n"
3643             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3644             "\n"
3645             "Usage: rrdcached [options]\n"
3646             "\n"
3647             "Valid options are:\n"
3648             "  -l <address>  Socket address to listen to.\n"
3649             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3650             "  -P <perms>    Sets the permissions to assign to all following "
3651                             "sockets\n"
3652             "  -w <seconds>  Interval in which to write data.\n"
3653             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3654             "  -t <threads>  Number of write threads.\n"
3655             "  -f <seconds>  Interval in which to flush dead data.\n"
3656             "  -p <file>     Location of the PID-file.\n"
3657             "  -b <dir>      Base directory to change to.\n"
3658             "  -B            Restrict file access to paths within -b <dir>\n"
3659             "  -g            Do not fork and run in the foreground.\n"
3660             "  -j <dir>      Directory in which to create the journal files.\n"
3661             "  -F            Always flush all updates at shutdown\n"
3662             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3663             "                (the socket will also have read/write permissions "
3664                             "for that group)\n"
3665             "  -m <mode>     File permissions (octal) of all following UNIX "
3666                             "sockets\n"
3667             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3668             "  -O            Do not allow CREATE commands to overwrite existing\n"
3669             "                files, even if asked to.\n"
3670             "\n"
3671             "For more information and a detailed description of all options "
3672             "please refer\n"
3673             "to the rrdcached(1) manual page.\n",
3674             VERSION);
3675         if (option == 'h')
3676           status = -1;
3677         else
3678           status = 1;
3679         break;
3680     } /* switch (option) */
3681   } /* while (getopt) */
3682
3683   /* advise the user when values are not sane */
3684   if (config_flush_interval < 2 * config_write_interval)
3685     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3686             " 2x write interval (-w) !\n");
3687   if (config_write_jitter > config_write_interval)
3688     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3689             " write interval (-w) !\n");
3690
3691   if (config_write_base_only && config_base_dir == NULL)
3692     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3693             "  Consult the rrdcached documentation\n");
3694
3695   if (journal_dir == NULL)
3696     config_flush_at_shutdown = 1;
3697
3698   return (status);
3699 } /* }}} int read_options */
3700
3701 int main (int argc, char **argv)
3702 {
3703   int status;
3704
3705   status = read_options (argc, argv);
3706   if (status != 0)
3707   {
3708     if (status < 0)
3709       status = 0;
3710     return (status);
3711   }
3712
3713   status = daemonize ();
3714   if (status != 0)
3715   {
3716     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3717     return (1);
3718   }
3719
3720   journal_init();
3721
3722   /* start the queue threads */
3723   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3724   if (queue_threads == NULL)
3725   {
3726     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3727     cleanup();
3728     return (1);
3729   }
3730   for (int i = 0; i < config_queue_threads; i++)
3731   {
3732     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3733     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3734     if (status != 0)
3735     {
3736       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3737       cleanup();
3738       return (1);
3739     }
3740   }
3741
3742   /* start the flush thread */
3743   memset(&flush_thread, 0, sizeof(flush_thread));
3744   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3745   if (status != 0)
3746   {
3747     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3748     cleanup();
3749     return (1);
3750   }
3751
3752   listen_thread_main (NULL);
3753   cleanup ();
3754
3755   return (0);
3756 } /* int main */
3757
3758 /*
3759  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3760  */