Ensure that response_read() always calls fflush() or fclose().
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #include <glib-2.0/glib.h>
113 /* }}} */
114
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) { \
118       fprintf(stderr, __VA_ARGS__); \
119       fprintf(stderr, "\n"); } \
120     syslog ((severity), __VA_ARGS__); \
121   } while (0)
122
123 /*
124  * Types
125  */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127
128 struct listen_socket_s
129 {
130   int fd;
131   char addr[PATH_MAX + 1];
132   int family;
133
134   /* state for BATCH processing */
135   time_t batch_start;
136   int batch_cmd;
137
138   /* buffered IO */
139   char *rbuf;
140   off_t next_cmd;
141   off_t next_read;
142
143   char *wbuf;
144   ssize_t wbuf_len;
145
146   uint32_t permissions;
147
148   gid_t  socket_group;
149   mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
152
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
157                         time_t UNUSED(now),\
158                         char  UNUSED(*buffer),\
159                         size_t UNUSED(buffer_size)
160
161 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
162                         DISPATCH_PROTO
163
164 struct command_s {
165   char   *cmd;
166   int (*handler)(HANDLER_PROTO);
167
168   char  context;                /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT      (1<<0)
170 #define CMD_CONTEXT_BATCH       (1<<1)
171 #define CMD_CONTEXT_JOURNAL     (1<<2)
172 #define CMD_CONTEXT_ANY         (0x7f)
173
174   char *syntax;
175   char *help;
176 };
177
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
181 {
182   char *file;
183   char **values;
184   size_t values_num;            /* number of valid pointers */
185   size_t values_alloc;          /* number of allocated pointers */
186   time_t last_flush_time;
187   time_t last_update_stamp;
188 #define CI_FLAGS_IN_TREE  (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
190   int flags;
191   pthread_cond_t  flushed;
192   cache_item_t *prev;
193   cache_item_t *next;
194 };
195
196 struct callback_flush_data_s
197 {
198   time_t now;
199   time_t abs_timeout;
200   char **keys;
201   size_t keys_num;
202 };
203 typedef struct callback_flush_data_s callback_flush_data_t;
204
205 enum queue_side_e
206 {
207   HEAD,
208   TAIL
209 };
210 typedef enum queue_side_e queue_side_t;
211
212 /* describe a set of journal files */
213 typedef struct {
214   char **files;
215   size_t files_num;
216 } journal_set;
217
218 /* max length of socket command or response */
219 #define CMD_MAX 4096
220 #define RBUF_SIZE (CMD_MAX*2)
221
222 /*
223  * Variables
224  */
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
227
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
230
231 static listen_socket_t default_socket;
232
233 enum {
234   RUNNING,              /* normal operation */
235   FLUSHING,             /* flushing remaining values */
236   SHUTDOWN              /* shutting down */
237 } state = RUNNING;
238
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
242
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
245
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
249
250 /* Cache stuff */
251 static GTree          *cache_tree = NULL;
252 static cache_item_t   *cache_queue_head = NULL;
253 static cache_item_t   *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
255
256 static int config_write_interval = 300;
257 static int config_write_jitter   = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
265
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
268
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
277
278 static int opt_no_overwrite = 0;
279
280 /* Journaled updates */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL;         /* current journal file handle */
287 static long  journal_size = 0;          /* current journal size */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
293
294 /* prototypes for forward refernces */
295 static int handle_request_help (HANDLER_PROTO);
296
297 /* 
298  * Functions
299  */
300 static void sig_common (const char *sig) /* {{{ */
301 {
302   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
303   state = FLUSHING;
304   pthread_cond_broadcast(&flush_cond);
305   pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
307
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
309 {
310   sig_common("INT");
311 } /* }}} void sig_int_handler */
312
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
314 {
315   sig_common("TERM");
316 } /* }}} void sig_term_handler */
317
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
319 {
320   config_flush_at_shutdown = 1;
321   sig_common("USR1");
322 } /* }}} void sig_usr1_handler */
323
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
325 {
326   config_flush_at_shutdown = 0;
327   sig_common("USR2");
328 } /* }}} void sig_usr2_handler */
329
330 static void install_signal_handlers(void) /* {{{ */
331 {
332   /* These structures are static, because `sigaction' behaves weird if the are
333    * overwritten.. */
334   static struct sigaction sa_int;
335   static struct sigaction sa_term;
336   static struct sigaction sa_pipe;
337   static struct sigaction sa_usr1;
338   static struct sigaction sa_usr2;
339
340   /* Install signal handlers */
341   memset (&sa_int, 0, sizeof (sa_int));
342   sa_int.sa_handler = sig_int_handler;
343   sigaction (SIGINT, &sa_int, NULL);
344
345   memset (&sa_term, 0, sizeof (sa_term));
346   sa_term.sa_handler = sig_term_handler;
347   sigaction (SIGTERM, &sa_term, NULL);
348
349   memset (&sa_pipe, 0, sizeof (sa_pipe));
350   sa_pipe.sa_handler = SIG_IGN;
351   sigaction (SIGPIPE, &sa_pipe, NULL);
352
353   memset (&sa_pipe, 0, sizeof (sa_usr1));
354   sa_usr1.sa_handler = sig_usr1_handler;
355   sigaction (SIGUSR1, &sa_usr1, NULL);
356
357   memset (&sa_usr2, 0, sizeof (sa_usr2));
358   sa_usr2.sa_handler = sig_usr2_handler;
359   sigaction (SIGUSR2, &sa_usr2, NULL);
360
361 } /* }}} void install_signal_handlers */
362
363 static int open_pidfile(char *action, int oflag) /* {{{ */
364 {
365   int fd;
366   const char *file;
367   char *file_copy, *dir;
368
369   file = (config_pid_file != NULL)
370     ? config_pid_file
371     : LOCALSTATEDIR "/run/rrdcached.pid";
372
373   /* dirname may modify its argument */
374   file_copy = strdup(file);
375   if (file_copy == NULL)
376   {
377     fprintf(stderr, "rrdcached: strdup(): %s\n",
378         rrd_strerror(errno));
379     return -1;
380   }
381
382   dir = dirname(file_copy);
383   if (rrd_mkdir_p(dir, 0777) != 0)
384   {
385     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386         dir, rrd_strerror(errno));
387     return -1;
388   }
389
390   free(file_copy);
391
392   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
393   if (fd < 0)
394     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395             action, file, rrd_strerror(errno));
396
397   return(fd);
398 } /* }}} static int open_pidfile */
399
400 /* check existing pid file to see whether a daemon is running */
401 static int check_pidfile(void)
402 {
403   int pid_fd;
404   pid_t pid;
405   char pid_str[16];
406
407   pid_fd = open_pidfile("open", O_RDWR);
408   if (pid_fd < 0)
409     return pid_fd;
410
411   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
412     return -1;
413
414   pid = atoi(pid_str);
415   if (pid <= 0)
416     return -1;
417
418   /* another running process that we can signal COULD be
419    * a competing rrdcached */
420   if (pid != getpid() && kill(pid, 0) == 0)
421   {
422     fprintf(stderr,
423             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
424     close(pid_fd);
425     return -1;
426   }
427
428   lseek(pid_fd, 0, SEEK_SET);
429   if (ftruncate(pid_fd, 0) == -1)
430   {
431     fprintf(stderr,
432             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
433     close(pid_fd);
434     return -1;
435   }
436
437   fprintf(stderr,
438           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
439           "rrdcached: starting normally.\n", pid);
440
441   return pid_fd;
442 } /* }}} static int check_pidfile */
443
444 static int write_pidfile (int fd) /* {{{ */
445 {
446   pid_t pid;
447   FILE *fh;
448
449   pid = getpid ();
450
451   fh = fdopen (fd, "w");
452   if (fh == NULL)
453   {
454     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
455     close(fd);
456     return (-1);
457   }
458
459   fprintf (fh, "%i\n", (int) pid);
460   fclose (fh);
461
462   return (0);
463 } /* }}} int write_pidfile */
464
465 static int remove_pidfile (void) /* {{{ */
466 {
467   char *file;
468   int status;
469
470   file = (config_pid_file != NULL)
471     ? config_pid_file
472     : LOCALSTATEDIR "/run/rrdcached.pid";
473
474   status = unlink (file);
475   if (status == 0)
476     return (0);
477   return (errno);
478 } /* }}} int remove_pidfile */
479
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
481 {
482   char *eol;
483
484   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485                sock->next_read - sock->next_cmd);
486
487   if (eol == NULL)
488   {
489     /* no commands left, move remainder back to front of rbuf */
490     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491             sock->next_read - sock->next_cmd);
492     sock->next_read -= sock->next_cmd;
493     sock->next_cmd = 0;
494     *len = 0;
495     return NULL;
496   }
497   else
498   {
499     char *cmd = sock->rbuf + sock->next_cmd;
500     *eol = '\0';
501
502     sock->next_cmd = eol - sock->rbuf + 1;
503
504     if (eol > sock->rbuf && *(eol-1) == '\r')
505       *(--eol) = '\0'; /* handle "\r\n" EOL */
506
507     *len = eol - cmd;
508
509     return cmd;
510   }
511
512   /* NOTREACHED */
513   assert(1==0);
514 } /* }}} char *next_cmd */
515
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
518 {
519   char *new_buf;
520
521   assert(sock != NULL);
522
523   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
524   if (new_buf == NULL)
525   {
526     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527     return -1;
528   }
529
530   strncpy(new_buf + sock->wbuf_len, str, len + 1);
531
532   sock->wbuf = new_buf;
533   sock->wbuf_len += len;
534
535   return 0;
536 } /* }}} static int add_to_wbuf */
537
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
540 {
541   va_list argp;
542   char buffer[CMD_MAX];
543   int len;
544
545   if (JOURNAL_REPLAY(sock)) return 0;
546   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
547
548   va_start(argp, fmt);
549 #ifdef HAVE_VSNPRINTF
550   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
551 #else
552   len = vsprintf(buffer, fmt, argp);
553 #endif
554   va_end(argp);
555   if (len < 0)
556   {
557     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558     return -1;
559   }
560
561   return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
563
564 static int count_lines(char *str) /* {{{ */
565 {
566   int lines = 0;
567
568   if (str != NULL)
569   {
570     while ((str = strchr(str, '\n')) != NULL)
571     {
572       ++lines;
573       ++str;
574     }
575   }
576
577   return lines;
578 } /* }}} static int count_lines */
579
580 /* send the response back to the user.
581  * returns 0 on success, -1 on error
582  * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584                           char *fmt, ...) /* {{{ */
585 {
586   va_list argp;
587   char buffer[CMD_MAX];
588   int lines;
589   ssize_t wrote;
590   int rclen, len;
591
592   if (JOURNAL_REPLAY(sock)) return rc;
593
594   if (sock->batch_start)
595   {
596     if (rc == RESP_OK)
597       return rc; /* no response on success during BATCH */
598     lines = sock->batch_cmd;
599   }
600   else if (rc == RESP_OK)
601     lines = count_lines(sock->wbuf);
602   else
603     lines = -1;
604
605   rclen = sprintf(buffer, "%d ", lines);
606   va_start(argp, fmt);
607 #ifdef HAVE_VSNPRINTF
608   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
609 #else
610   len = vsprintf(buffer+rclen, fmt, argp);
611 #endif
612   va_end(argp);
613   if (len < 0)
614     return -1;
615
616   len += rclen;
617
618   /* append the result to the wbuf, don't write to the user */
619   if (sock->batch_start)
620     return add_to_wbuf(sock, buffer, len);
621
622   /* first write must be complete */
623   if (len != write(sock->fd, buffer, len))
624   {
625     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626     return -1;
627   }
628
629   if (sock->wbuf != NULL && rc == RESP_OK)
630   {
631     wrote = 0;
632     while (wrote < sock->wbuf_len)
633     {
634       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
635       if (wb <= 0)
636       {
637         RRDD_LOG(LOG_INFO, "send_response: could not write results");
638         return -1;
639       }
640       wrote += wb;
641     }
642   }
643
644   free(sock->wbuf); sock->wbuf = NULL;
645   sock->wbuf_len = 0;
646
647   return 0;
648 } /* }}} */
649
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
651 {
652   ci->values = NULL;
653   ci->values_num = 0;
654   ci->values_alloc = 0;
655
656   ci->last_flush_time = when;
657   if (config_write_jitter > 0)
658     ci->last_flush_time += (rrd_random() % config_write_jitter);
659 }
660
661 /* remove_from_queue
662  * remove a "cache_item_t" item from the queue.
663  * must hold 'cache_lock' when calling this
664  */
665 static void remove_from_queue(cache_item_t *ci) /* {{{ */
666 {
667   if (ci == NULL) return;
668   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
669
670   if (ci->prev == NULL)
671     cache_queue_head = ci->next; /* reset head */
672   else
673     ci->prev->next = ci->next;
674
675   if (ci->next == NULL)
676     cache_queue_tail = ci->prev; /* reset the tail */
677   else
678     ci->next->prev = ci->prev;
679
680   ci->next = ci->prev = NULL;
681   ci->flags &= ~CI_FLAGS_IN_QUEUE;
682
683   pthread_mutex_lock (&stats_lock);
684   assert (stats_queue_length > 0);
685   stats_queue_length--;
686   pthread_mutex_unlock (&stats_lock);
687
688 } /* }}} static void remove_from_queue */
689
690 /* free the resources associated with the cache_item_t
691  * must hold cache_lock when calling this function
692  */
693 static void *free_cache_item(cache_item_t *ci) /* {{{ */
694 {
695   if (ci == NULL) return NULL;
696
697   remove_from_queue(ci);
698
699   for (size_t i=0; i < ci->values_num; i++)
700     free(ci->values[i]);
701
702   free (ci->values);
703   free (ci->file);
704
705   /* in case anyone is waiting */
706   pthread_cond_broadcast(&ci->flushed);
707   pthread_cond_destroy(&ci->flushed);
708
709   free (ci);
710
711   return NULL;
712 } /* }}} static void *free_cache_item */
713
714 /*
715  * enqueue_cache_item:
716  * `cache_lock' must be acquired before calling this function!
717  */
718 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
719     queue_side_t side)
720 {
721   if (ci == NULL)
722     return (-1);
723
724   if (ci->values_num == 0)
725     return (0);
726
727   if (side == HEAD)
728   {
729     if (cache_queue_head == ci)
730       return 0;
731
732     /* remove if further down in queue */
733     remove_from_queue(ci);
734
735     ci->prev = NULL;
736     ci->next = cache_queue_head;
737     if (ci->next != NULL)
738       ci->next->prev = ci;
739     cache_queue_head = ci;
740
741     if (cache_queue_tail == NULL)
742       cache_queue_tail = cache_queue_head;
743   }
744   else /* (side == TAIL) */
745   {
746     /* We don't move values back in the list.. */
747     if (ci->flags & CI_FLAGS_IN_QUEUE)
748       return (0);
749
750     assert (ci->next == NULL);
751     assert (ci->prev == NULL);
752
753     ci->prev = cache_queue_tail;
754
755     if (cache_queue_tail == NULL)
756       cache_queue_head = ci;
757     else
758       cache_queue_tail->next = ci;
759
760     cache_queue_tail = ci;
761   }
762
763   ci->flags |= CI_FLAGS_IN_QUEUE;
764
765   pthread_cond_signal(&queue_cond);
766   pthread_mutex_lock (&stats_lock);
767   stats_queue_length++;
768   pthread_mutex_unlock (&stats_lock);
769
770   return (0);
771 } /* }}} int enqueue_cache_item */
772
773 /*
774  * tree_callback_flush:
775  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
776  * while this is in progress.
777  */
778 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
779     gpointer data)
780 {
781   cache_item_t *ci;
782   callback_flush_data_t *cfd;
783
784   ci = (cache_item_t *) value;
785   cfd = (callback_flush_data_t *) data;
786
787   if (ci->flags & CI_FLAGS_IN_QUEUE)
788     return FALSE;
789
790   if (ci->values_num > 0
791       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
792   {
793     enqueue_cache_item (ci, TAIL);
794   }
795   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
796       && (ci->values_num <= 0))
797   {
798     assert ((char *) key == ci->file);
799     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
800     {
801       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
802       return (FALSE);
803     }
804   }
805
806   return (FALSE);
807 } /* }}} gboolean tree_callback_flush */
808
809 static int flush_old_values (int max_age)
810 {
811   callback_flush_data_t cfd;
812   size_t k;
813
814   memset (&cfd, 0, sizeof (cfd));
815   /* Pass the current time as user data so that we don't need to call
816    * `time' for each node. */
817   cfd.now = time (NULL);
818   cfd.keys = NULL;
819   cfd.keys_num = 0;
820
821   if (max_age > 0)
822     cfd.abs_timeout = cfd.now - max_age;
823   else
824     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
825
826   /* `tree_callback_flush' will return the keys of all values that haven't
827    * been touched in the last `config_flush_interval' seconds in `cfd'.
828    * The char*'s in this array point to the same memory as ci->file, so we
829    * don't need to free them separately. */
830   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
831
832   for (k = 0; k < cfd.keys_num; k++)
833   {
834     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
835     /* should never fail, since we have held the cache_lock
836      * the entire time */
837     assert(status == TRUE);
838   }
839
840   if (cfd.keys != NULL)
841   {
842     free (cfd.keys);
843     cfd.keys = NULL;
844   }
845
846   return (0);
847 } /* int flush_old_values */
848
849 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
850 {
851   struct timeval now;
852   struct timespec next_flush;
853   int status;
854
855   gettimeofday (&now, NULL);
856   next_flush.tv_sec = now.tv_sec + config_flush_interval;
857   next_flush.tv_nsec = 1000 * now.tv_usec;
858
859   pthread_mutex_lock(&cache_lock);
860
861   while (state == RUNNING)
862   {
863     gettimeofday (&now, NULL);
864     if ((now.tv_sec > next_flush.tv_sec)
865         || ((now.tv_sec == next_flush.tv_sec)
866           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
867     {
868       RRDD_LOG(LOG_DEBUG, "flushing old values");
869
870       /* Determine the time of the next cache flush. */
871       next_flush.tv_sec = now.tv_sec + config_flush_interval;
872
873       /* Flush all values that haven't been written in the last
874        * `config_write_interval' seconds. */
875       flush_old_values (config_write_interval);
876
877       /* unlock the cache while we rotate so we don't block incoming
878        * updates if the fsync() blocks on disk I/O */
879       pthread_mutex_unlock(&cache_lock);
880       journal_rotate();
881       pthread_mutex_lock(&cache_lock);
882     }
883
884     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
885     if (status != 0 && status != ETIMEDOUT)
886     {
887       RRDD_LOG (LOG_ERR, "flush_thread_main: "
888                 "pthread_cond_timedwait returned %i.", status);
889     }
890   }
891
892   if (config_flush_at_shutdown)
893     flush_old_values (-1); /* flush everything */
894
895   state = SHUTDOWN;
896
897   pthread_mutex_unlock(&cache_lock);
898
899   return NULL;
900 } /* void *flush_thread_main */
901
902 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
903 {
904   pthread_mutex_lock (&cache_lock);
905
906   while (state != SHUTDOWN
907          || (cache_queue_head != NULL && config_flush_at_shutdown))
908   {
909     cache_item_t *ci;
910     char *file;
911     char **values;
912     size_t values_num;
913     int status;
914
915     /* Now, check if there's something to store away. If not, wait until
916      * something comes in. */
917     if (cache_queue_head == NULL)
918     {
919       status = pthread_cond_wait (&queue_cond, &cache_lock);
920       if ((status != 0) && (status != ETIMEDOUT))
921       {
922         RRDD_LOG (LOG_ERR, "queue_thread_main: "
923             "pthread_cond_wait returned %i.", status);
924       }
925     }
926
927     /* Check if a value has arrived. This may be NULL if we timed out or there
928      * was an interrupt such as a signal. */
929     if (cache_queue_head == NULL)
930       continue;
931
932     ci = cache_queue_head;
933
934     /* copy the relevant parts */
935     file = strdup (ci->file);
936     if (file == NULL)
937     {
938       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
939       continue;
940     }
941
942     assert(ci->values != NULL);
943     assert(ci->values_num > 0);
944
945     values = ci->values;
946     values_num = ci->values_num;
947
948     wipe_ci_values(ci, time(NULL));
949     remove_from_queue(ci);
950
951     pthread_mutex_unlock (&cache_lock);
952
953     rrd_clear_error ();
954     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
955     if (status != 0)
956     {
957       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
958           "rrd_update_r (%s) failed with status %i. (%s)",
959           file, status, rrd_get_error());
960     }
961
962     journal_write("wrote", file);
963
964     /* Search again in the tree.  It's possible someone issued a "FORGET"
965      * while we were writing the update values. */
966     pthread_mutex_lock(&cache_lock);
967     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
968     if (ci)
969       pthread_cond_broadcast(&ci->flushed);
970     pthread_mutex_unlock(&cache_lock);
971
972     if (status == 0)
973     {
974       pthread_mutex_lock (&stats_lock);
975       stats_updates_written++;
976       stats_data_sets_written += values_num;
977       pthread_mutex_unlock (&stats_lock);
978     }
979
980     rrd_free_ptrs((void ***) &values, &values_num);
981     free(file);
982
983     pthread_mutex_lock (&cache_lock);
984   }
985   pthread_mutex_unlock (&cache_lock);
986
987   return (NULL);
988 } /* }}} void *queue_thread_main */
989
990 static int buffer_get_field (char **buffer_ret, /* {{{ */
991     size_t *buffer_size_ret, char **field_ret)
992 {
993   char *buffer;
994   size_t buffer_pos;
995   size_t buffer_size;
996   char *field;
997   size_t field_size;
998   int status;
999
1000   buffer = *buffer_ret;
1001   buffer_pos = 0;
1002   buffer_size = *buffer_size_ret;
1003   field = *buffer_ret;
1004   field_size = 0;
1005
1006   if (buffer_size <= 0)
1007     return (-1);
1008
1009   /* This is ensured by `handle_request'. */
1010   assert (buffer[buffer_size - 1] == '\0');
1011
1012   status = -1;
1013   while (buffer_pos < buffer_size)
1014   {
1015     /* Check for end-of-field or end-of-buffer */
1016     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1017     {
1018       field[field_size] = 0;
1019       field_size++;
1020       buffer_pos++;
1021       status = 0;
1022       break;
1023     }
1024     /* Handle escaped characters. */
1025     else if (buffer[buffer_pos] == '\\')
1026     {
1027       if (buffer_pos >= (buffer_size - 1))
1028         break;
1029       buffer_pos++;
1030       field[field_size] = buffer[buffer_pos];
1031       field_size++;
1032       buffer_pos++;
1033     }
1034     /* Normal operation */ 
1035     else
1036     {
1037       field[field_size] = buffer[buffer_pos];
1038       field_size++;
1039       buffer_pos++;
1040     }
1041   } /* while (buffer_pos < buffer_size) */
1042
1043   if (status != 0)
1044     return (status);
1045
1046   *buffer_ret = buffer + buffer_pos;
1047   *buffer_size_ret = buffer_size - buffer_pos;
1048   *field_ret = field;
1049
1050   return (0);
1051 } /* }}} int buffer_get_field */
1052
1053 /* if we're restricting writes to the base directory,
1054  * check whether the file falls within the dir
1055  * returns 1 if OK, otherwise 0
1056  */
1057 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1058 {
1059   assert(file != NULL);
1060
1061   if (!config_write_base_only
1062       || JOURNAL_REPLAY(sock)
1063       || config_base_dir == NULL)
1064     return 1;
1065
1066   if (strstr(file, "../") != NULL) goto err;
1067
1068   /* relative paths without "../" are ok */
1069   if (*file != '/') return 1;
1070
1071   /* file must be of the format base + "/" + <1+ char filename> */
1072   if (strlen(file) < _config_base_dir_len + 2) goto err;
1073   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1074   if (*(file + _config_base_dir_len) != '/') goto err;
1075
1076   return 1;
1077
1078 err:
1079   if (sock != NULL && sock->fd >= 0)
1080     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1081
1082   return 0;
1083 } /* }}} static int check_file_access */
1084
1085 /* when using a base dir, convert relative paths to absolute paths.
1086  * if necessary, modifies the "filename" pointer to point
1087  * to the new path created in "tmp".  "tmp" is provided
1088  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1089  *
1090  * this allows us to optimize for the expected case (absolute path)
1091  * with a no-op.
1092  */
1093 static void get_abs_path(char **filename, char *tmp)
1094 {
1095   assert(tmp != NULL);
1096   assert(filename != NULL && *filename != NULL);
1097
1098   if (config_base_dir == NULL || **filename == '/')
1099     return;
1100
1101   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1102   *filename = tmp;
1103 } /* }}} static int get_abs_path */
1104
1105 static int flush_file (const char *filename) /* {{{ */
1106 {
1107   cache_item_t *ci;
1108
1109   pthread_mutex_lock (&cache_lock);
1110
1111   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1112   if (ci == NULL)
1113   {
1114     pthread_mutex_unlock (&cache_lock);
1115     return (ENOENT);
1116   }
1117
1118   if (ci->values_num > 0)
1119   {
1120     /* Enqueue at head */
1121     enqueue_cache_item (ci, HEAD);
1122     pthread_cond_wait(&ci->flushed, &cache_lock);
1123   }
1124
1125   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1126    * may have been purged during our cond_wait() */
1127
1128   pthread_mutex_unlock(&cache_lock);
1129
1130   return (0);
1131 } /* }}} int flush_file */
1132
1133 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1134 {
1135   char *err = "Syntax error.\n";
1136
1137   if (cmd && cmd->syntax)
1138     err = cmd->syntax;
1139
1140   return send_response(sock, RESP_ERR, "Usage: %s", err);
1141 } /* }}} static int syntax_error() */
1142
1143 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1144 {
1145   uint64_t copy_queue_length;
1146   uint64_t copy_updates_received;
1147   uint64_t copy_flush_received;
1148   uint64_t copy_updates_written;
1149   uint64_t copy_data_sets_written;
1150   uint64_t copy_journal_bytes;
1151   uint64_t copy_journal_rotate;
1152
1153   uint64_t tree_nodes_number;
1154   uint64_t tree_depth;
1155
1156   pthread_mutex_lock (&stats_lock);
1157   copy_queue_length       = stats_queue_length;
1158   copy_updates_received   = stats_updates_received;
1159   copy_flush_received     = stats_flush_received;
1160   copy_updates_written    = stats_updates_written;
1161   copy_data_sets_written  = stats_data_sets_written;
1162   copy_journal_bytes      = stats_journal_bytes;
1163   copy_journal_rotate     = stats_journal_rotate;
1164   pthread_mutex_unlock (&stats_lock);
1165
1166   pthread_mutex_lock (&cache_lock);
1167   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1168   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1169   pthread_mutex_unlock (&cache_lock);
1170
1171   add_response_info(sock,
1172                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1173   add_response_info(sock,
1174                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1175   add_response_info(sock,
1176                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1177   add_response_info(sock,
1178                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1179   add_response_info(sock,
1180                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1181   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1182   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1183   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1184   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1185
1186   send_response(sock, RESP_OK, "Statistics follow\n");
1187
1188   return (0);
1189 } /* }}} int handle_request_stats */
1190
1191 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1192 {
1193   char *file, file_tmp[PATH_MAX];
1194   int status;
1195
1196   status = buffer_get_field (&buffer, &buffer_size, &file);
1197   if (status != 0)
1198   {
1199     return syntax_error(sock,cmd);
1200   }
1201   else
1202   {
1203     pthread_mutex_lock(&stats_lock);
1204     stats_flush_received++;
1205     pthread_mutex_unlock(&stats_lock);
1206
1207     get_abs_path(&file, file_tmp);
1208     if (!check_file_access(file, sock)) return 0;
1209
1210     status = flush_file (file);
1211     if (status == 0)
1212       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1213     else if (status == ENOENT)
1214     {
1215       /* no file in our tree; see whether it exists at all */
1216       struct stat statbuf;
1217
1218       memset(&statbuf, 0, sizeof(statbuf));
1219       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1220         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1221       else
1222         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1223     }
1224     else if (status < 0)
1225       return send_response(sock, RESP_ERR, "Internal error.\n");
1226     else
1227       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1228   }
1229
1230   /* NOTREACHED */
1231   assert(1==0);
1232 } /* }}} int handle_request_flush */
1233
1234 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1235 {
1236   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1237
1238   pthread_mutex_lock(&cache_lock);
1239   flush_old_values(-1);
1240   pthread_mutex_unlock(&cache_lock);
1241
1242   return send_response(sock, RESP_OK, "Started flush.\n");
1243 } /* }}} static int handle_request_flushall */
1244
1245 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1246 {
1247   int status;
1248   char *file, file_tmp[PATH_MAX];
1249   cache_item_t *ci;
1250
1251   status = buffer_get_field(&buffer, &buffer_size, &file);
1252   if (status != 0)
1253     return syntax_error(sock,cmd);
1254
1255   get_abs_path(&file, file_tmp);
1256
1257   pthread_mutex_lock(&cache_lock);
1258   ci = g_tree_lookup(cache_tree, file);
1259   if (ci == NULL)
1260   {
1261     pthread_mutex_unlock(&cache_lock);
1262     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1263   }
1264
1265   for (size_t i=0; i < ci->values_num; i++)
1266     add_response_info(sock, "%s\n", ci->values[i]);
1267
1268   pthread_mutex_unlock(&cache_lock);
1269   return send_response(sock, RESP_OK, "updates pending\n");
1270 } /* }}} static int handle_request_pending */
1271
1272 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1273 {
1274   int status;
1275   gboolean found;
1276   char *file, file_tmp[PATH_MAX];
1277
1278   status = buffer_get_field(&buffer, &buffer_size, &file);
1279   if (status != 0)
1280     return syntax_error(sock,cmd);
1281
1282   get_abs_path(&file, file_tmp);
1283   if (!check_file_access(file, sock)) return 0;
1284
1285   pthread_mutex_lock(&cache_lock);
1286   found = g_tree_remove(cache_tree, file);
1287   pthread_mutex_unlock(&cache_lock);
1288
1289   if (found == TRUE)
1290   {
1291     if (!JOURNAL_REPLAY(sock))
1292       journal_write("forget", file);
1293
1294     return send_response(sock, RESP_OK, "Gone!\n");
1295   }
1296   else
1297     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1298
1299   /* NOTREACHED */
1300   assert(1==0);
1301 } /* }}} static int handle_request_forget */
1302
1303 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1304 {
1305   cache_item_t *ci;
1306
1307   pthread_mutex_lock(&cache_lock);
1308
1309   ci = cache_queue_head;
1310   while (ci != NULL)
1311   {
1312     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1313     ci = ci->next;
1314   }
1315
1316   pthread_mutex_unlock(&cache_lock);
1317
1318   return send_response(sock, RESP_OK, "in queue.\n");
1319 } /* }}} int handle_request_queue */
1320
1321 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1322 {
1323   char *file, file_tmp[PATH_MAX];
1324   int values_num = 0;
1325   int status;
1326   char orig_buf[CMD_MAX];
1327
1328   cache_item_t *ci;
1329
1330   /* save it for the journal later */
1331   if (!JOURNAL_REPLAY(sock))
1332     strncpy(orig_buf, buffer, buffer_size);
1333
1334   status = buffer_get_field (&buffer, &buffer_size, &file);
1335   if (status != 0)
1336     return syntax_error(sock,cmd);
1337
1338   pthread_mutex_lock(&stats_lock);
1339   stats_updates_received++;
1340   pthread_mutex_unlock(&stats_lock);
1341
1342   get_abs_path(&file, file_tmp);
1343   if (!check_file_access(file, sock)) return 0;
1344
1345   pthread_mutex_lock (&cache_lock);
1346   ci = g_tree_lookup (cache_tree, file);
1347
1348   if (ci == NULL) /* {{{ */
1349   {
1350     struct stat statbuf;
1351     cache_item_t *tmp;
1352
1353     /* don't hold the lock while we setup; stat(2) might block */
1354     pthread_mutex_unlock(&cache_lock);
1355
1356     memset (&statbuf, 0, sizeof (statbuf));
1357     status = stat (file, &statbuf);
1358     if (status != 0)
1359     {
1360       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1361
1362       status = errno;
1363       if (status == ENOENT)
1364         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1365       else
1366         return send_response(sock, RESP_ERR,
1367                              "stat failed with error %i.\n", status);
1368     }
1369     if (!S_ISREG (statbuf.st_mode))
1370       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1371
1372     if (access(file, R_OK|W_OK) != 0)
1373       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1374                            file, rrd_strerror(errno));
1375
1376     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1377     if (ci == NULL)
1378     {
1379       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1380
1381       return send_response(sock, RESP_ERR, "malloc failed.\n");
1382     }
1383     memset (ci, 0, sizeof (cache_item_t));
1384
1385     ci->file = strdup (file);
1386     if (ci->file == NULL)
1387     {
1388       free (ci);
1389       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1390
1391       return send_response(sock, RESP_ERR, "strdup failed.\n");
1392     }
1393
1394     wipe_ci_values(ci, now);
1395     ci->flags = CI_FLAGS_IN_TREE;
1396     pthread_cond_init(&ci->flushed, NULL);
1397
1398     pthread_mutex_lock(&cache_lock);
1399
1400     /* another UPDATE might have added this entry in the meantime */
1401     tmp = g_tree_lookup (cache_tree, file);
1402     if (tmp == NULL)
1403       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1404     else
1405     {
1406       free_cache_item (ci);
1407       ci = tmp;
1408     }
1409
1410     /* state may have changed while we were unlocked */
1411     if (state == SHUTDOWN)
1412       return -1;
1413   } /* }}} */
1414   assert (ci != NULL);
1415
1416   /* don't re-write updates in replay mode */
1417   if (!JOURNAL_REPLAY(sock))
1418     journal_write("update", orig_buf);
1419
1420   while (buffer_size > 0)
1421   {
1422     char *value;
1423     time_t stamp;
1424     char *eostamp;
1425
1426     status = buffer_get_field (&buffer, &buffer_size, &value);
1427     if (status != 0)
1428     {
1429       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1430       break;
1431     }
1432
1433     /* make sure update time is always moving forward */
1434     stamp = strtol(value, &eostamp, 10);
1435     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1436     {
1437       pthread_mutex_unlock(&cache_lock);
1438       return send_response(sock, RESP_ERR,
1439                            "Cannot find timestamp in '%s'!\n", value);
1440     }
1441     else if (stamp <= ci->last_update_stamp)
1442     {
1443       pthread_mutex_unlock(&cache_lock);
1444       return send_response(sock, RESP_ERR,
1445                            "illegal attempt to update using time %ld when last"
1446                            " update time is %ld (minimum one second step)\n",
1447                            stamp, ci->last_update_stamp);
1448     }
1449     else
1450       ci->last_update_stamp = stamp;
1451
1452     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1453                               &ci->values_alloc, config_alloc_chunk))
1454     {
1455       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1456       continue;
1457     }
1458
1459     values_num++;
1460   }
1461
1462   if (((now - ci->last_flush_time) >= config_write_interval)
1463       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1464       && (ci->values_num > 0))
1465   {
1466     enqueue_cache_item (ci, TAIL);
1467   }
1468
1469   pthread_mutex_unlock (&cache_lock);
1470
1471   if (values_num < 1)
1472     return send_response(sock, RESP_ERR, "No values updated.\n");
1473   else
1474     return send_response(sock, RESP_OK,
1475                          "errors, enqueued %i value(s).\n", values_num);
1476
1477   /* NOTREACHED */
1478   assert(1==0);
1479
1480 } /* }}} int handle_request_update */
1481
1482 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1483 {
1484   char *file, file_tmp[PATH_MAX];
1485   char *cf;
1486
1487   char *start_str;
1488   char *end_str;
1489   time_t start_tm;
1490   time_t end_tm;
1491
1492   unsigned long step;
1493   unsigned long ds_cnt;
1494   char **ds_namv;
1495   rrd_value_t *data;
1496
1497   int status;
1498   unsigned long i;
1499   time_t t;
1500   rrd_value_t *data_ptr;
1501
1502   file = NULL;
1503   cf = NULL;
1504   start_str = NULL;
1505   end_str = NULL;
1506
1507   /* Read the arguments */
1508   do /* while (0) */
1509   {
1510     status = buffer_get_field (&buffer, &buffer_size, &file);
1511     if (status != 0)
1512       break;
1513
1514     status = buffer_get_field (&buffer, &buffer_size, &cf);
1515     if (status != 0)
1516       break;
1517
1518     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1519     if (status != 0)
1520     {
1521       start_str = NULL;
1522       status = 0;
1523       break;
1524     }
1525
1526     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1527     if (status != 0)
1528     {
1529       end_str = NULL;
1530       status = 0;
1531       break;
1532     }
1533   } while (0);
1534
1535   if (status != 0)
1536     return (syntax_error(sock,cmd));
1537
1538   get_abs_path(&file, file_tmp);
1539   if (!check_file_access(file, sock)) return 0;
1540
1541   status = flush_file (file);
1542   if ((status != 0) && (status != ENOENT))
1543     return (send_response (sock, RESP_ERR,
1544           "flush_file (%s) failed with status %i.\n", file, status));
1545
1546   t = time (NULL); /* "now" */
1547
1548   /* Parse start time */
1549   if (start_str != NULL)
1550   {
1551     char *endptr;
1552     long value;
1553
1554     endptr = NULL;
1555     errno = 0;
1556     value = strtol (start_str, &endptr, /* base = */ 0);
1557     if ((endptr == start_str) || (errno != 0))
1558       return (send_response(sock, RESP_ERR,
1559             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1560             start_str));
1561
1562     if (value > 0)
1563       start_tm = (time_t) value;
1564     else
1565       start_tm = (time_t) (t + value);
1566   }
1567   else
1568   {
1569     start_tm = t - 86400;
1570   }
1571
1572   /* Parse end time */
1573   if (end_str != NULL)
1574   {
1575     char *endptr;
1576     long value;
1577
1578     endptr = NULL;
1579     errno = 0;
1580     value = strtol (end_str, &endptr, /* base = */ 0);
1581     if ((endptr == end_str) || (errno != 0))
1582       return (send_response(sock, RESP_ERR,
1583             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1584             end_str));
1585
1586     if (value > 0)
1587       end_tm = (time_t) value;
1588     else
1589       end_tm = (time_t) (t + value);
1590   }
1591   else
1592   {
1593     end_tm = t;
1594   }
1595
1596   step = -1;
1597   ds_cnt = 0;
1598   ds_namv = NULL;
1599   data = NULL;
1600
1601   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1602       &ds_cnt, &ds_namv, &data);
1603   if (status != 0)
1604     return (send_response(sock, RESP_ERR,
1605           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1606
1607   add_response_info (sock, "FlushVersion: %lu\n", 1);
1608   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1609   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1610   add_response_info (sock, "Step: %lu\n", step);
1611   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1612
1613 #define SSTRCAT(buffer,str,buffer_fill) do { \
1614     size_t str_len = strlen (str); \
1615     if ((buffer_fill + str_len) > sizeof (buffer)) \
1616       str_len = sizeof (buffer) - buffer_fill; \
1617     if (str_len > 0) { \
1618       strncpy (buffer + buffer_fill, str, str_len); \
1619       buffer_fill += str_len; \
1620       assert (buffer_fill <= sizeof (buffer)); \
1621       if (buffer_fill == sizeof (buffer)) \
1622         buffer[buffer_fill - 1] = 0; \
1623       else \
1624         buffer[buffer_fill] = 0; \
1625     } \
1626   } while (0)
1627
1628   { /* Add list of DS names */
1629     char linebuf[1024];
1630     size_t linebuf_fill;
1631
1632     memset (linebuf, 0, sizeof (linebuf));
1633     linebuf_fill = 0;
1634     for (i = 0; i < ds_cnt; i++)
1635     {
1636       if (i > 0)
1637         SSTRCAT (linebuf, " ", linebuf_fill);
1638       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1639       rrd_freemem(ds_namv[i]);
1640     }
1641     rrd_freemem(ds_namv);
1642     add_response_info (sock, "DSName: %s\n", linebuf);
1643   }
1644
1645   /* Add the actual data */
1646   assert (step > 0);
1647   data_ptr = data;
1648   for (t = start_tm + step; t <= end_tm; t += step)
1649   {
1650     char linebuf[1024];
1651     size_t linebuf_fill;
1652     char tmp[128];
1653
1654     memset (linebuf, 0, sizeof (linebuf));
1655     linebuf_fill = 0;
1656     for (i = 0; i < ds_cnt; i++)
1657     {
1658       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1659       tmp[sizeof (tmp) - 1] = 0;
1660       SSTRCAT (linebuf, tmp, linebuf_fill);
1661
1662       data_ptr++;
1663     }
1664
1665     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1666   } /* for (t) */
1667   rrd_freemem(data);
1668
1669   return (send_response (sock, RESP_OK, "Success\n"));
1670 #undef SSTRCAT
1671 } /* }}} int handle_request_fetch */
1672
1673 /* we came across a "WROTE" entry during journal replay.
1674  * throw away any values that we have accumulated for this file
1675  */
1676 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1677 {
1678   cache_item_t *ci;
1679   const char *file = buffer;
1680
1681   pthread_mutex_lock(&cache_lock);
1682
1683   ci = g_tree_lookup(cache_tree, file);
1684   if (ci == NULL)
1685   {
1686     pthread_mutex_unlock(&cache_lock);
1687     return (0);
1688   }
1689
1690   if (ci->values)
1691     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1692
1693   wipe_ci_values(ci, now);
1694   remove_from_queue(ci);
1695
1696   pthread_mutex_unlock(&cache_lock);
1697   return (0);
1698 } /* }}} int handle_request_wrote */
1699
1700 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1701 {
1702   char *file, file_tmp[PATH_MAX];
1703   int status;
1704   rrd_info_t *data;
1705
1706   /* obtain filename */
1707   status = buffer_get_field(&buffer, &buffer_size, &file);
1708   if (status != 0)
1709     return syntax_error(sock,cmd);
1710   /* get full pathname */
1711   get_abs_path(&file, file_tmp);
1712   if (!check_file_access(file, sock)) {
1713     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1714   }
1715   /* get data */
1716   rrd_clear_error ();
1717   data = rrd_info_r(file);
1718   if(!data) {
1719     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1720   }
1721   while (data) {
1722       switch (data->type) {
1723       case RD_I_VAL:
1724           if (isnan(data->value.u_val))
1725               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1726           else
1727               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1728           break;
1729       case RD_I_CNT:
1730           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1731           break;
1732       case RD_I_INT:
1733           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1734           break;
1735       case RD_I_STR:
1736           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1737           break;
1738       case RD_I_BLO:
1739           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1740           break;
1741       }
1742       data = data->next;
1743   }
1744   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1745 } /* }}} static int handle_request_info  */
1746
1747 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1748 {
1749   char *i, *file, file_tmp[PATH_MAX];
1750   int status;
1751   int idx;
1752   time_t t;
1753
1754   /* obtain filename */
1755   status = buffer_get_field(&buffer, &buffer_size, &file);
1756   if (status != 0)
1757     return syntax_error(sock,cmd);
1758   /* get full pathname */
1759   get_abs_path(&file, file_tmp);
1760   if (!check_file_access(file, sock)) {
1761     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1762   }
1763
1764   status = buffer_get_field(&buffer, &buffer_size, &i);
1765   if (status != 0)
1766     return syntax_error(sock,cmd);
1767   idx = atoi(i);
1768   if(idx<0) { 
1769     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1770   }
1771
1772   /* get data */
1773   rrd_clear_error ();
1774   t = rrd_first_r(file,idx);
1775   if(t<1) {
1776     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1777   }
1778   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1779 } /* }}} static int handle_request_last  */
1780
1781 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1782 {
1783   char *file, file_tmp[PATH_MAX];
1784   int status;
1785   time_t t;
1786
1787   /* obtain filename */
1788   status = buffer_get_field(&buffer, &buffer_size, &file);
1789   if (status != 0)
1790     return syntax_error(sock,cmd);
1791   /* get full pathname */
1792   get_abs_path(&file, file_tmp);
1793   if (!check_file_access(file, sock)) {
1794     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1795   }
1796   /* get data */
1797   rrd_clear_error ();
1798   t = rrd_last_r(file);
1799   if(t<1) {
1800     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1801   }
1802   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1803 } /* }}} static int handle_request_last  */
1804
1805 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1806 {
1807   char *file, file_tmp[PATH_MAX];
1808   char *tok;
1809   int ac = 0;
1810   char *av[128];
1811   int status;
1812   ulong step = 300;
1813   time_t last_up = time(NULL)-10;
1814   rrd_time_value_t last_up_tv;
1815   char     *parsetime_error = NULL;
1816   int no_overwrite = opt_no_overwrite;
1817
1818
1819   /* obtain filename */
1820   status = buffer_get_field(&buffer, &buffer_size, &file);
1821   if (status != 0)
1822     return syntax_error(sock,cmd);
1823   /* get full pathname */
1824   get_abs_path(&file, file_tmp);
1825   if (!check_file_access(file, sock)) {
1826     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1827   }
1828   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1829
1830   status = buffer_get_field(&buffer, &buffer_size, &tok );
1831   for(;(tok && !status);status = buffer_get_field(&buffer, &buffer_size, &tok )) {
1832     if( ! strncmp(tok,"-b",2) ) {
1833       status = buffer_get_field(&buffer, &buffer_size, &tok );
1834       if (status != 0) return syntax_error(sock,cmd);
1835       if ((parsetime_error = rrd_parsetime(tok, &last_up_tv))) 
1836         return send_response(sock, RESP_ERR, "start time: %s\n", parsetime_error);
1837       if (last_up_tv.type == RELATIVE_TO_END_TIME ||
1838         last_up_tv.type == RELATIVE_TO_START_TIME) {
1839         return send_response(sock, RESP_ERR, "Cannot specify time relative to start or end here.\n");
1840       }
1841       last_up = mktime(&last_up_tv.tm) +last_up_tv.offset;
1842
1843       continue;
1844     }
1845     if( ! strncmp(tok,"-s",2) ) {
1846       status = buffer_get_field(&buffer, &buffer_size, &tok );
1847       if (status != 0) return syntax_error(sock,cmd);
1848       step = atol(tok);
1849       continue;
1850     }
1851     if( ! strncmp(tok,"-O",2) ) {
1852       no_overwrite = 1;
1853       continue;
1854     }
1855     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1856     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1857     return syntax_error(sock,cmd);
1858   }
1859   if(step<1) {
1860     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1861   }
1862   if (last_up < 3600 * 24 * 365 * 10) {
1863     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1864   }
1865
1866   rrd_create_set_no_overwrite(no_overwrite);
1867   rrd_clear_error ();
1868   status = rrd_create_r(file,step,last_up,ac,(const char **)av);
1869
1870   if(!status) {
1871     return send_response(sock, RESP_OK, "RRD created OK\n");
1872   }
1873   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1874 } /* }}} static int handle_request_create  */
1875
1876 /* start "BATCH" processing */
1877 static int batch_start (HANDLER_PROTO) /* {{{ */
1878 {
1879   int status;
1880   if (sock->batch_start)
1881     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1882
1883   status = send_response(sock, RESP_OK,
1884                          "Go ahead.  End with dot '.' on its own line.\n");
1885   sock->batch_start = time(NULL);
1886   sock->batch_cmd = 0;
1887
1888   return status;
1889 } /* }}} static int batch_start */
1890
1891 /* finish "BATCH" processing and return results to the client */
1892 static int batch_done (HANDLER_PROTO) /* {{{ */
1893 {
1894   assert(sock->batch_start);
1895   sock->batch_start = 0;
1896   sock->batch_cmd  = 0;
1897   return send_response(sock, RESP_OK, "errors\n");
1898 } /* }}} static int batch_done */
1899
1900 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1901 {
1902   return -1;
1903 } /* }}} static int handle_request_quit */
1904
1905 static command_t list_of_commands[] = { /* {{{ */
1906   {
1907     "UPDATE",
1908     handle_request_update,
1909     CMD_CONTEXT_ANY,
1910     "UPDATE <filename> <values> [<values> ...]\n"
1911     ,
1912     "Adds the given file to the internal cache if it is not yet known and\n"
1913     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1914     "for details.\n"
1915     "\n"
1916     "Each <values> has the following form:\n"
1917     "  <values> = <time>:<value>[:<value>[...]]\n"
1918     "See the rrdupdate(1) manpage for details.\n"
1919   },
1920   {
1921     "WROTE",
1922     handle_request_wrote,
1923     CMD_CONTEXT_JOURNAL,
1924     NULL,
1925     NULL
1926   },
1927   {
1928     "FLUSH",
1929     handle_request_flush,
1930     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1931     "FLUSH <filename>\n"
1932     ,
1933     "Adds the given filename to the head of the update queue and returns\n"
1934     "after it has been dequeued.\n"
1935   },
1936   {
1937     "FLUSHALL",
1938     handle_request_flushall,
1939     CMD_CONTEXT_CLIENT,
1940     "FLUSHALL\n"
1941     ,
1942     "Triggers writing of all pending updates.  Returns immediately.\n"
1943   },
1944   {
1945     "PENDING",
1946     handle_request_pending,
1947     CMD_CONTEXT_CLIENT,
1948     "PENDING <filename>\n"
1949     ,
1950     "Shows any 'pending' updates for a file, in order.\n"
1951     "The updates shown have not yet been written to the underlying RRD file.\n"
1952   },
1953   {
1954     "FORGET",
1955     handle_request_forget,
1956     CMD_CONTEXT_ANY,
1957     "FORGET <filename>\n"
1958     ,
1959     "Removes the file completely from the cache.\n"
1960     "Any pending updates for the file will be lost.\n"
1961   },
1962   {
1963     "QUEUE",
1964     handle_request_queue,
1965     CMD_CONTEXT_CLIENT,
1966     "QUEUE\n"
1967     ,
1968         "Shows all files in the output queue.\n"
1969     "The output is zero or more lines in the following format:\n"
1970     "(where <num_vals> is the number of values to be written)\n"
1971     "\n"
1972     "<num_vals> <filename>\n"
1973   },
1974   {
1975     "STATS",
1976     handle_request_stats,
1977     CMD_CONTEXT_CLIENT,
1978     "STATS\n"
1979     ,
1980     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1981     "a description of the values.\n"
1982   },
1983   {
1984     "HELP",
1985     handle_request_help,
1986     CMD_CONTEXT_CLIENT,
1987     "HELP [<command>]\n",
1988     NULL, /* special! */
1989   },
1990   {
1991     "BATCH",
1992     batch_start,
1993     CMD_CONTEXT_CLIENT,
1994     "BATCH\n"
1995     ,
1996     "The 'BATCH' command permits the client to initiate a bulk load\n"
1997     "   of commands to rrdcached.\n"
1998     "\n"
1999     "Usage:\n"
2000     "\n"
2001     "    client: BATCH\n"
2002     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2003     "    client: command #1\n"
2004     "    client: command #2\n"
2005     "    client: ... and so on\n"
2006     "    client: .\n"
2007     "    server: 2 errors\n"
2008     "    server: 7 message for command #7\n"
2009     "    server: 9 message for command #9\n"
2010     "\n"
2011     "For more information, consult the rrdcached(1) documentation.\n"
2012   },
2013   {
2014     ".",   /* BATCH terminator */
2015     batch_done,
2016     CMD_CONTEXT_BATCH,
2017     NULL,
2018     NULL
2019   },
2020   {
2021     "FETCH",
2022     handle_request_fetch,
2023     CMD_CONTEXT_CLIENT,
2024     "FETCH <file> <CF> [<start> [<end>]]\n"
2025     ,
2026     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2027   },
2028   {
2029     "INFO",
2030     handle_request_info,
2031     CMD_CONTEXT_CLIENT,
2032     "INFO <filename>\n",
2033     "The INFO command retrieves information about a specified RRD file.\n"
2034     "This is returned in standard rrdinfo format, a sequence of lines\n"
2035     "with the format <keyname> = <value>\n"
2036     "Note that this is the data as of the last update of the RRD file itself,\n"
2037     "not the last time data was received via rrdcached, so there may be pending\n"
2038     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2039   },
2040   {
2041     "FIRST",
2042     handle_request_first,
2043     CMD_CONTEXT_CLIENT,
2044     "FIRST <filename> <rra index>\n",
2045     "The FIRST command retrieves the first data time for a specified RRA in\n"
2046     "an RRD file.\n"
2047   },
2048   {
2049     "LAST",
2050     handle_request_last,
2051     CMD_CONTEXT_CLIENT,
2052     "LAST <filename>\n",
2053     "The LAST command retrieves the last update time for a specified RRD file.\n"
2054     "Note that this is the time of the last update of the RRD file itself, not\n"
2055     "the last time data was received via rrdcached, so there may be pending\n"
2056     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2057   },
2058   {
2059     "CREATE",
2060     handle_request_create,
2061     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2062     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2063     "The CREATE command will create an RRD file, overwriting any existing file\n"
2064     "unless the -O option is given or rrdcached was started with the -O option.\n"
2065     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2066   },
2067   {
2068     "QUIT",
2069     handle_request_quit,
2070     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2071     "QUIT\n"
2072     ,
2073     "Disconnect from rrdcached.\n"
2074   }
2075 }; /* }}} command_t list_of_commands[] */
2076 static size_t list_of_commands_len = sizeof (list_of_commands)
2077   / sizeof (list_of_commands[0]);
2078
2079 static command_t *find_command(char *cmd)
2080 {
2081   size_t i;
2082
2083   for (i = 0; i < list_of_commands_len; i++)
2084     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2085       return (&list_of_commands[i]);
2086   return NULL;
2087 }
2088
2089 /* We currently use the index in the `list_of_commands' array as a bit position
2090  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2091  * outside these functions so that switching to a more elegant storage method
2092  * is easily possible. */
2093 static ssize_t find_command_index (const char *cmd) /* {{{ */
2094 {
2095   size_t i;
2096
2097   for (i = 0; i < list_of_commands_len; i++)
2098     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2099       return ((ssize_t) i);
2100   return (-1);
2101 } /* }}} ssize_t find_command_index */
2102
2103 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2104     const char *cmd)
2105 {
2106   ssize_t i;
2107
2108   if (JOURNAL_REPLAY(sock))
2109     return (1);
2110
2111   if (cmd == NULL)
2112     return (-1);
2113
2114   if ((strcasecmp ("QUIT", cmd) == 0)
2115       || (strcasecmp ("HELP", cmd) == 0))
2116     return (1);
2117   else if (strcmp (".", cmd) == 0)
2118     cmd = "BATCH";
2119
2120   i = find_command_index (cmd);
2121   if (i < 0)
2122     return (-1);
2123   assert (i < 32);
2124
2125   if ((sock->permissions & (1 << i)) != 0)
2126     return (1);
2127   return (0);
2128 } /* }}} int socket_permission_check */
2129
2130 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2131     const char *cmd)
2132 {
2133   ssize_t i;
2134
2135   i = find_command_index (cmd);
2136   if (i < 0)
2137     return (-1);
2138   assert (i < 32);
2139
2140   sock->permissions |= (1 << i);
2141   return (0);
2142 } /* }}} int socket_permission_add */
2143
2144 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2145 {
2146   sock->permissions = 0;
2147 } /* }}} socket_permission_clear */
2148
2149 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2150     listen_socket_t *src)
2151 {
2152   dest->permissions = src->permissions;
2153 } /* }}} socket_permission_copy */
2154
2155 /* check whether commands are received in the expected context */
2156 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2157 {
2158   if (JOURNAL_REPLAY(sock))
2159     return (cmd->context & CMD_CONTEXT_JOURNAL);
2160   else if (sock->batch_start)
2161     return (cmd->context & CMD_CONTEXT_BATCH);
2162   else
2163     return (cmd->context & CMD_CONTEXT_CLIENT);
2164
2165   /* NOTREACHED */
2166   assert(1==0);
2167 }
2168
2169 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2170 {
2171   int status;
2172   char *cmd_str;
2173   char *resp_txt;
2174   command_t *help = NULL;
2175
2176   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2177   if (status == 0)
2178     help = find_command(cmd_str);
2179
2180   if (help && (help->syntax || help->help))
2181   {
2182     char tmp[CMD_MAX];
2183
2184     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2185     resp_txt = tmp;
2186
2187     if (help->syntax)
2188       add_response_info(sock, "Usage: %s\n", help->syntax);
2189
2190     if (help->help)
2191       add_response_info(sock, "%s\n", help->help);
2192   }
2193   else
2194   {
2195     size_t i;
2196
2197     resp_txt = "Command overview\n";
2198
2199     for (i = 0; i < list_of_commands_len; i++)
2200     {
2201       if (list_of_commands[i].syntax == NULL)
2202         continue;
2203       add_response_info (sock, "%s", list_of_commands[i].syntax);
2204     }
2205   }
2206
2207   return send_response(sock, RESP_OK, resp_txt);
2208 } /* }}} int handle_request_help */
2209
2210 static int handle_request (DISPATCH_PROTO) /* {{{ */
2211 {
2212   char *buffer_ptr = buffer;
2213   char *cmd_str = NULL;
2214   command_t *cmd = NULL;
2215   int status;
2216
2217   assert (buffer[buffer_size - 1] == '\0');
2218
2219   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2220   if (status != 0)
2221   {
2222     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2223     return (-1);
2224   }
2225
2226   if (sock != NULL && sock->batch_start)
2227     sock->batch_cmd++;
2228
2229   cmd = find_command(cmd_str);
2230   if (!cmd)
2231     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2232
2233   if (!socket_permission_check (sock, cmd->cmd))
2234     return send_response(sock, RESP_ERR, "Permission denied.\n");
2235
2236   if (!command_check_context(sock, cmd))
2237     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2238
2239   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2240 } /* }}} int handle_request */
2241
2242 static void journal_set_free (journal_set *js) /* {{{ */
2243 {
2244   if (js == NULL)
2245     return;
2246
2247   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2248
2249   free(js);
2250 } /* }}} journal_set_free */
2251
2252 static void journal_set_remove (journal_set *js) /* {{{ */
2253 {
2254   if (js == NULL)
2255     return;
2256
2257   for (uint i=0; i < js->files_num; i++)
2258   {
2259     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2260     unlink(js->files[i]);
2261   }
2262 } /* }}} journal_set_remove */
2263
2264 /* close current journal file handle.
2265  * MUST hold journal_lock before calling */
2266 static void journal_close(void) /* {{{ */
2267 {
2268   if (journal_fh != NULL)
2269   {
2270     if (fclose(journal_fh) != 0)
2271       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2272   }
2273
2274   journal_fh = NULL;
2275   journal_size = 0;
2276 } /* }}} journal_close */
2277
2278 /* MUST hold journal_lock before calling */
2279 static void journal_new_file(void) /* {{{ */
2280 {
2281   struct timeval now;
2282   int  new_fd;
2283   char new_file[PATH_MAX + 1];
2284
2285   assert(journal_dir != NULL);
2286   assert(journal_cur != NULL);
2287
2288   journal_close();
2289
2290   gettimeofday(&now, NULL);
2291   /* this format assures that the files sort in strcmp() order */
2292   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2293            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2294
2295   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2296                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2297   if (new_fd < 0)
2298     goto error;
2299
2300   journal_fh = fdopen(new_fd, "a");
2301   if (journal_fh == NULL)
2302     goto error;
2303
2304   journal_size = ftell(journal_fh);
2305   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2306
2307   /* record the file in the journal set */
2308   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2309
2310   return;
2311
2312 error:
2313   RRDD_LOG(LOG_CRIT,
2314            "JOURNALING DISABLED: Error while trying to create %s : %s",
2315            new_file, rrd_strerror(errno));
2316   RRDD_LOG(LOG_CRIT,
2317            "JOURNALING DISABLED: All values will be flushed at shutdown");
2318
2319   close(new_fd);
2320   config_flush_at_shutdown = 1;
2321
2322 } /* }}} journal_new_file */
2323
2324 /* MUST NOT hold journal_lock before calling this */
2325 static void journal_rotate(void) /* {{{ */
2326 {
2327   journal_set *old_js = NULL;
2328
2329   if (journal_dir == NULL)
2330     return;
2331
2332   RRDD_LOG(LOG_DEBUG, "rotating journals");
2333
2334   pthread_mutex_lock(&stats_lock);
2335   ++stats_journal_rotate;
2336   pthread_mutex_unlock(&stats_lock);
2337
2338   pthread_mutex_lock(&journal_lock);
2339
2340   journal_close();
2341
2342   /* rotate the journal sets */
2343   old_js = journal_old;
2344   journal_old = journal_cur;
2345   journal_cur = calloc(1, sizeof(journal_set));
2346
2347   if (journal_cur != NULL)
2348     journal_new_file();
2349   else
2350     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2351
2352   pthread_mutex_unlock(&journal_lock);
2353
2354   journal_set_remove(old_js);
2355   journal_set_free  (old_js);
2356
2357 } /* }}} static void journal_rotate */
2358
2359 /* MUST hold journal_lock when calling */
2360 static void journal_done(void) /* {{{ */
2361 {
2362   if (journal_cur == NULL)
2363     return;
2364
2365   journal_close();
2366
2367   if (config_flush_at_shutdown)
2368   {
2369     RRDD_LOG(LOG_INFO, "removing journals");
2370     journal_set_remove(journal_old);
2371     journal_set_remove(journal_cur);
2372   }
2373   else
2374   {
2375     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2376              "journals will be used at next startup");
2377   }
2378
2379   journal_set_free(journal_cur);
2380   journal_set_free(journal_old);
2381   free(journal_dir);
2382
2383 } /* }}} static void journal_done */
2384
2385 static int journal_write(char *cmd, char *args) /* {{{ */
2386 {
2387   int chars;
2388
2389   if (journal_fh == NULL)
2390     return 0;
2391
2392   pthread_mutex_lock(&journal_lock);
2393   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2394   journal_size += chars;
2395
2396   if (journal_size > JOURNAL_MAX)
2397     journal_new_file();
2398
2399   pthread_mutex_unlock(&journal_lock);
2400
2401   if (chars > 0)
2402   {
2403     pthread_mutex_lock(&stats_lock);
2404     stats_journal_bytes += chars;
2405     pthread_mutex_unlock(&stats_lock);
2406   }
2407
2408   return chars;
2409 } /* }}} static int journal_write */
2410
2411 static int journal_replay (const char *file) /* {{{ */
2412 {
2413   FILE *fh;
2414   int entry_cnt = 0;
2415   int fail_cnt = 0;
2416   uint64_t line = 0;
2417   char entry[CMD_MAX];
2418   time_t now;
2419
2420   if (file == NULL) return 0;
2421
2422   {
2423     char *reason = "unknown error";
2424     int status = 0;
2425     struct stat statbuf;
2426
2427     memset(&statbuf, 0, sizeof(statbuf));
2428     if (stat(file, &statbuf) != 0)
2429     {
2430       reason = "stat error";
2431       status = errno;
2432     }
2433     else if (!S_ISREG(statbuf.st_mode))
2434     {
2435       reason = "not a regular file";
2436       status = EPERM;
2437     }
2438     if (statbuf.st_uid != daemon_uid)
2439     {
2440       reason = "not owned by daemon user";
2441       status = EACCES;
2442     }
2443     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2444     {
2445       reason = "must not be user/group writable";
2446       status = EACCES;
2447     }
2448
2449     if (status != 0)
2450     {
2451       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2452                file, rrd_strerror(status), reason);
2453       return 0;
2454     }
2455   }
2456
2457   fh = fopen(file, "r");
2458   if (fh == NULL)
2459   {
2460     if (errno != ENOENT)
2461       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2462                file, rrd_strerror(errno));
2463     return 0;
2464   }
2465   else
2466     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2467
2468   now = time(NULL);
2469
2470   while(!feof(fh))
2471   {
2472     size_t entry_len;
2473
2474     ++line;
2475     if (fgets(entry, sizeof(entry), fh) == NULL)
2476       break;
2477     entry_len = strlen(entry);
2478
2479     /* check \n termination in case journal writing crashed mid-line */
2480     if (entry_len == 0)
2481       continue;
2482     else if (entry[entry_len - 1] != '\n')
2483     {
2484       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2485       ++fail_cnt;
2486       continue;
2487     }
2488
2489     entry[entry_len - 1] = '\0';
2490
2491     if (handle_request(NULL, now, entry, entry_len) == 0)
2492       ++entry_cnt;
2493     else
2494       ++fail_cnt;
2495   }
2496
2497   fclose(fh);
2498
2499   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2500            entry_cnt, fail_cnt);
2501
2502   return entry_cnt > 0 ? 1 : 0;
2503 } /* }}} static int journal_replay */
2504
2505 static int journal_sort(const void *v1, const void *v2)
2506 {
2507   char **jn1 = (char **) v1;
2508   char **jn2 = (char **) v2;
2509
2510   return strcmp(*jn1,*jn2);
2511 }
2512
2513 static void journal_init(void) /* {{{ */
2514 {
2515   int had_journal = 0;
2516   DIR *dir;
2517   struct dirent *dent;
2518   char path[PATH_MAX+1];
2519
2520   if (journal_dir == NULL) return;
2521
2522   pthread_mutex_lock(&journal_lock);
2523
2524   journal_cur = calloc(1, sizeof(journal_set));
2525   if (journal_cur == NULL)
2526   {
2527     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2528     return;
2529   }
2530
2531   RRDD_LOG(LOG_INFO, "checking for journal files");
2532
2533   /* Handle old journal files during transition.  This gives them the
2534    * correct sort order.  TODO: remove after first release
2535    */
2536   {
2537     char old_path[PATH_MAX+1];
2538     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2539     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2540     rename(old_path, path);
2541
2542     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2543     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2544     rename(old_path, path);
2545   }
2546
2547   dir = opendir(journal_dir);
2548   if (!dir) {
2549     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2550     return;
2551   }
2552   while ((dent = readdir(dir)) != NULL)
2553   {
2554     /* looks like a journal file? */
2555     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2556       continue;
2557
2558     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2559
2560     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2561     {
2562       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2563                dent->d_name);
2564       break;
2565     }
2566   }
2567   closedir(dir);
2568
2569   qsort(journal_cur->files, journal_cur->files_num,
2570         sizeof(journal_cur->files[0]), journal_sort);
2571
2572   for (uint i=0; i < journal_cur->files_num; i++)
2573     had_journal += journal_replay(journal_cur->files[i]);
2574
2575   journal_new_file();
2576
2577   /* it must have been a crash.  start a flush */
2578   if (had_journal && config_flush_at_shutdown)
2579     flush_old_values(-1);
2580
2581   pthread_mutex_unlock(&journal_lock);
2582
2583   RRDD_LOG(LOG_INFO, "journal processing complete");
2584
2585 } /* }}} static void journal_init */
2586
2587 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2588 {
2589   assert(sock != NULL);
2590
2591   free(sock->rbuf);  sock->rbuf = NULL;
2592   free(sock->wbuf);  sock->wbuf = NULL;
2593   free(sock);
2594 } /* }}} void free_listen_socket */
2595
2596 static void close_connection(listen_socket_t *sock) /* {{{ */
2597 {
2598   if (sock->fd >= 0)
2599   {
2600     close(sock->fd);
2601     sock->fd = -1;
2602   }
2603
2604   free_listen_socket(sock);
2605
2606 } /* }}} void close_connection */
2607
2608 static void *connection_thread_main (void *args) /* {{{ */
2609 {
2610   listen_socket_t *sock;
2611   int fd;
2612
2613   sock = (listen_socket_t *) args;
2614   fd = sock->fd;
2615
2616   /* init read buffers */
2617   sock->next_read = sock->next_cmd = 0;
2618   sock->rbuf = malloc(RBUF_SIZE);
2619   if (sock->rbuf == NULL)
2620   {
2621     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2622     close_connection(sock);
2623     return NULL;
2624   }
2625
2626   pthread_mutex_lock (&connection_threads_lock);
2627   connection_threads_num++;
2628   pthread_mutex_unlock (&connection_threads_lock);
2629
2630   while (state == RUNNING)
2631   {
2632     char *cmd;
2633     ssize_t cmd_len;
2634     ssize_t rbytes;
2635     time_t now;
2636
2637     struct pollfd pollfd;
2638     int status;
2639
2640     pollfd.fd = fd;
2641     pollfd.events = POLLIN | POLLPRI;
2642     pollfd.revents = 0;
2643
2644     status = poll (&pollfd, 1, /* timeout = */ 500);
2645     if (state != RUNNING)
2646       break;
2647     else if (status == 0) /* timeout */
2648       continue;
2649     else if (status < 0) /* error */
2650     {
2651       status = errno;
2652       if (status != EINTR)
2653         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2654       continue;
2655     }
2656
2657     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2658       break;
2659     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2660     {
2661       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2662           "poll(2) returned something unexpected: %#04hx",
2663           pollfd.revents);
2664       break;
2665     }
2666
2667     rbytes = read(fd, sock->rbuf + sock->next_read,
2668                   RBUF_SIZE - sock->next_read);
2669     if (rbytes < 0)
2670     {
2671       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2672       break;
2673     }
2674     else if (rbytes == 0)
2675       break; /* eof */
2676
2677     sock->next_read += rbytes;
2678
2679     if (sock->batch_start)
2680       now = sock->batch_start;
2681     else
2682       now = time(NULL);
2683
2684     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2685     {
2686       status = handle_request (sock, now, cmd, cmd_len+1);
2687       if (status != 0)
2688         goto out_close;
2689     }
2690   }
2691
2692 out_close:
2693   close_connection(sock);
2694
2695   /* Remove this thread from the connection threads list */
2696   pthread_mutex_lock (&connection_threads_lock);
2697   connection_threads_num--;
2698   if (connection_threads_num <= 0)
2699     pthread_cond_broadcast(&connection_threads_done);
2700   pthread_mutex_unlock (&connection_threads_lock);
2701
2702   return (NULL);
2703 } /* }}} void *connection_thread_main */
2704
2705 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2706 {
2707   int fd;
2708   struct sockaddr_un sa;
2709   listen_socket_t *temp;
2710   int status;
2711   const char *path;
2712   char *path_copy, *dir;
2713
2714   path = sock->addr;
2715   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2716     path += strlen("unix:");
2717
2718   /* dirname may modify its argument */
2719   path_copy = strdup(path);
2720   if (path_copy == NULL)
2721   {
2722     fprintf(stderr, "rrdcached: strdup(): %s\n",
2723         rrd_strerror(errno));
2724     return (-1);
2725   }
2726
2727   dir = dirname(path_copy);
2728   if (rrd_mkdir_p(dir, 0777) != 0)
2729   {
2730     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2731         dir, rrd_strerror(errno));
2732     return (-1);
2733   }
2734
2735   free(path_copy);
2736
2737   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2738       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2739   if (temp == NULL)
2740   {
2741     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2742     return (-1);
2743   }
2744   listen_fds = temp;
2745   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2746
2747   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2748   if (fd < 0)
2749   {
2750     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2751              rrd_strerror(errno));
2752     return (-1);
2753   }
2754
2755   memset (&sa, 0, sizeof (sa));
2756   sa.sun_family = AF_UNIX;
2757   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2758
2759   /* if we've gotten this far, we own the pid file.  any daemon started
2760    * with the same args must not be alive.  therefore, ensure that we can
2761    * create the socket...
2762    */
2763   unlink(path);
2764
2765   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2766   if (status != 0)
2767   {
2768     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2769              path, rrd_strerror(errno));
2770     close (fd);
2771     return (-1);
2772   }
2773
2774   /* tweak the sockets group ownership */
2775   if (sock->socket_group != (gid_t)-1)
2776   {
2777     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2778          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2779     {
2780       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2781     }
2782   }
2783
2784   if (sock->socket_permissions != (mode_t)-1)
2785   {
2786     if (chmod(path, sock->socket_permissions) != 0)
2787       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2788           (unsigned int)sock->socket_permissions, strerror(errno));
2789   }
2790
2791   status = listen (fd, /* backlog = */ 10);
2792   if (status != 0)
2793   {
2794     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2795              path, rrd_strerror(errno));
2796     close (fd);
2797     unlink (path);
2798     return (-1);
2799   }
2800
2801   listen_fds[listen_fds_num].fd = fd;
2802   listen_fds[listen_fds_num].family = PF_UNIX;
2803   strncpy(listen_fds[listen_fds_num].addr, path,
2804           sizeof (listen_fds[listen_fds_num].addr) - 1);
2805   listen_fds_num++;
2806
2807   return (0);
2808 } /* }}} int open_listen_socket_unix */
2809
2810 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2811 {
2812   struct addrinfo ai_hints;
2813   struct addrinfo *ai_res;
2814   struct addrinfo *ai_ptr;
2815   char addr_copy[NI_MAXHOST];
2816   char *addr;
2817   char *port;
2818   int status;
2819
2820   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2821   addr_copy[sizeof (addr_copy) - 1] = 0;
2822   addr = addr_copy;
2823
2824   memset (&ai_hints, 0, sizeof (ai_hints));
2825   ai_hints.ai_flags = 0;
2826 #ifdef AI_ADDRCONFIG
2827   ai_hints.ai_flags |= AI_ADDRCONFIG;
2828 #endif
2829   ai_hints.ai_family = AF_UNSPEC;
2830   ai_hints.ai_socktype = SOCK_STREAM;
2831
2832   port = NULL;
2833   if (*addr == '[') /* IPv6+port format */
2834   {
2835     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2836     addr++;
2837
2838     port = strchr (addr, ']');
2839     if (port == NULL)
2840     {
2841       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2842       return (-1);
2843     }
2844     *port = 0;
2845     port++;
2846
2847     if (*port == ':')
2848       port++;
2849     else if (*port == 0)
2850       port = NULL;
2851     else
2852     {
2853       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2854       return (-1);
2855     }
2856   } /* if (*addr == '[') */
2857   else
2858   {
2859     port = rindex(addr, ':');
2860     if (port != NULL)
2861     {
2862       *port = 0;
2863       port++;
2864     }
2865   }
2866   ai_res = NULL;
2867   status = getaddrinfo (addr,
2868                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2869                         &ai_hints, &ai_res);
2870   if (status != 0)
2871   {
2872     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2873              addr, gai_strerror (status));
2874     return (-1);
2875   }
2876
2877   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2878   {
2879     int fd;
2880     listen_socket_t *temp;
2881     int one = 1;
2882
2883     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2884         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2885     if (temp == NULL)
2886     {
2887       fprintf (stderr,
2888                "rrdcached: open_listen_socket_network: realloc failed.\n");
2889       continue;
2890     }
2891     listen_fds = temp;
2892     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2893
2894     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2895     if (fd < 0)
2896     {
2897       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2898                rrd_strerror(errno));
2899       continue;
2900     }
2901
2902     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2903
2904     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2905     if (status != 0)
2906     {
2907       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2908                sock->addr, rrd_strerror(errno));
2909       close (fd);
2910       continue;
2911     }
2912
2913     status = listen (fd, /* backlog = */ 10);
2914     if (status != 0)
2915     {
2916       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2917                sock->addr, rrd_strerror(errno));
2918       close (fd);
2919       freeaddrinfo(ai_res);
2920       return (-1);
2921     }
2922
2923     listen_fds[listen_fds_num].fd = fd;
2924     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2925     listen_fds_num++;
2926   } /* for (ai_ptr) */
2927
2928   freeaddrinfo(ai_res);
2929   return (0);
2930 } /* }}} static int open_listen_socket_network */
2931
2932 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2933 {
2934   assert(sock != NULL);
2935   assert(sock->addr != NULL);
2936
2937   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2938       || sock->addr[0] == '/')
2939     return (open_listen_socket_unix(sock));
2940   else
2941     return (open_listen_socket_network(sock));
2942 } /* }}} int open_listen_socket */
2943
2944 static int close_listen_sockets (void) /* {{{ */
2945 {
2946   size_t i;
2947
2948   for (i = 0; i < listen_fds_num; i++)
2949   {
2950     close (listen_fds[i].fd);
2951
2952     if (listen_fds[i].family == PF_UNIX)
2953       unlink(listen_fds[i].addr);
2954   }
2955
2956   free (listen_fds);
2957   listen_fds = NULL;
2958   listen_fds_num = 0;
2959
2960   return (0);
2961 } /* }}} int close_listen_sockets */
2962
2963 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2964 {
2965   struct pollfd *pollfds;
2966   int pollfds_num;
2967   int status;
2968   int i;
2969
2970   if (listen_fds_num < 1)
2971   {
2972     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2973     return (NULL);
2974   }
2975
2976   pollfds_num = listen_fds_num;
2977   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2978   if (pollfds == NULL)
2979   {
2980     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2981     return (NULL);
2982   }
2983   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2984
2985   RRDD_LOG(LOG_INFO, "listening for connections");
2986
2987   while (state == RUNNING)
2988   {
2989     for (i = 0; i < pollfds_num; i++)
2990     {
2991       pollfds[i].fd = listen_fds[i].fd;
2992       pollfds[i].events = POLLIN | POLLPRI;
2993       pollfds[i].revents = 0;
2994     }
2995
2996     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2997     if (state != RUNNING)
2998       break;
2999     else if (status == 0) /* timeout */
3000       continue;
3001     else if (status < 0) /* error */
3002     {
3003       status = errno;
3004       if (status != EINTR)
3005       {
3006         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3007       }
3008       continue;
3009     }
3010
3011     for (i = 0; i < pollfds_num; i++)
3012     {
3013       listen_socket_t *client_sock;
3014       struct sockaddr_storage client_sa;
3015       socklen_t client_sa_size;
3016       pthread_t tid;
3017       pthread_attr_t attr;
3018
3019       if (pollfds[i].revents == 0)
3020         continue;
3021
3022       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3023       {
3024         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3025             "poll(2) returned something unexpected for listen FD #%i.",
3026             pollfds[i].fd);
3027         continue;
3028       }
3029
3030       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3031       if (client_sock == NULL)
3032       {
3033         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3034         continue;
3035       }
3036       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3037
3038       client_sa_size = sizeof (client_sa);
3039       client_sock->fd = accept (pollfds[i].fd,
3040           (struct sockaddr *) &client_sa, &client_sa_size);
3041       if (client_sock->fd < 0)
3042       {
3043         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3044         free(client_sock);
3045         continue;
3046       }
3047
3048       pthread_attr_init (&attr);
3049       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3050
3051       status = pthread_create (&tid, &attr, connection_thread_main,
3052                                client_sock);
3053       if (status != 0)
3054       {
3055         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3056         close_connection(client_sock);
3057         continue;
3058       }
3059     } /* for (pollfds_num) */
3060   } /* while (state == RUNNING) */
3061
3062   RRDD_LOG(LOG_INFO, "starting shutdown");
3063
3064   close_listen_sockets ();
3065
3066   pthread_mutex_lock (&connection_threads_lock);
3067   while (connection_threads_num > 0)
3068     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3069   pthread_mutex_unlock (&connection_threads_lock);
3070
3071   free(pollfds);
3072
3073   return (NULL);
3074 } /* }}} void *listen_thread_main */
3075
3076 static int daemonize (void) /* {{{ */
3077 {
3078   int pid_fd;
3079   char *base_dir;
3080
3081   daemon_uid = geteuid();
3082
3083   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3084   if (pid_fd < 0)
3085     pid_fd = check_pidfile();
3086   if (pid_fd < 0)
3087     return pid_fd;
3088
3089   /* open all the listen sockets */
3090   if (config_listen_address_list_len > 0)
3091   {
3092     for (size_t i = 0; i < config_listen_address_list_len; i++)
3093       open_listen_socket (config_listen_address_list[i]);
3094
3095     rrd_free_ptrs((void ***) &config_listen_address_list,
3096                   &config_listen_address_list_len);
3097   }
3098   else
3099   {
3100     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3101         sizeof(default_socket.addr) - 1);
3102     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3103     open_listen_socket (&default_socket);
3104   }
3105
3106   if (listen_fds_num < 1)
3107   {
3108     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3109     goto error;
3110   }
3111
3112   if (!stay_foreground)
3113   {
3114     pid_t child;
3115
3116     child = fork ();
3117     if (child < 0)
3118     {
3119       fprintf (stderr, "daemonize: fork(2) failed.\n");
3120       goto error;
3121     }
3122     else if (child > 0)
3123       exit(0);
3124
3125     /* Become session leader */
3126     setsid ();
3127
3128     /* Open the first three file descriptors to /dev/null */
3129     close (2);
3130     close (1);
3131     close (0);
3132
3133     open ("/dev/null", O_RDWR);
3134     if (dup(0) == -1 || dup(0) == -1){
3135         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3136     }
3137   } /* if (!stay_foreground) */
3138
3139   /* Change into the /tmp directory. */
3140   base_dir = (config_base_dir != NULL)
3141     ? config_base_dir
3142     : "/tmp";
3143
3144   if (chdir (base_dir) != 0)
3145   {
3146     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3147     goto error;
3148   }
3149
3150   install_signal_handlers();
3151
3152   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3153   RRDD_LOG(LOG_INFO, "starting up");
3154
3155   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3156                                 (GDestroyNotify) free_cache_item);
3157   if (cache_tree == NULL)
3158   {
3159     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3160     goto error;
3161   }
3162
3163   return write_pidfile (pid_fd);
3164
3165 error:
3166   remove_pidfile();
3167   return -1;
3168 } /* }}} int daemonize */
3169
3170 static int cleanup (void) /* {{{ */
3171 {
3172   pthread_cond_broadcast (&flush_cond);
3173   pthread_join (flush_thread, NULL);
3174
3175   pthread_cond_broadcast (&queue_cond);
3176   for (int i = 0; i < config_queue_threads; i++)
3177     pthread_join (queue_threads[i], NULL);
3178
3179   if (config_flush_at_shutdown)
3180   {
3181     assert(cache_queue_head == NULL);
3182     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3183   }
3184
3185   free(queue_threads);
3186   free(config_base_dir);
3187
3188   pthread_mutex_lock(&cache_lock);
3189   g_tree_destroy(cache_tree);
3190
3191   pthread_mutex_lock(&journal_lock);
3192   journal_done();
3193
3194   RRDD_LOG(LOG_INFO, "goodbye");
3195   closelog ();
3196
3197   remove_pidfile ();
3198   free(config_pid_file);
3199
3200   return (0);
3201 } /* }}} int cleanup */
3202
3203 static int read_options (int argc, char **argv) /* {{{ */
3204 {
3205   int option;
3206   int status = 0;
3207
3208   socket_permission_clear (&default_socket);
3209
3210   default_socket.socket_group = (gid_t)-1;
3211   default_socket.socket_permissions = (mode_t)-1;
3212
3213   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3214   {
3215     switch (option)
3216     {
3217       case 'O':
3218         opt_no_overwrite = 1;
3219         break;
3220
3221       case 'g':
3222         stay_foreground=1;
3223         break;
3224
3225       case 'l':
3226       {
3227         listen_socket_t *new;
3228
3229         new = malloc(sizeof(listen_socket_t));
3230         if (new == NULL)
3231         {
3232           fprintf(stderr, "read_options: malloc failed.\n");
3233           return(2);
3234         }
3235         memset(new, 0, sizeof(listen_socket_t));
3236
3237         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3238
3239         /* Add permissions to the socket {{{ */
3240         if (default_socket.permissions != 0)
3241         {
3242           socket_permission_copy (new, &default_socket);
3243         }
3244         else /* if (default_socket.permissions == 0) */
3245         {
3246           /* Add permission for ALL commands to the socket. */
3247           size_t i;
3248           for (i = 0; i < list_of_commands_len; i++)
3249           {
3250             status = socket_permission_add (new, list_of_commands[i].cmd);
3251             if (status != 0)
3252             {
3253               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3254                   "socket failed. This should never happen, ever! Sorry.\n",
3255                   list_of_commands[i].cmd);
3256               status = 4;
3257             }
3258           }
3259         }
3260         /* }}} Done adding permissions. */
3261
3262         new->socket_group = default_socket.socket_group;
3263         new->socket_permissions = default_socket.socket_permissions;
3264
3265         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3266                          &config_listen_address_list_len, new))
3267         {
3268           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3269           return (2);
3270         }
3271       }
3272       break;
3273
3274       /* set socket group permissions */
3275       case 's':
3276       {
3277         gid_t group_gid;
3278         struct group *grp;
3279
3280         group_gid = strtoul(optarg, NULL, 10);
3281         if (errno != EINVAL && group_gid>0)
3282         {
3283           /* we were passed a number */
3284           grp = getgrgid(group_gid);
3285         }
3286         else
3287         {
3288           grp = getgrnam(optarg);
3289         }
3290
3291         if (grp)
3292         {
3293           default_socket.socket_group = grp->gr_gid;
3294         }
3295         else
3296         {
3297           /* no idea what the user wanted... */
3298           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3299           return (5);
3300         }
3301       }
3302       break;
3303
3304       /* set socket file permissions */
3305       case 'm':
3306       {
3307         long  tmp;
3308         char *endptr = NULL;
3309
3310         tmp = strtol (optarg, &endptr, 8);
3311         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3312             || (tmp > 07777) || (tmp < 0)) {
3313           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3314               optarg);
3315           return (5);
3316         }
3317
3318         default_socket.socket_permissions = (mode_t)tmp;
3319       }
3320       break;
3321
3322       case 'P':
3323       {
3324         char *optcopy;
3325         char *saveptr;
3326         char *dummy;
3327         char *ptr;
3328
3329         socket_permission_clear (&default_socket);
3330
3331         optcopy = strdup (optarg);
3332         dummy = optcopy;
3333         saveptr = NULL;
3334         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3335         {
3336           dummy = NULL;
3337           status = socket_permission_add (&default_socket, ptr);
3338           if (status != 0)
3339           {
3340             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3341                 "socket failed. Most likely, this permission doesn't "
3342                 "exist. Check your command line.\n", ptr);
3343             status = 4;
3344           }
3345         }
3346
3347         free (optcopy);
3348       }
3349       break;
3350
3351       case 'f':
3352       {
3353         int temp;
3354
3355         temp = atoi (optarg);
3356         if (temp > 0)
3357           config_flush_interval = temp;
3358         else
3359         {
3360           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3361           status = 3;
3362         }
3363       }
3364       break;
3365
3366       case 'w':
3367       {
3368         int temp;
3369
3370         temp = atoi (optarg);
3371         if (temp > 0)
3372           config_write_interval = temp;
3373         else
3374         {
3375           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3376           status = 2;
3377         }
3378       }
3379       break;
3380
3381       case 'z':
3382       {
3383         int temp;
3384
3385         temp = atoi(optarg);
3386         if (temp > 0)
3387           config_write_jitter = temp;
3388         else
3389         {
3390           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3391           status = 2;
3392         }
3393
3394         break;
3395       }
3396
3397       case 't':
3398       {
3399         int threads;
3400         threads = atoi(optarg);
3401         if (threads >= 1)
3402           config_queue_threads = threads;
3403         else
3404         {
3405           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3406           return 1;
3407         }
3408       }
3409       break;
3410
3411       case 'B':
3412         config_write_base_only = 1;
3413         break;
3414
3415       case 'b':
3416       {
3417         size_t len;
3418         char base_realpath[PATH_MAX];
3419
3420         if (config_base_dir != NULL)
3421           free (config_base_dir);
3422         config_base_dir = strdup (optarg);
3423         if (config_base_dir == NULL)
3424         {
3425           fprintf (stderr, "read_options: strdup failed.\n");
3426           return (3);
3427         }
3428
3429         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3430         {
3431           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3432               config_base_dir, rrd_strerror (errno));
3433           return (3);
3434         }
3435
3436         /* make sure that the base directory is not resolved via
3437          * symbolic links.  this makes some performance-enhancing
3438          * assumptions possible (we don't have to resolve paths
3439          * that start with a "/")
3440          */
3441         if (realpath(config_base_dir, base_realpath) == NULL)
3442         {
3443           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3444               "%s\n", config_base_dir, rrd_strerror(errno));
3445           return 5;
3446         }
3447
3448         len = strlen (config_base_dir);
3449         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3450         {
3451           config_base_dir[len - 1] = 0;
3452           len--;
3453         }
3454
3455         if (len < 1)
3456         {
3457           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3458           return (4);
3459         }
3460
3461         _config_base_dir_len = len;
3462
3463         len = strlen (base_realpath);
3464         while ((len > 0) && (base_realpath[len - 1] == '/'))
3465         {
3466           base_realpath[len - 1] = '\0';
3467           len--;
3468         }
3469
3470         if (strncmp(config_base_dir,
3471                          base_realpath, sizeof(base_realpath)) != 0)
3472         {
3473           fprintf(stderr,
3474                   "Base directory (-b) resolved via file system links!\n"
3475                   "Please consult rrdcached '-b' documentation!\n"
3476                   "Consider specifying the real directory (%s)\n",
3477                   base_realpath);
3478           return 5;
3479         }
3480       }
3481       break;
3482
3483       case 'p':
3484       {
3485         if (config_pid_file != NULL)
3486           free (config_pid_file);
3487         config_pid_file = strdup (optarg);
3488         if (config_pid_file == NULL)
3489         {
3490           fprintf (stderr, "read_options: strdup failed.\n");
3491           return (3);
3492         }
3493       }
3494       break;
3495
3496       case 'F':
3497         config_flush_at_shutdown = 1;
3498         break;
3499
3500       case 'j':
3501       {
3502         char journal_dir_actual[PATH_MAX];
3503         const char *dir;
3504         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3505
3506         status = rrd_mkdir_p(dir, 0777);
3507         if (status != 0)
3508         {
3509           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3510               dir, rrd_strerror(errno));
3511           return 6;
3512         }
3513
3514         if (access(dir, R_OK|W_OK|X_OK) != 0)
3515         {
3516           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3517                   errno ? rrd_strerror(errno) : "");
3518           return 6;
3519         }
3520       }
3521       break;
3522
3523       case 'a':
3524       {
3525         int temp = atoi(optarg);
3526         if (temp > 0)
3527           config_alloc_chunk = temp;
3528         else
3529         {
3530           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3531           return 10;
3532         }
3533       }
3534       break;
3535
3536       case 'h':
3537       case '?':
3538         printf ("RRDCacheD %s\n"
3539             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3540             "\n"
3541             "Usage: rrdcached [options]\n"
3542             "\n"
3543             "Valid options are:\n"
3544             "  -l <address>  Socket address to listen to.\n"
3545             "  -P <perms>    Sets the permissions to assign to all following "
3546                             "sockets\n"
3547             "  -w <seconds>  Interval in which to write data.\n"
3548             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3549             "  -t <threads>  Number of write threads.\n"
3550             "  -f <seconds>  Interval in which to flush dead data.\n"
3551             "  -p <file>     Location of the PID-file.\n"
3552             "  -b <dir>      Base directory to change to.\n"
3553             "  -B            Restrict file access to paths within -b <dir>\n"
3554             "  -g            Do not fork and run in the foreground.\n"
3555             "  -j <dir>      Directory in which to create the journal files.\n"
3556             "  -F            Always flush all updates at shutdown\n"
3557             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3558             "                (the socket will also have read/write permissions "
3559                             "for that group)\n"
3560             "  -m <mode>     File permissions (octal) of all following UNIX "
3561                             "sockets\n"
3562             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3563             "  -O            Do not allow CREATE commands to overwrite existing\n"
3564             "                files, even if asked to.\n"
3565             "\n"
3566             "For more information and a detailed description of all options "
3567             "please refer\n"
3568             "to the rrdcached(1) manual page.\n",
3569             VERSION);
3570         if (option == 'h')
3571           status = -1;
3572         else
3573           status = 1;
3574         break;
3575     } /* switch (option) */
3576   } /* while (getopt) */
3577
3578   /* advise the user when values are not sane */
3579   if (config_flush_interval < 2 * config_write_interval)
3580     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3581             " 2x write interval (-w) !\n");
3582   if (config_write_jitter > config_write_interval)
3583     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3584             " write interval (-w) !\n");
3585
3586   if (config_write_base_only && config_base_dir == NULL)
3587     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3588             "  Consult the rrdcached documentation\n");
3589
3590   if (journal_dir == NULL)
3591     config_flush_at_shutdown = 1;
3592
3593   return (status);
3594 } /* }}} int read_options */
3595
3596 int main (int argc, char **argv)
3597 {
3598   int status;
3599
3600   status = read_options (argc, argv);
3601   if (status != 0)
3602   {
3603     if (status < 0)
3604       status = 0;
3605     return (status);
3606   }
3607
3608   status = daemonize ();
3609   if (status != 0)
3610   {
3611     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3612     return (1);
3613   }
3614
3615   journal_init();
3616
3617   /* start the queue threads */
3618   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3619   if (queue_threads == NULL)
3620   {
3621     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3622     cleanup();
3623     return (1);
3624   }
3625   for (int i = 0; i < config_queue_threads; i++)
3626   {
3627     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3628     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3629     if (status != 0)
3630     {
3631       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3632       cleanup();
3633       return (1);
3634     }
3635   }
3636
3637   /* start the flush thread */
3638   memset(&flush_thread, 0, sizeof(flush_thread));
3639   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3640   if (status != 0)
3641   {
3642     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3643     cleanup();
3644     return (1);
3645   }
3646
3647   listen_thread_main (NULL);
3648   cleanup ();
3649
3650   return (0);
3651 } /* int main */
3652
3653 /*
3654  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3655  */