Under most circumstances, rrdcached can detect a stale pid file.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   time_t batch_start;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
149
150 struct callback_flush_data_s
151 {
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
158
159 enum queue_side_e
160 {
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
165
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
169
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
175
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
178
179 static int do_shutdown = 0;
180
181 static pthread_t queue_thread;
182
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
186
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
193
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
202
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
205
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
223
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
228 {
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
233
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
235 {
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
238
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
240 {
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
243
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
245 {
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
249
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
251 {
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
255
256 static void install_signal_handlers(void) /* {{{ */
257 {
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
265
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
270
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
274
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
278
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
282
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
286
287 } /* }}} void install_signal_handlers */
288
289 static int open_pidfile(char *action, int oflag) /* {{{ */
290 {
291   int fd;
292   char *file;
293
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
297
298   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
301             action, file, rrd_strerror(errno));
302
303   return(fd);
304 } /* }}} static int open_pidfile */
305
306 /* check existing pid file to see whether a daemon is running */
307 static int check_pidfile(void)
308 {
309   int pid_fd;
310   pid_t pid;
311   char pid_str[16];
312
313   pid_fd = open_pidfile("open", O_RDWR);
314   if (pid_fd < 0)
315     return pid_fd;
316
317   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
318     return -1;
319
320   pid = atoi(pid_str);
321   if (pid <= 0)
322     return -1;
323
324   /* another running process that we can signal COULD be
325    * a competing rrdcached */
326   if (pid != getpid() && kill(pid, 0) == 0)
327   {
328     fprintf(stderr,
329             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
330     close(pid_fd);
331     return -1;
332   }
333
334   lseek(pid_fd, 0, SEEK_SET);
335   ftruncate(pid_fd, 0);
336
337   fprintf(stderr,
338           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
339           "rrdcached: starting normally.\n", pid);
340
341   return pid_fd;
342 } /* }}} static int check_pidfile */
343
344 static int write_pidfile (int fd) /* {{{ */
345 {
346   pid_t pid;
347   FILE *fh;
348
349   pid = getpid ();
350
351   fh = fdopen (fd, "w");
352   if (fh == NULL)
353   {
354     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
355     close(fd);
356     return (-1);
357   }
358
359   fprintf (fh, "%i\n", (int) pid);
360   fclose (fh);
361
362   return (0);
363 } /* }}} int write_pidfile */
364
365 static int remove_pidfile (void) /* {{{ */
366 {
367   char *file;
368   int status;
369
370   file = (config_pid_file != NULL)
371     ? config_pid_file
372     : LOCALSTATEDIR "/run/rrdcached.pid";
373
374   status = unlink (file);
375   if (status == 0)
376     return (0);
377   return (errno);
378 } /* }}} int remove_pidfile */
379
380 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
381 {
382   char *eol;
383
384   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
385                sock->next_read - sock->next_cmd);
386
387   if (eol == NULL)
388   {
389     /* no commands left, move remainder back to front of rbuf */
390     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
391             sock->next_read - sock->next_cmd);
392     sock->next_read -= sock->next_cmd;
393     sock->next_cmd = 0;
394     *len = 0;
395     return NULL;
396   }
397   else
398   {
399     char *cmd = sock->rbuf + sock->next_cmd;
400     *eol = '\0';
401
402     sock->next_cmd = eol - sock->rbuf + 1;
403
404     if (eol > sock->rbuf && *(eol-1) == '\r')
405       *(--eol) = '\0'; /* handle "\r\n" EOL */
406
407     *len = eol - cmd;
408
409     return cmd;
410   }
411
412   /* NOTREACHED */
413   assert(1==0);
414 }
415
416 /* add the characters directly to the write buffer */
417 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
418 {
419   char *new_buf;
420
421   assert(sock != NULL);
422
423   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
424   if (new_buf == NULL)
425   {
426     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
427     return -1;
428   }
429
430   strncpy(new_buf + sock->wbuf_len, str, len + 1);
431
432   sock->wbuf = new_buf;
433   sock->wbuf_len += len;
434
435   return 0;
436 } /* }}} static int add_to_wbuf */
437
438 /* add the text to the "extra" info that's sent after the status line */
439 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
440 {
441   va_list argp;
442   char buffer[CMD_MAX];
443   int len;
444
445   if (sock == NULL) return 0; /* journal replay mode */
446   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
447
448   va_start(argp, fmt);
449 #ifdef HAVE_VSNPRINTF
450   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
451 #else
452   len = vsprintf(buffer, fmt, argp);
453 #endif
454   va_end(argp);
455   if (len < 0)
456   {
457     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
458     return -1;
459   }
460
461   return add_to_wbuf(sock, buffer, len);
462 } /* }}} static int add_response_info */
463
464 static int count_lines(char *str) /* {{{ */
465 {
466   int lines = 0;
467
468   if (str != NULL)
469   {
470     while ((str = strchr(str, '\n')) != NULL)
471     {
472       ++lines;
473       ++str;
474     }
475   }
476
477   return lines;
478 } /* }}} static int count_lines */
479
480 /* send the response back to the user.
481  * returns 0 on success, -1 on error
482  * write buffer is always zeroed after this call */
483 static int send_response (listen_socket_t *sock, response_code rc,
484                           char *fmt, ...) /* {{{ */
485 {
486   va_list argp;
487   char buffer[CMD_MAX];
488   int lines;
489   ssize_t wrote;
490   int rclen, len;
491
492   if (sock == NULL) return rc;  /* journal replay mode */
493
494   if (sock->batch_start)
495   {
496     if (rc == RESP_OK)
497       return rc; /* no response on success during BATCH */
498     lines = sock->batch_cmd;
499   }
500   else if (rc == RESP_OK)
501     lines = count_lines(sock->wbuf);
502   else
503     lines = -1;
504
505   rclen = sprintf(buffer, "%d ", lines);
506   va_start(argp, fmt);
507 #ifdef HAVE_VSNPRINTF
508   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
509 #else
510   len = vsprintf(buffer+rclen, fmt, argp);
511 #endif
512   va_end(argp);
513   if (len < 0)
514     return -1;
515
516   len += rclen;
517
518   /* append the result to the wbuf, don't write to the user */
519   if (sock->batch_start)
520     return add_to_wbuf(sock, buffer, len);
521
522   /* first write must be complete */
523   if (len != write(sock->fd, buffer, len))
524   {
525     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
526     return -1;
527   }
528
529   if (sock->wbuf != NULL && rc == RESP_OK)
530   {
531     wrote = 0;
532     while (wrote < sock->wbuf_len)
533     {
534       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
535       if (wb <= 0)
536       {
537         RRDD_LOG(LOG_INFO, "send_response: could not write results");
538         return -1;
539       }
540       wrote += wb;
541     }
542   }
543
544   free(sock->wbuf); sock->wbuf = NULL;
545   sock->wbuf_len = 0;
546
547   return 0;
548 } /* }}} */
549
550 static void wipe_ci_values(cache_item_t *ci, time_t when)
551 {
552   ci->values = NULL;
553   ci->values_num = 0;
554
555   ci->last_flush_time = when;
556   if (config_write_jitter > 0)
557     ci->last_flush_time += (random() % config_write_jitter);
558 }
559
560 /* remove_from_queue
561  * remove a "cache_item_t" item from the queue.
562  * must hold 'cache_lock' when calling this
563  */
564 static void remove_from_queue(cache_item_t *ci) /* {{{ */
565 {
566   if (ci == NULL) return;
567
568   if (ci->prev == NULL)
569     cache_queue_head = ci->next; /* reset head */
570   else
571     ci->prev->next = ci->next;
572
573   if (ci->next == NULL)
574     cache_queue_tail = ci->prev; /* reset the tail */
575   else
576     ci->next->prev = ci->prev;
577
578   ci->next = ci->prev = NULL;
579   ci->flags &= ~CI_FLAGS_IN_QUEUE;
580 } /* }}} static void remove_from_queue */
581
582 /* remove an entry from the tree and free all its resources.
583  * must hold 'cache lock' while calling this.
584  * returns 0 on success, otherwise errno */
585 static int forget_file(const char *file)
586 {
587   cache_item_t *ci;
588
589   ci = g_tree_lookup(cache_tree, file);
590   if (ci == NULL)
591     return ENOENT;
592
593   g_tree_remove (cache_tree, file);
594   remove_from_queue(ci);
595
596   for (int i=0; i < ci->values_num; i++)
597     free(ci->values[i]);
598
599   free (ci->values);
600   free (ci->file);
601
602   /* in case anyone is waiting */
603   pthread_cond_broadcast(&ci->flushed);
604
605   free (ci);
606
607   return 0;
608 } /* }}} static int forget_file */
609
610 /*
611  * enqueue_cache_item:
612  * `cache_lock' must be acquired before calling this function!
613  */
614 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
615     queue_side_t side)
616 {
617   if (ci == NULL)
618     return (-1);
619
620   if (ci->values_num == 0)
621     return (0);
622
623   if (side == HEAD)
624   {
625     if (cache_queue_head == ci)
626       return 0;
627
628     /* remove from the double linked list */
629     if (ci->flags & CI_FLAGS_IN_QUEUE)
630       remove_from_queue(ci);
631
632     ci->prev = NULL;
633     ci->next = cache_queue_head;
634     if (ci->next != NULL)
635       ci->next->prev = ci;
636     cache_queue_head = ci;
637
638     if (cache_queue_tail == NULL)
639       cache_queue_tail = cache_queue_head;
640   }
641   else /* (side == TAIL) */
642   {
643     /* We don't move values back in the list.. */
644     if (ci->flags & CI_FLAGS_IN_QUEUE)
645       return (0);
646
647     assert (ci->next == NULL);
648     assert (ci->prev == NULL);
649
650     ci->prev = cache_queue_tail;
651
652     if (cache_queue_tail == NULL)
653       cache_queue_head = ci;
654     else
655       cache_queue_tail->next = ci;
656
657     cache_queue_tail = ci;
658   }
659
660   ci->flags |= CI_FLAGS_IN_QUEUE;
661
662   pthread_cond_broadcast(&cache_cond);
663   pthread_mutex_lock (&stats_lock);
664   stats_queue_length++;
665   pthread_mutex_unlock (&stats_lock);
666
667   return (0);
668 } /* }}} int enqueue_cache_item */
669
670 /*
671  * tree_callback_flush:
672  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
673  * while this is in progress.
674  */
675 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
676     gpointer data)
677 {
678   cache_item_t *ci;
679   callback_flush_data_t *cfd;
680
681   ci = (cache_item_t *) value;
682   cfd = (callback_flush_data_t *) data;
683
684   if ((ci->last_flush_time <= cfd->abs_timeout)
685       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
686       && (ci->values_num > 0))
687   {
688     enqueue_cache_item (ci, TAIL);
689   }
690   else if ((do_shutdown != 0)
691       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
692       && (ci->values_num > 0))
693   {
694     enqueue_cache_item (ci, TAIL);
695   }
696   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
697       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
698       && (ci->values_num <= 0))
699   {
700     char **temp;
701
702     temp = (char **) realloc (cfd->keys,
703         sizeof (char *) * (cfd->keys_num + 1));
704     if (temp == NULL)
705     {
706       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
707       return (FALSE);
708     }
709     cfd->keys = temp;
710     /* Make really sure this points to the _same_ place */
711     assert ((char *) key == ci->file);
712     cfd->keys[cfd->keys_num] = (char *) key;
713     cfd->keys_num++;
714   }
715
716   return (FALSE);
717 } /* }}} gboolean tree_callback_flush */
718
719 static int flush_old_values (int max_age)
720 {
721   callback_flush_data_t cfd;
722   size_t k;
723
724   memset (&cfd, 0, sizeof (cfd));
725   /* Pass the current time as user data so that we don't need to call
726    * `time' for each node. */
727   cfd.now = time (NULL);
728   cfd.keys = NULL;
729   cfd.keys_num = 0;
730
731   if (max_age > 0)
732     cfd.abs_timeout = cfd.now - max_age;
733   else
734     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
735
736   /* `tree_callback_flush' will return the keys of all values that haven't
737    * been touched in the last `config_flush_interval' seconds in `cfd'.
738    * The char*'s in this array point to the same memory as ci->file, so we
739    * don't need to free them separately. */
740   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
741
742   for (k = 0; k < cfd.keys_num; k++)
743   {
744     /* should never fail, since we have held the cache_lock
745      * the entire time */
746     assert( forget_file(cfd.keys[k]) == 0 );
747   }
748
749   if (cfd.keys != NULL)
750   {
751     free (cfd.keys);
752     cfd.keys = NULL;
753   }
754
755   return (0);
756 } /* int flush_old_values */
757
758 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
759 {
760   struct timeval now;
761   struct timespec next_flush;
762   int final_flush = 0; /* make sure we only flush once on shutdown */
763
764   gettimeofday (&now, NULL);
765   next_flush.tv_sec = now.tv_sec + config_flush_interval;
766   next_flush.tv_nsec = 1000 * now.tv_usec;
767
768   pthread_mutex_lock (&cache_lock);
769   while ((do_shutdown == 0) || (cache_queue_head != NULL))
770   {
771     cache_item_t *ci;
772     char *file;
773     char **values;
774     int values_num;
775     int status;
776     int i;
777
778     /* First, check if it's time to do the cache flush. */
779     gettimeofday (&now, NULL);
780     if ((now.tv_sec > next_flush.tv_sec)
781         || ((now.tv_sec == next_flush.tv_sec)
782           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
783     {
784       /* Flush all values that haven't been written in the last
785        * `config_write_interval' seconds. */
786       flush_old_values (config_write_interval);
787
788       /* Determine the time of the next cache flush. */
789       next_flush.tv_sec =
790         now.tv_sec + next_flush.tv_sec % config_flush_interval;
791
792       /* unlock the cache while we rotate so we don't block incoming
793        * updates if the fsync() blocks on disk I/O */
794       pthread_mutex_unlock(&cache_lock);
795       journal_rotate();
796       pthread_mutex_lock(&cache_lock);
797     }
798
799     /* Now, check if there's something to store away. If not, wait until
800      * something comes in or it's time to do the cache flush.  if we are
801      * shutting down, do not wait around.  */
802     if (cache_queue_head == NULL && !do_shutdown)
803     {
804       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
805       if ((status != 0) && (status != ETIMEDOUT))
806       {
807         RRDD_LOG (LOG_ERR, "queue_thread_main: "
808             "pthread_cond_timedwait returned %i.", status);
809       }
810     }
811
812     /* We're about to shut down */
813     if (do_shutdown != 0 && !final_flush++)
814     {
815       if (config_flush_at_shutdown)
816         flush_old_values (-1); /* flush everything */
817       else
818         break;
819     }
820
821     /* Check if a value has arrived. This may be NULL if we timed out or there
822      * was an interrupt such as a signal. */
823     if (cache_queue_head == NULL)
824       continue;
825
826     ci = cache_queue_head;
827
828     /* copy the relevant parts */
829     file = strdup (ci->file);
830     if (file == NULL)
831     {
832       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
833       continue;
834     }
835
836     assert(ci->values != NULL);
837     assert(ci->values_num > 0);
838
839     values = ci->values;
840     values_num = ci->values_num;
841
842     wipe_ci_values(ci, time(NULL));
843     remove_from_queue(ci);
844
845     pthread_mutex_lock (&stats_lock);
846     assert (stats_queue_length > 0);
847     stats_queue_length--;
848     pthread_mutex_unlock (&stats_lock);
849
850     pthread_mutex_unlock (&cache_lock);
851
852     rrd_clear_error ();
853     status = rrd_update_r (file, NULL, values_num, (void *) values);
854     if (status != 0)
855     {
856       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
857           "rrd_update_r (%s) failed with status %i. (%s)",
858           file, status, rrd_get_error());
859     }
860
861     journal_write("wrote", file);
862     pthread_cond_broadcast(&ci->flushed);
863
864     for (i = 0; i < values_num; i++)
865       free (values[i]);
866
867     free(values);
868     free(file);
869
870     if (status == 0)
871     {
872       pthread_mutex_lock (&stats_lock);
873       stats_updates_written++;
874       stats_data_sets_written += values_num;
875       pthread_mutex_unlock (&stats_lock);
876     }
877
878     pthread_mutex_lock (&cache_lock);
879
880     /* We're about to shut down */
881     if (do_shutdown != 0 && !final_flush++)
882     {
883       if (config_flush_at_shutdown)
884           flush_old_values (-1); /* flush everything */
885       else
886         break;
887     }
888   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
889   pthread_mutex_unlock (&cache_lock);
890
891   if (config_flush_at_shutdown)
892   {
893     assert(cache_queue_head == NULL);
894     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
895   }
896
897   journal_done();
898
899   return (NULL);
900 } /* }}} void *queue_thread_main */
901
902 static int buffer_get_field (char **buffer_ret, /* {{{ */
903     size_t *buffer_size_ret, char **field_ret)
904 {
905   char *buffer;
906   size_t buffer_pos;
907   size_t buffer_size;
908   char *field;
909   size_t field_size;
910   int status;
911
912   buffer = *buffer_ret;
913   buffer_pos = 0;
914   buffer_size = *buffer_size_ret;
915   field = *buffer_ret;
916   field_size = 0;
917
918   if (buffer_size <= 0)
919     return (-1);
920
921   /* This is ensured by `handle_request'. */
922   assert (buffer[buffer_size - 1] == '\0');
923
924   status = -1;
925   while (buffer_pos < buffer_size)
926   {
927     /* Check for end-of-field or end-of-buffer */
928     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
929     {
930       field[field_size] = 0;
931       field_size++;
932       buffer_pos++;
933       status = 0;
934       break;
935     }
936     /* Handle escaped characters. */
937     else if (buffer[buffer_pos] == '\\')
938     {
939       if (buffer_pos >= (buffer_size - 1))
940         break;
941       buffer_pos++;
942       field[field_size] = buffer[buffer_pos];
943       field_size++;
944       buffer_pos++;
945     }
946     /* Normal operation */ 
947     else
948     {
949       field[field_size] = buffer[buffer_pos];
950       field_size++;
951       buffer_pos++;
952     }
953   } /* while (buffer_pos < buffer_size) */
954
955   if (status != 0)
956     return (status);
957
958   *buffer_ret = buffer + buffer_pos;
959   *buffer_size_ret = buffer_size - buffer_pos;
960   *field_ret = field;
961
962   return (0);
963 } /* }}} int buffer_get_field */
964
965 /* if we're restricting writes to the base directory,
966  * check whether the file falls within the dir
967  * returns 1 if OK, otherwise 0
968  */
969 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
970 {
971   assert(file != NULL);
972
973   if (!config_write_base_only
974       || sock == NULL /* journal replay */
975       || config_base_dir == NULL)
976     return 1;
977
978   if (strstr(file, "../") != NULL) goto err;
979
980   /* relative paths without "../" are ok */
981   if (*file != '/') return 1;
982
983   /* file must be of the format base + "/" + <1+ char filename> */
984   if (strlen(file) < _config_base_dir_len + 2) goto err;
985   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
986   if (*(file + _config_base_dir_len) != '/') goto err;
987
988   return 1;
989
990 err:
991   if (sock != NULL && sock->fd >= 0)
992     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
993
994   return 0;
995 } /* }}} static int check_file_access */
996
997 /* when using a base dir, convert relative paths to absolute paths.
998  * if necessary, modifies the "filename" pointer to point
999  * to the new path created in "tmp".  "tmp" is provided
1000  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1001  *
1002  * this allows us to optimize for the expected case (absolute path)
1003  * with a no-op.
1004  */
1005 static void get_abs_path(char **filename, char *tmp)
1006 {
1007   assert(tmp != NULL);
1008   assert(filename != NULL && *filename != NULL);
1009
1010   if (config_base_dir == NULL || **filename == '/')
1011     return;
1012
1013   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1014   *filename = tmp;
1015 } /* }}} static int get_abs_path */
1016
1017 /* returns 1 if we have the required privilege level,
1018  * otherwise issue an error to the user on sock */
1019 static int has_privilege (listen_socket_t *sock, /* {{{ */
1020                           socket_privilege priv)
1021 {
1022   if (sock == NULL) /* journal replay */
1023     return 1;
1024
1025   if (sock->privilege >= priv)
1026     return 1;
1027
1028   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1029 } /* }}} static int has_privilege */
1030
1031 static int flush_file (const char *filename) /* {{{ */
1032 {
1033   cache_item_t *ci;
1034
1035   pthread_mutex_lock (&cache_lock);
1036
1037   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1038   if (ci == NULL)
1039   {
1040     pthread_mutex_unlock (&cache_lock);
1041     return (ENOENT);
1042   }
1043
1044   if (ci->values_num > 0)
1045   {
1046     /* Enqueue at head */
1047     enqueue_cache_item (ci, HEAD);
1048     pthread_cond_wait(&ci->flushed, &cache_lock);
1049   }
1050
1051   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1052    * may have been purged during our cond_wait() */
1053
1054   pthread_mutex_unlock(&cache_lock);
1055
1056   return (0);
1057 } /* }}} int flush_file */
1058
1059 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1060     char *buffer, size_t buffer_size)
1061 {
1062   int status;
1063   char **help_text;
1064   char *command;
1065
1066   char *help_help[2] =
1067   {
1068     "Command overview\n"
1069     ,
1070     "HELP [<command>]\n"
1071     "FLUSH <filename>\n"
1072     "FLUSHALL\n"
1073     "PENDING <filename>\n"
1074     "FORGET <filename>\n"
1075     "UPDATE <filename> <values> [<values> ...]\n"
1076     "BATCH\n"
1077     "STATS\n"
1078   };
1079
1080   char *help_flush[2] =
1081   {
1082     "Help for FLUSH\n"
1083     ,
1084     "Usage: FLUSH <filename>\n"
1085     "\n"
1086     "Adds the given filename to the head of the update queue and returns\n"
1087     "after is has been dequeued.\n"
1088   };
1089
1090   char *help_flushall[2] =
1091   {
1092     "Help for FLUSHALL\n"
1093     ,
1094     "Usage: FLUSHALL\n"
1095     "\n"
1096     "Triggers writing of all pending updates.  Returns immediately.\n"
1097   };
1098
1099   char *help_pending[2] =
1100   {
1101     "Help for PENDING\n"
1102     ,
1103     "Usage: PENDING <filename>\n"
1104     "\n"
1105     "Shows any 'pending' updates for a file, in order.\n"
1106     "The updates shown have not yet been written to the underlying RRD file.\n"
1107   };
1108
1109   char *help_forget[2] =
1110   {
1111     "Help for FORGET\n"
1112     ,
1113     "Usage: FORGET <filename>\n"
1114     "\n"
1115     "Removes the file completely from the cache.\n"
1116     "Any pending updates for the file will be lost.\n"
1117   };
1118
1119   char *help_update[2] =
1120   {
1121     "Help for UPDATE\n"
1122     ,
1123     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1124     "\n"
1125     "Adds the given file to the internal cache if it is not yet known and\n"
1126     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1127     "for details.\n"
1128     "\n"
1129     "Each <values> has the following form:\n"
1130     "  <values> = <time>:<value>[:<value>[...]]\n"
1131     "See the rrdupdate(1) manpage for details.\n"
1132   };
1133
1134   char *help_stats[2] =
1135   {
1136     "Help for STATS\n"
1137     ,
1138     "Usage: STATS\n"
1139     "\n"
1140     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1141     "a description of the values.\n"
1142   };
1143
1144   char *help_batch[2] =
1145   {
1146     "Help for BATCH\n"
1147     ,
1148     "The 'BATCH' command permits the client to initiate a bulk load\n"
1149     "   of commands to rrdcached.\n"
1150     "\n"
1151     "Usage:\n"
1152     "\n"
1153     "    client: BATCH\n"
1154     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1155     "    client: command #1\n"
1156     "    client: command #2\n"
1157     "    client: ... and so on\n"
1158     "    client: .\n"
1159     "    server: 2 errors\n"
1160     "    server: 7 message for command #7\n"
1161     "    server: 9 message for command #9\n"
1162     "\n"
1163     "For more information, consult the rrdcached(1) documentation.\n"
1164   };
1165
1166   status = buffer_get_field (&buffer, &buffer_size, &command);
1167   if (status != 0)
1168     help_text = help_help;
1169   else
1170   {
1171     if (strcasecmp (command, "update") == 0)
1172       help_text = help_update;
1173     else if (strcasecmp (command, "flush") == 0)
1174       help_text = help_flush;
1175     else if (strcasecmp (command, "flushall") == 0)
1176       help_text = help_flushall;
1177     else if (strcasecmp (command, "pending") == 0)
1178       help_text = help_pending;
1179     else if (strcasecmp (command, "forget") == 0)
1180       help_text = help_forget;
1181     else if (strcasecmp (command, "stats") == 0)
1182       help_text = help_stats;
1183     else if (strcasecmp (command, "batch") == 0)
1184       help_text = help_batch;
1185     else
1186       help_text = help_help;
1187   }
1188
1189   add_response_info(sock, help_text[1]);
1190   return send_response(sock, RESP_OK, help_text[0]);
1191 } /* }}} int handle_request_help */
1192
1193 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1194 {
1195   uint64_t copy_queue_length;
1196   uint64_t copy_updates_received;
1197   uint64_t copy_flush_received;
1198   uint64_t copy_updates_written;
1199   uint64_t copy_data_sets_written;
1200   uint64_t copy_journal_bytes;
1201   uint64_t copy_journal_rotate;
1202
1203   uint64_t tree_nodes_number;
1204   uint64_t tree_depth;
1205
1206   pthread_mutex_lock (&stats_lock);
1207   copy_queue_length       = stats_queue_length;
1208   copy_updates_received   = stats_updates_received;
1209   copy_flush_received     = stats_flush_received;
1210   copy_updates_written    = stats_updates_written;
1211   copy_data_sets_written  = stats_data_sets_written;
1212   copy_journal_bytes      = stats_journal_bytes;
1213   copy_journal_rotate     = stats_journal_rotate;
1214   pthread_mutex_unlock (&stats_lock);
1215
1216   pthread_mutex_lock (&cache_lock);
1217   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1218   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1219   pthread_mutex_unlock (&cache_lock);
1220
1221   add_response_info(sock,
1222                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1223   add_response_info(sock,
1224                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1225   add_response_info(sock,
1226                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1227   add_response_info(sock,
1228                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1229   add_response_info(sock,
1230                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1231   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1232   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1233   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1234   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1235
1236   send_response(sock, RESP_OK, "Statistics follow\n");
1237
1238   return (0);
1239 } /* }}} int handle_request_stats */
1240
1241 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1242     char *buffer, size_t buffer_size)
1243 {
1244   char *file, file_tmp[PATH_MAX];
1245   int status;
1246
1247   status = buffer_get_field (&buffer, &buffer_size, &file);
1248   if (status != 0)
1249   {
1250     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1251   }
1252   else
1253   {
1254     pthread_mutex_lock(&stats_lock);
1255     stats_flush_received++;
1256     pthread_mutex_unlock(&stats_lock);
1257
1258     get_abs_path(&file, file_tmp);
1259     if (!check_file_access(file, sock)) return 0;
1260
1261     status = flush_file (file);
1262     if (status == 0)
1263       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1264     else if (status == ENOENT)
1265     {
1266       /* no file in our tree; see whether it exists at all */
1267       struct stat statbuf;
1268
1269       memset(&statbuf, 0, sizeof(statbuf));
1270       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1271         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1272       else
1273         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1274     }
1275     else if (status < 0)
1276       return send_response(sock, RESP_ERR, "Internal error.\n");
1277     else
1278       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1279   }
1280
1281   /* NOTREACHED */
1282   assert(1==0);
1283 } /* }}} int handle_request_flush */
1284
1285 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1286 {
1287   int status;
1288
1289   status = has_privilege(sock, PRIV_HIGH);
1290   if (status <= 0)
1291     return status;
1292
1293   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1294
1295   pthread_mutex_lock(&cache_lock);
1296   flush_old_values(-1);
1297   pthread_mutex_unlock(&cache_lock);
1298
1299   return send_response(sock, RESP_OK, "Started flush.\n");
1300 } /* }}} static int handle_request_flushall */
1301
1302 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1303                                   char *buffer, size_t buffer_size)
1304 {
1305   int status;
1306   char *file, file_tmp[PATH_MAX];
1307   cache_item_t *ci;
1308
1309   status = buffer_get_field(&buffer, &buffer_size, &file);
1310   if (status != 0)
1311     return send_response(sock, RESP_ERR,
1312                          "Usage: PENDING <filename>\n");
1313
1314   status = has_privilege(sock, PRIV_HIGH);
1315   if (status <= 0)
1316     return status;
1317
1318   get_abs_path(&file, file_tmp);
1319
1320   pthread_mutex_lock(&cache_lock);
1321   ci = g_tree_lookup(cache_tree, file);
1322   if (ci == NULL)
1323   {
1324     pthread_mutex_unlock(&cache_lock);
1325     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1326   }
1327
1328   for (int i=0; i < ci->values_num; i++)
1329     add_response_info(sock, "%s\n", ci->values[i]);
1330
1331   pthread_mutex_unlock(&cache_lock);
1332   return send_response(sock, RESP_OK, "updates pending\n");
1333 } /* }}} static int handle_request_pending */
1334
1335 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1336                                  char *buffer, size_t buffer_size)
1337 {
1338   int status;
1339   char *file, file_tmp[PATH_MAX];
1340
1341   status = buffer_get_field(&buffer, &buffer_size, &file);
1342   if (status != 0)
1343     return send_response(sock, RESP_ERR,
1344                          "Usage: FORGET <filename>\n");
1345
1346   status = has_privilege(sock, PRIV_HIGH);
1347   if (status <= 0)
1348     return status;
1349
1350   get_abs_path(&file, file_tmp);
1351   if (!check_file_access(file, sock)) return 0;
1352
1353   pthread_mutex_lock(&cache_lock);
1354   status = forget_file(file);
1355   pthread_mutex_unlock(&cache_lock);
1356
1357   if (status == 0)
1358   {
1359     if (sock != NULL)
1360       journal_write("forget", file);
1361
1362     return send_response(sock, RESP_OK, "Gone!\n");
1363   }
1364   else
1365     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1366                          status < 0 ? "Internal error" : rrd_strerror(status));
1367
1368   /* NOTREACHED */
1369   assert(1==0);
1370 } /* }}} static int handle_request_forget */
1371
1372 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1373                                   time_t now,
1374                                   char *buffer, size_t buffer_size)
1375 {
1376   char *file, file_tmp[PATH_MAX];
1377   int values_num = 0;
1378   int bad_timestamps = 0;
1379   int status;
1380   char orig_buf[CMD_MAX];
1381
1382   cache_item_t *ci;
1383
1384   status = has_privilege(sock, PRIV_HIGH);
1385   if (status <= 0)
1386     return status;
1387
1388   /* save it for the journal later */
1389   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1390
1391   status = buffer_get_field (&buffer, &buffer_size, &file);
1392   if (status != 0)
1393     return send_response(sock, RESP_ERR,
1394                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1395
1396   pthread_mutex_lock(&stats_lock);
1397   stats_updates_received++;
1398   pthread_mutex_unlock(&stats_lock);
1399
1400   get_abs_path(&file, file_tmp);
1401   if (!check_file_access(file, sock)) return 0;
1402
1403   pthread_mutex_lock (&cache_lock);
1404   ci = g_tree_lookup (cache_tree, file);
1405
1406   if (ci == NULL) /* {{{ */
1407   {
1408     struct stat statbuf;
1409
1410     /* don't hold the lock while we setup; stat(2) might block */
1411     pthread_mutex_unlock(&cache_lock);
1412
1413     memset (&statbuf, 0, sizeof (statbuf));
1414     status = stat (file, &statbuf);
1415     if (status != 0)
1416     {
1417       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1418
1419       status = errno;
1420       if (status == ENOENT)
1421         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1422       else
1423         return send_response(sock, RESP_ERR,
1424                              "stat failed with error %i.\n", status);
1425     }
1426     if (!S_ISREG (statbuf.st_mode))
1427       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1428
1429     if (access(file, R_OK|W_OK) != 0)
1430       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1431                            file, rrd_strerror(errno));
1432
1433     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1434     if (ci == NULL)
1435     {
1436       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1437
1438       return send_response(sock, RESP_ERR, "malloc failed.\n");
1439     }
1440     memset (ci, 0, sizeof (cache_item_t));
1441
1442     ci->file = strdup (file);
1443     if (ci->file == NULL)
1444     {
1445       free (ci);
1446       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1447
1448       return send_response(sock, RESP_ERR, "strdup failed.\n");
1449     }
1450
1451     wipe_ci_values(ci, now);
1452     ci->flags = CI_FLAGS_IN_TREE;
1453
1454     pthread_mutex_lock(&cache_lock);
1455     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1456   } /* }}} */
1457   assert (ci != NULL);
1458
1459   /* don't re-write updates in replay mode */
1460   if (sock != NULL)
1461     journal_write("update", orig_buf);
1462
1463   while (buffer_size > 0)
1464   {
1465     char **temp;
1466     char *value;
1467     time_t stamp;
1468     char *eostamp;
1469
1470     status = buffer_get_field (&buffer, &buffer_size, &value);
1471     if (status != 0)
1472     {
1473       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1474       break;
1475     }
1476
1477     /* make sure update time is always moving forward */
1478     stamp = strtol(value, &eostamp, 10);
1479     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1480     {
1481       ++bad_timestamps;
1482       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1483       continue;
1484     }
1485     else if (stamp <= ci->last_update_stamp)
1486     {
1487       ++bad_timestamps;
1488       add_response_info(sock,
1489                         "illegal attempt to update using time %ld when"
1490                         " last update time is %ld (minimum one second step)\n",
1491                         stamp, ci->last_update_stamp);
1492       continue;
1493     }
1494     else
1495       ci->last_update_stamp = stamp;
1496
1497     temp = (char **) realloc (ci->values,
1498         sizeof (char *) * (ci->values_num + 1));
1499     if (temp == NULL)
1500     {
1501       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1502       continue;
1503     }
1504     ci->values = temp;
1505
1506     ci->values[ci->values_num] = strdup (value);
1507     if (ci->values[ci->values_num] == NULL)
1508     {
1509       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1510       continue;
1511     }
1512     ci->values_num++;
1513
1514     values_num++;
1515   }
1516
1517   if (((now - ci->last_flush_time) >= config_write_interval)
1518       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1519       && (ci->values_num > 0))
1520   {
1521     enqueue_cache_item (ci, TAIL);
1522   }
1523
1524   pthread_mutex_unlock (&cache_lock);
1525
1526   if (values_num < 1)
1527   {
1528     /* if we had only one update attempt, then return the full
1529        error message... try to get the most information out
1530        of the limited error space allowed by the protocol
1531     */
1532     if (bad_timestamps == 1)
1533       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1534     else
1535       return send_response(sock, RESP_ERR,
1536                            "No values updated (%d bad timestamps).\n",
1537                            bad_timestamps);
1538   }
1539   else
1540     return send_response(sock, RESP_OK,
1541                          "errors, enqueued %i value(s).\n", values_num);
1542
1543   /* NOTREACHED */
1544   assert(1==0);
1545
1546 } /* }}} int handle_request_update */
1547
1548 /* we came across a "WROTE" entry during journal replay.
1549  * throw away any values that we have accumulated for this file
1550  */
1551 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1552 {
1553   int i;
1554   cache_item_t *ci;
1555   const char *file = buffer;
1556
1557   pthread_mutex_lock(&cache_lock);
1558
1559   ci = g_tree_lookup(cache_tree, file);
1560   if (ci == NULL)
1561   {
1562     pthread_mutex_unlock(&cache_lock);
1563     return (0);
1564   }
1565
1566   if (ci->values)
1567   {
1568     for (i=0; i < ci->values_num; i++)
1569       free(ci->values[i]);
1570
1571     free(ci->values);
1572   }
1573
1574   wipe_ci_values(ci, now);
1575   remove_from_queue(ci);
1576
1577   pthread_mutex_unlock(&cache_lock);
1578   return (0);
1579 } /* }}} int handle_request_wrote */
1580
1581 /* start "BATCH" processing */
1582 static int batch_start (listen_socket_t *sock) /* {{{ */
1583 {
1584   int status;
1585   if (sock->batch_start)
1586     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1587
1588   status = send_response(sock, RESP_OK,
1589                          "Go ahead.  End with dot '.' on its own line.\n");
1590   sock->batch_start = time(NULL);
1591   sock->batch_cmd = 0;
1592
1593   return status;
1594 } /* }}} static int batch_start */
1595
1596 /* finish "BATCH" processing and return results to the client */
1597 static int batch_done (listen_socket_t *sock) /* {{{ */
1598 {
1599   assert(sock->batch_start);
1600   sock->batch_start = 0;
1601   sock->batch_cmd  = 0;
1602   return send_response(sock, RESP_OK, "errors\n");
1603 } /* }}} static int batch_done */
1604
1605 /* if sock==NULL, we are in journal replay mode */
1606 static int handle_request (listen_socket_t *sock, /* {{{ */
1607                            time_t now,
1608                            char *buffer, size_t buffer_size)
1609 {
1610   char *buffer_ptr;
1611   char *command;
1612   int status;
1613
1614   assert (buffer[buffer_size - 1] == '\0');
1615
1616   buffer_ptr = buffer;
1617   command = NULL;
1618   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1619   if (status != 0)
1620   {
1621     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1622     return (-1);
1623   }
1624
1625   if (sock != NULL && sock->batch_start)
1626     sock->batch_cmd++;
1627
1628   if (strcasecmp (command, "update") == 0)
1629     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1630   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1631   {
1632     /* this is only valid in replay mode */
1633     return (handle_request_wrote (buffer_ptr, now));
1634   }
1635   else if (strcasecmp (command, "flush") == 0)
1636     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1637   else if (strcasecmp (command, "flushall") == 0)
1638     return (handle_request_flushall(sock));
1639   else if (strcasecmp (command, "pending") == 0)
1640     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1641   else if (strcasecmp (command, "forget") == 0)
1642     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1643   else if (strcasecmp (command, "stats") == 0)
1644     return (handle_request_stats (sock));
1645   else if (strcasecmp (command, "help") == 0)
1646     return (handle_request_help (sock, buffer_ptr, buffer_size));
1647   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1648     return batch_start(sock);
1649   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1650     return batch_done(sock);
1651   else
1652     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1653
1654   /* NOTREACHED */
1655   assert(1==0);
1656 } /* }}} int handle_request */
1657
1658 /* MUST NOT hold journal_lock before calling this */
1659 static void journal_rotate(void) /* {{{ */
1660 {
1661   FILE *old_fh = NULL;
1662   int new_fd;
1663
1664   if (journal_cur == NULL || journal_old == NULL)
1665     return;
1666
1667   pthread_mutex_lock(&journal_lock);
1668
1669   /* we rotate this way (rename before close) so that the we can release
1670    * the journal lock as fast as possible.  Journal writes to the new
1671    * journal can proceed immediately after the new file is opened.  The
1672    * fclose can then block without affecting new updates.
1673    */
1674   if (journal_fh != NULL)
1675   {
1676     old_fh = journal_fh;
1677     journal_fh = NULL;
1678     rename(journal_cur, journal_old);
1679     ++stats_journal_rotate;
1680   }
1681
1682   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1683                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1684   if (new_fd >= 0)
1685   {
1686     journal_fh = fdopen(new_fd, "a");
1687     if (journal_fh == NULL)
1688       close(new_fd);
1689   }
1690
1691   pthread_mutex_unlock(&journal_lock);
1692
1693   if (old_fh != NULL)
1694     fclose(old_fh);
1695
1696   if (journal_fh == NULL)
1697   {
1698     RRDD_LOG(LOG_CRIT,
1699              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1700              journal_cur, rrd_strerror(errno));
1701
1702     RRDD_LOG(LOG_ERR,
1703              "JOURNALING DISABLED: All values will be flushed at shutdown");
1704     config_flush_at_shutdown = 1;
1705   }
1706
1707 } /* }}} static void journal_rotate */
1708
1709 static void journal_done(void) /* {{{ */
1710 {
1711   if (journal_cur == NULL)
1712     return;
1713
1714   pthread_mutex_lock(&journal_lock);
1715   if (journal_fh != NULL)
1716   {
1717     fclose(journal_fh);
1718     journal_fh = NULL;
1719   }
1720
1721   if (config_flush_at_shutdown)
1722   {
1723     RRDD_LOG(LOG_INFO, "removing journals");
1724     unlink(journal_old);
1725     unlink(journal_cur);
1726   }
1727   else
1728   {
1729     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1730              "journals will be used at next startup");
1731   }
1732
1733   pthread_mutex_unlock(&journal_lock);
1734
1735 } /* }}} static void journal_done */
1736
1737 static int journal_write(char *cmd, char *args) /* {{{ */
1738 {
1739   int chars;
1740
1741   if (journal_fh == NULL)
1742     return 0;
1743
1744   pthread_mutex_lock(&journal_lock);
1745   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1746   pthread_mutex_unlock(&journal_lock);
1747
1748   if (chars > 0)
1749   {
1750     pthread_mutex_lock(&stats_lock);
1751     stats_journal_bytes += chars;
1752     pthread_mutex_unlock(&stats_lock);
1753   }
1754
1755   return chars;
1756 } /* }}} static int journal_write */
1757
1758 static int journal_replay (const char *file) /* {{{ */
1759 {
1760   FILE *fh;
1761   int entry_cnt = 0;
1762   int fail_cnt = 0;
1763   uint64_t line = 0;
1764   char entry[CMD_MAX];
1765   time_t now;
1766
1767   if (file == NULL) return 0;
1768
1769   {
1770     char *reason;
1771     int status = 0;
1772     struct stat statbuf;
1773
1774     memset(&statbuf, 0, sizeof(statbuf));
1775     if (stat(file, &statbuf) != 0)
1776     {
1777       if (errno == ENOENT)
1778         return 0;
1779
1780       reason = "stat error";
1781       status = errno;
1782     }
1783     else if (!S_ISREG(statbuf.st_mode))
1784     {
1785       reason = "not a regular file";
1786       status = EPERM;
1787     }
1788     if (statbuf.st_uid != daemon_uid)
1789     {
1790       reason = "not owned by daemon user";
1791       status = EACCES;
1792     }
1793     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1794     {
1795       reason = "must not be user/group writable";
1796       status = EACCES;
1797     }
1798
1799     if (status != 0)
1800     {
1801       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1802                file, rrd_strerror(status), reason);
1803       return 0;
1804     }
1805   }
1806
1807   fh = fopen(file, "r");
1808   if (fh == NULL)
1809   {
1810     if (errno != ENOENT)
1811       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1812                file, rrd_strerror(errno));
1813     return 0;
1814   }
1815   else
1816     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1817
1818   now = time(NULL);
1819
1820   while(!feof(fh))
1821   {
1822     size_t entry_len;
1823
1824     ++line;
1825     if (fgets(entry, sizeof(entry), fh) == NULL)
1826       break;
1827     entry_len = strlen(entry);
1828
1829     /* check \n termination in case journal writing crashed mid-line */
1830     if (entry_len == 0)
1831       continue;
1832     else if (entry[entry_len - 1] != '\n')
1833     {
1834       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1835       ++fail_cnt;
1836       continue;
1837     }
1838
1839     entry[entry_len - 1] = '\0';
1840
1841     if (handle_request(NULL, now, entry, entry_len) == 0)
1842       ++entry_cnt;
1843     else
1844       ++fail_cnt;
1845   }
1846
1847   fclose(fh);
1848
1849   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1850            entry_cnt, fail_cnt);
1851
1852   return entry_cnt > 0 ? 1 : 0;
1853 } /* }}} static int journal_replay */
1854
1855 static void journal_init(void) /* {{{ */
1856 {
1857   int had_journal = 0;
1858
1859   if (journal_cur == NULL) return;
1860
1861   pthread_mutex_lock(&journal_lock);
1862
1863   RRDD_LOG(LOG_INFO, "checking for journal files");
1864
1865   had_journal += journal_replay(journal_old);
1866   had_journal += journal_replay(journal_cur);
1867
1868   /* it must have been a crash.  start a flush */
1869   if (had_journal && config_flush_at_shutdown)
1870     flush_old_values(-1);
1871
1872   pthread_mutex_unlock(&journal_lock);
1873   journal_rotate();
1874
1875   RRDD_LOG(LOG_INFO, "journal processing complete");
1876
1877 } /* }}} static void journal_init */
1878
1879 static void close_connection(listen_socket_t *sock)
1880 {
1881   close(sock->fd) ;  sock->fd   = -1;
1882   free(sock->rbuf);  sock->rbuf = NULL;
1883   free(sock->wbuf);  sock->wbuf = NULL;
1884
1885   free(sock);
1886 }
1887
1888 static void *connection_thread_main (void *args) /* {{{ */
1889 {
1890   pthread_t self;
1891   listen_socket_t *sock;
1892   int i;
1893   int fd;
1894
1895   sock = (listen_socket_t *) args;
1896   fd = sock->fd;
1897
1898   /* init read buffers */
1899   sock->next_read = sock->next_cmd = 0;
1900   sock->rbuf = malloc(RBUF_SIZE);
1901   if (sock->rbuf == NULL)
1902   {
1903     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1904     close_connection(sock);
1905     return NULL;
1906   }
1907
1908   pthread_mutex_lock (&connection_threads_lock);
1909   {
1910     pthread_t *temp;
1911
1912     temp = (pthread_t *) realloc (connection_threads,
1913         sizeof (pthread_t) * (connection_threads_num + 1));
1914     if (temp == NULL)
1915     {
1916       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1917     }
1918     else
1919     {
1920       connection_threads = temp;
1921       connection_threads[connection_threads_num] = pthread_self ();
1922       connection_threads_num++;
1923     }
1924   }
1925   pthread_mutex_unlock (&connection_threads_lock);
1926
1927   while (do_shutdown == 0)
1928   {
1929     char *cmd;
1930     ssize_t cmd_len;
1931     ssize_t rbytes;
1932     time_t now;
1933
1934     struct pollfd pollfd;
1935     int status;
1936
1937     pollfd.fd = fd;
1938     pollfd.events = POLLIN | POLLPRI;
1939     pollfd.revents = 0;
1940
1941     status = poll (&pollfd, 1, /* timeout = */ 500);
1942     if (do_shutdown)
1943       break;
1944     else if (status == 0) /* timeout */
1945       continue;
1946     else if (status < 0) /* error */
1947     {
1948       status = errno;
1949       if (status != EINTR)
1950         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1951       continue;
1952     }
1953
1954     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1955       break;
1956     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1957     {
1958       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1959           "poll(2) returned something unexpected: %#04hx",
1960           pollfd.revents);
1961       break;
1962     }
1963
1964     rbytes = read(fd, sock->rbuf + sock->next_read,
1965                   RBUF_SIZE - sock->next_read);
1966     if (rbytes < 0)
1967     {
1968       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1969       break;
1970     }
1971     else if (rbytes == 0)
1972       break; /* eof */
1973
1974     sock->next_read += rbytes;
1975
1976     if (sock->batch_start)
1977       now = sock->batch_start;
1978     else
1979       now = time(NULL);
1980
1981     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1982     {
1983       status = handle_request (sock, now, cmd, cmd_len+1);
1984       if (status != 0)
1985         goto out_close;
1986     }
1987   }
1988
1989 out_close:
1990   close_connection(sock);
1991
1992   self = pthread_self ();
1993   /* Remove this thread from the connection threads list */
1994   pthread_mutex_lock (&connection_threads_lock);
1995   /* Find out own index in the array */
1996   for (i = 0; i < connection_threads_num; i++)
1997     if (pthread_equal (connection_threads[i], self) != 0)
1998       break;
1999   assert (i < connection_threads_num);
2000
2001   /* Move the trailing threads forward. */
2002   if (i < (connection_threads_num - 1))
2003   {
2004     memmove (connection_threads + i,
2005         connection_threads + i + 1,
2006         sizeof (pthread_t) * (connection_threads_num - i - 1));
2007   }
2008
2009   connection_threads_num--;
2010   pthread_mutex_unlock (&connection_threads_lock);
2011
2012   return (NULL);
2013 } /* }}} void *connection_thread_main */
2014
2015 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2016 {
2017   int fd;
2018   struct sockaddr_un sa;
2019   listen_socket_t *temp;
2020   int status;
2021   const char *path;
2022
2023   path = sock->addr;
2024   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2025     path += strlen("unix:");
2026
2027   temp = (listen_socket_t *) realloc (listen_fds,
2028       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2029   if (temp == NULL)
2030   {
2031     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
2032     return (-1);
2033   }
2034   listen_fds = temp;
2035   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2036
2037   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2038   if (fd < 0)
2039   {
2040     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
2041     return (-1);
2042   }
2043
2044   memset (&sa, 0, sizeof (sa));
2045   sa.sun_family = AF_UNIX;
2046   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2047
2048   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2049   if (status != 0)
2050   {
2051     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
2052     close (fd);
2053     unlink (path);
2054     return (-1);
2055   }
2056
2057   status = listen (fd, /* backlog = */ 10);
2058   if (status != 0)
2059   {
2060     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
2061     close (fd);
2062     unlink (path);
2063     return (-1);
2064   }
2065
2066   listen_fds[listen_fds_num].fd = fd;
2067   listen_fds[listen_fds_num].family = PF_UNIX;
2068   strncpy(listen_fds[listen_fds_num].addr, path,
2069           sizeof (listen_fds[listen_fds_num].addr) - 1);
2070   listen_fds_num++;
2071
2072   return (0);
2073 } /* }}} int open_listen_socket_unix */
2074
2075 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2076 {
2077   struct addrinfo ai_hints;
2078   struct addrinfo *ai_res;
2079   struct addrinfo *ai_ptr;
2080   char addr_copy[NI_MAXHOST];
2081   char *addr;
2082   char *port;
2083   int status;
2084
2085   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2086   addr_copy[sizeof (addr_copy) - 1] = 0;
2087   addr = addr_copy;
2088
2089   memset (&ai_hints, 0, sizeof (ai_hints));
2090   ai_hints.ai_flags = 0;
2091 #ifdef AI_ADDRCONFIG
2092   ai_hints.ai_flags |= AI_ADDRCONFIG;
2093 #endif
2094   ai_hints.ai_family = AF_UNSPEC;
2095   ai_hints.ai_socktype = SOCK_STREAM;
2096
2097   port = NULL;
2098   if (*addr == '[') /* IPv6+port format */
2099   {
2100     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2101     addr++;
2102
2103     port = strchr (addr, ']');
2104     if (port == NULL)
2105     {
2106       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2107           sock->addr);
2108       return (-1);
2109     }
2110     *port = 0;
2111     port++;
2112
2113     if (*port == ':')
2114       port++;
2115     else if (*port == 0)
2116       port = NULL;
2117     else
2118     {
2119       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2120           port);
2121       return (-1);
2122     }
2123   } /* if (*addr = ']') */
2124   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2125   {
2126     port = rindex(addr, ':');
2127     if (port != NULL)
2128     {
2129       *port = 0;
2130       port++;
2131     }
2132   }
2133   ai_res = NULL;
2134   status = getaddrinfo (addr,
2135                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2136                         &ai_hints, &ai_res);
2137   if (status != 0)
2138   {
2139     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2140         "%s", addr, gai_strerror (status));
2141     return (-1);
2142   }
2143
2144   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2145   {
2146     int fd;
2147     listen_socket_t *temp;
2148     int one = 1;
2149
2150     temp = (listen_socket_t *) realloc (listen_fds,
2151         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2152     if (temp == NULL)
2153     {
2154       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2155       continue;
2156     }
2157     listen_fds = temp;
2158     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2159
2160     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2161     if (fd < 0)
2162     {
2163       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2164       continue;
2165     }
2166
2167     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2168
2169     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2170     if (status != 0)
2171     {
2172       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2173       close (fd);
2174       continue;
2175     }
2176
2177     status = listen (fd, /* backlog = */ 10);
2178     if (status != 0)
2179     {
2180       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2181       close (fd);
2182       return (-1);
2183     }
2184
2185     listen_fds[listen_fds_num].fd = fd;
2186     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2187     listen_fds_num++;
2188   } /* for (ai_ptr) */
2189
2190   return (0);
2191 } /* }}} static int open_listen_socket_network */
2192
2193 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2194 {
2195   assert(sock != NULL);
2196   assert(sock->addr != NULL);
2197
2198   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2199       || sock->addr[0] == '/')
2200     return (open_listen_socket_unix(sock));
2201   else
2202     return (open_listen_socket_network(sock));
2203 } /* }}} int open_listen_socket */
2204
2205 static int close_listen_sockets (void) /* {{{ */
2206 {
2207   size_t i;
2208
2209   for (i = 0; i < listen_fds_num; i++)
2210   {
2211     close (listen_fds[i].fd);
2212
2213     if (listen_fds[i].family == PF_UNIX)
2214       unlink(listen_fds[i].addr);
2215   }
2216
2217   free (listen_fds);
2218   listen_fds = NULL;
2219   listen_fds_num = 0;
2220
2221   return (0);
2222 } /* }}} int close_listen_sockets */
2223
2224 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2225 {
2226   struct pollfd *pollfds;
2227   int pollfds_num;
2228   int status;
2229   int i;
2230
2231   for (i = 0; i < config_listen_address_list_len; i++)
2232     open_listen_socket (config_listen_address_list[i]);
2233
2234   if (config_listen_address_list_len < 1)
2235   {
2236     listen_socket_t sock;
2237     memset(&sock, 0, sizeof(sock));
2238     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2239     open_listen_socket (&sock);
2240   }
2241
2242   if (listen_fds_num < 1)
2243   {
2244     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2245         "could be opened. Sorry.");
2246     return (NULL);
2247   }
2248
2249   pollfds_num = listen_fds_num;
2250   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2251   if (pollfds == NULL)
2252   {
2253     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2254     return (NULL);
2255   }
2256   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2257
2258   RRDD_LOG(LOG_INFO, "listening for connections");
2259
2260   while (do_shutdown == 0)
2261   {
2262     assert (pollfds_num == ((int) listen_fds_num));
2263     for (i = 0; i < pollfds_num; i++)
2264     {
2265       pollfds[i].fd = listen_fds[i].fd;
2266       pollfds[i].events = POLLIN | POLLPRI;
2267       pollfds[i].revents = 0;
2268     }
2269
2270     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2271     if (do_shutdown)
2272       break;
2273     else if (status == 0) /* timeout */
2274       continue;
2275     else if (status < 0) /* error */
2276     {
2277       status = errno;
2278       if (status != EINTR)
2279       {
2280         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2281       }
2282       continue;
2283     }
2284
2285     for (i = 0; i < pollfds_num; i++)
2286     {
2287       listen_socket_t *client_sock;
2288       struct sockaddr_storage client_sa;
2289       socklen_t client_sa_size;
2290       pthread_t tid;
2291       pthread_attr_t attr;
2292
2293       if (pollfds[i].revents == 0)
2294         continue;
2295
2296       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2297       {
2298         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2299             "poll(2) returned something unexpected for listen FD #%i.",
2300             pollfds[i].fd);
2301         continue;
2302       }
2303
2304       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2305       if (client_sock == NULL)
2306       {
2307         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2308         continue;
2309       }
2310       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2311
2312       client_sa_size = sizeof (client_sa);
2313       client_sock->fd = accept (pollfds[i].fd,
2314           (struct sockaddr *) &client_sa, &client_sa_size);
2315       if (client_sock->fd < 0)
2316       {
2317         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2318         free(client_sock);
2319         continue;
2320       }
2321
2322       pthread_attr_init (&attr);
2323       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2324
2325       status = pthread_create (&tid, &attr, connection_thread_main,
2326                                client_sock);
2327       if (status != 0)
2328       {
2329         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2330         close_connection(client_sock);
2331         continue;
2332       }
2333     } /* for (pollfds_num) */
2334   } /* while (do_shutdown == 0) */
2335
2336   RRDD_LOG(LOG_INFO, "starting shutdown");
2337
2338   close_listen_sockets ();
2339
2340   pthread_mutex_lock (&connection_threads_lock);
2341   while (connection_threads_num > 0)
2342   {
2343     pthread_t wait_for;
2344
2345     wait_for = connection_threads[0];
2346
2347     pthread_mutex_unlock (&connection_threads_lock);
2348     pthread_join (wait_for, /* retval = */ NULL);
2349     pthread_mutex_lock (&connection_threads_lock);
2350   }
2351   pthread_mutex_unlock (&connection_threads_lock);
2352
2353   return (NULL);
2354 } /* }}} void *listen_thread_main */
2355
2356 static int daemonize (void) /* {{{ */
2357 {
2358   int status;
2359   int pid_fd;
2360   char *base_dir;
2361
2362   daemon_uid = geteuid();
2363
2364   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2365   if (pid_fd < 0)
2366     pid_fd = check_pidfile();
2367   if (pid_fd < 0)
2368     return pid_fd;
2369
2370   if (!stay_foreground)
2371   {
2372     pid_t child;
2373
2374     child = fork ();
2375     if (child < 0)
2376     {
2377       fprintf (stderr, "daemonize: fork(2) failed.\n");
2378       return (-1);
2379     }
2380     else if (child > 0)
2381     {
2382       return (1);
2383     }
2384
2385     /* Become session leader */
2386     setsid ();
2387
2388     /* Open the first three file descriptors to /dev/null */
2389     close (2);
2390     close (1);
2391     close (0);
2392
2393     open ("/dev/null", O_RDWR);
2394     dup (0);
2395     dup (0);
2396   } /* if (!stay_foreground) */
2397
2398   /* Change into the /tmp directory. */
2399   base_dir = (config_base_dir != NULL)
2400     ? config_base_dir
2401     : "/tmp";
2402   status = chdir (base_dir);
2403   if (status != 0)
2404   {
2405     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2406     return (-1);
2407   }
2408
2409   install_signal_handlers();
2410
2411   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2412   RRDD_LOG(LOG_INFO, "starting up");
2413
2414   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2415   if (cache_tree == NULL)
2416   {
2417     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2418     return (-1);
2419   }
2420
2421   status = write_pidfile (pid_fd);
2422   return status;
2423 } /* }}} int daemonize */
2424
2425 static int cleanup (void) /* {{{ */
2426 {
2427   do_shutdown++;
2428
2429   pthread_cond_signal (&cache_cond);
2430   pthread_join (queue_thread, /* return = */ NULL);
2431
2432   remove_pidfile ();
2433
2434   RRDD_LOG(LOG_INFO, "goodbye");
2435   closelog ();
2436
2437   return (0);
2438 } /* }}} int cleanup */
2439
2440 static int read_options (int argc, char **argv) /* {{{ */
2441 {
2442   int option;
2443   int status = 0;
2444
2445   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2446   {
2447     switch (option)
2448     {
2449       case 'g':
2450         stay_foreground=1;
2451         break;
2452
2453       case 'L':
2454       case 'l':
2455       {
2456         listen_socket_t **temp;
2457         listen_socket_t *new;
2458
2459         new = malloc(sizeof(listen_socket_t));
2460         if (new == NULL)
2461         {
2462           fprintf(stderr, "read_options: malloc failed.\n");
2463           return(2);
2464         }
2465         memset(new, 0, sizeof(listen_socket_t));
2466
2467         temp = (listen_socket_t **) realloc (config_listen_address_list,
2468             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2469         if (temp == NULL)
2470         {
2471           fprintf (stderr, "read_options: realloc failed.\n");
2472           return (2);
2473         }
2474         config_listen_address_list = temp;
2475
2476         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2477         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2478
2479         temp[config_listen_address_list_len] = new;
2480         config_listen_address_list_len++;
2481       }
2482       break;
2483
2484       case 'f':
2485       {
2486         int temp;
2487
2488         temp = atoi (optarg);
2489         if (temp > 0)
2490           config_flush_interval = temp;
2491         else
2492         {
2493           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2494           status = 3;
2495         }
2496       }
2497       break;
2498
2499       case 'w':
2500       {
2501         int temp;
2502
2503         temp = atoi (optarg);
2504         if (temp > 0)
2505           config_write_interval = temp;
2506         else
2507         {
2508           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2509           status = 2;
2510         }
2511       }
2512       break;
2513
2514       case 'z':
2515       {
2516         int temp;
2517
2518         temp = atoi(optarg);
2519         if (temp > 0)
2520           config_write_jitter = temp;
2521         else
2522         {
2523           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2524           status = 2;
2525         }
2526
2527         break;
2528       }
2529
2530       case 'B':
2531         config_write_base_only = 1;
2532         break;
2533
2534       case 'b':
2535       {
2536         size_t len;
2537         char base_realpath[PATH_MAX];
2538
2539         if (config_base_dir != NULL)
2540           free (config_base_dir);
2541         config_base_dir = strdup (optarg);
2542         if (config_base_dir == NULL)
2543         {
2544           fprintf (stderr, "read_options: strdup failed.\n");
2545           return (3);
2546         }
2547
2548         /* make sure that the base directory is not resolved via
2549          * symbolic links.  this makes some performance-enhancing
2550          * assumptions possible (we don't have to resolve paths
2551          * that start with a "/")
2552          */
2553         if (realpath(config_base_dir, base_realpath) == NULL)
2554         {
2555           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2556           return 5;
2557         }
2558         else if (strncmp(config_base_dir,
2559                          base_realpath, sizeof(base_realpath)) != 0)
2560         {
2561           fprintf(stderr,
2562                   "Base directory (-b) resolved via file system links!\n"
2563                   "Please consult rrdcached '-b' documentation!\n"
2564                   "Consider specifying the real directory (%s)\n",
2565                   base_realpath);
2566           return 5;
2567         }
2568
2569         len = strlen (config_base_dir);
2570         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2571         {
2572           config_base_dir[len - 1] = 0;
2573           len--;
2574         }
2575
2576         if (len < 1)
2577         {
2578           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2579           return (4);
2580         }
2581
2582         _config_base_dir_len = len;
2583       }
2584       break;
2585
2586       case 'p':
2587       {
2588         if (config_pid_file != NULL)
2589           free (config_pid_file);
2590         config_pid_file = strdup (optarg);
2591         if (config_pid_file == NULL)
2592         {
2593           fprintf (stderr, "read_options: strdup failed.\n");
2594           return (3);
2595         }
2596       }
2597       break;
2598
2599       case 'F':
2600         config_flush_at_shutdown = 1;
2601         break;
2602
2603       case 'j':
2604       {
2605         struct stat statbuf;
2606         const char *dir = optarg;
2607
2608         status = stat(dir, &statbuf);
2609         if (status != 0)
2610         {
2611           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2612           return 6;
2613         }
2614
2615         if (!S_ISDIR(statbuf.st_mode)
2616             || access(dir, R_OK|W_OK|X_OK) != 0)
2617         {
2618           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2619                   errno ? rrd_strerror(errno) : "");
2620           return 6;
2621         }
2622
2623         journal_cur = malloc(PATH_MAX + 1);
2624         journal_old = malloc(PATH_MAX + 1);
2625         if (journal_cur == NULL || journal_old == NULL)
2626         {
2627           fprintf(stderr, "malloc failure for journal files\n");
2628           return 6;
2629         }
2630         else 
2631         {
2632           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2633           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2634         }
2635       }
2636       break;
2637
2638       case 'h':
2639       case '?':
2640         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2641             "\n"
2642             "Usage: rrdcached [options]\n"
2643             "\n"
2644             "Valid options are:\n"
2645             "  -l <address>  Socket address to listen to.\n"
2646             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2647             "  -w <seconds>  Interval in which to write data.\n"
2648             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2649             "  -f <seconds>  Interval in which to flush dead data.\n"
2650             "  -p <file>     Location of the PID-file.\n"
2651             "  -b <dir>      Base directory to change to.\n"
2652             "  -B            Restrict file access to paths within -b <dir>\n"
2653             "  -g            Do not fork and run in the foreground.\n"
2654             "  -j <dir>      Directory in which to create the journal files.\n"
2655             "  -F            Always flush all updates at shutdown\n"
2656             "\n"
2657             "For more information and a detailed description of all options "
2658             "please refer\n"
2659             "to the rrdcached(1) manual page.\n",
2660             VERSION);
2661         status = -1;
2662         break;
2663     } /* switch (option) */
2664   } /* while (getopt) */
2665
2666   /* advise the user when values are not sane */
2667   if (config_flush_interval < 2 * config_write_interval)
2668     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2669             " 2x write interval (-w) !\n");
2670   if (config_write_jitter > config_write_interval)
2671     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2672             " write interval (-w) !\n");
2673
2674   if (config_write_base_only && config_base_dir == NULL)
2675     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2676             "  Consult the rrdcached documentation\n");
2677
2678   if (journal_cur == NULL)
2679     config_flush_at_shutdown = 1;
2680
2681   return (status);
2682 } /* }}} int read_options */
2683
2684 int main (int argc, char **argv)
2685 {
2686   int status;
2687
2688   status = read_options (argc, argv);
2689   if (status != 0)
2690   {
2691     if (status < 0)
2692       status = 0;
2693     return (status);
2694   }
2695
2696   status = daemonize ();
2697   if (status == 1)
2698   {
2699     struct sigaction sigchld;
2700
2701     memset (&sigchld, 0, sizeof (sigchld));
2702     sigchld.sa_handler = SIG_IGN;
2703     sigaction (SIGCHLD, &sigchld, NULL);
2704
2705     return (0);
2706   }
2707   else if (status != 0)
2708   {
2709     fprintf (stderr, "daemonize failed, exiting.\n");
2710     return (1);
2711   }
2712
2713   journal_init();
2714
2715   /* start the queue thread */
2716   memset (&queue_thread, 0, sizeof (queue_thread));
2717   status = pthread_create (&queue_thread,
2718                            NULL, /* attr */
2719                            queue_thread_main,
2720                            NULL); /* args */
2721   if (status != 0)
2722   {
2723     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2724     cleanup();
2725     return (1);
2726   }
2727
2728   listen_thread_main (NULL);
2729   cleanup ();
2730
2731   return (0);
2732 } /* int main */
2733
2734 /*
2735  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2736  */