a0b836487f8e9c3d3be5b5258cdd9042c3df459b
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 struct listen_socket_s
105 {
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
110
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
125
126 struct callback_flush_data_s
127 {
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
134
135 enum queue_side_e
136 {
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
141
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
144
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
149
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
152
153 static int do_shutdown = 0;
154
155 static pthread_t queue_thread;
156
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
160
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
167
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
173
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
176
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
185
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
194
195 /* 
196  * Functions
197  */
198 static void sig_common (const char *sig) /* {{{ */
199 {
200   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
201   do_shutdown++;
202   pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_common */
204
205 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
206 {
207   sig_common("INT");
208 } /* }}} void sig_int_handler */
209
210 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
211 {
212   sig_common("TERM");
213 } /* }}} void sig_term_handler */
214
215 static void install_signal_handlers(void) /* {{{ */
216 {
217   /* These structures are static, because `sigaction' behaves weird if the are
218    * overwritten.. */
219   static struct sigaction sa_int;
220   static struct sigaction sa_term;
221   static struct sigaction sa_pipe;
222
223   /* Install signal handlers */
224   memset (&sa_int, 0, sizeof (sa_int));
225   sa_int.sa_handler = sig_int_handler;
226   sigaction (SIGINT, &sa_int, NULL);
227
228   memset (&sa_term, 0, sizeof (sa_term));
229   sa_term.sa_handler = sig_term_handler;
230   sigaction (SIGTERM, &sa_term, NULL);
231
232   memset (&sa_pipe, 0, sizeof (sa_pipe));
233   sa_pipe.sa_handler = SIG_IGN;
234   sigaction (SIGPIPE, &sa_pipe, NULL);
235
236 } /* }}} void install_signal_handlers */
237
238 static int open_pidfile(void) /* {{{ */
239 {
240   int fd;
241   char *file;
242
243   file = (config_pid_file != NULL)
244     ? config_pid_file
245     : LOCALSTATEDIR "/run/rrdcached.pid";
246
247   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
248   if (fd < 0)
249     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
250             file, rrd_strerror(errno));
251
252   return(fd);
253 }
254
255 static int write_pidfile (int fd) /* {{{ */
256 {
257   pid_t pid;
258   FILE *fh;
259
260   pid = getpid ();
261
262   fh = fdopen (fd, "w");
263   if (fh == NULL)
264   {
265     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
266     close(fd);
267     return (-1);
268   }
269
270   fprintf (fh, "%i\n", (int) pid);
271   fclose (fh);
272
273   return (0);
274 } /* }}} int write_pidfile */
275
276 static int remove_pidfile (void) /* {{{ */
277 {
278   char *file;
279   int status;
280
281   file = (config_pid_file != NULL)
282     ? config_pid_file
283     : LOCALSTATEDIR "/run/rrdcached.pid";
284
285   status = unlink (file);
286   if (status == 0)
287     return (0);
288   return (errno);
289 } /* }}} int remove_pidfile */
290
291 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
292 {
293   char    *buffer;
294   size_t   buffer_used;
295   size_t   buffer_free;
296   ssize_t  status;
297
298   buffer       = (char *) buffer_void;
299   buffer_used  = 0;
300   buffer_free  = buffer_size;
301
302   while (buffer_free > 0)
303   {
304     status = read (fd, buffer + buffer_used, buffer_free);
305     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
306       continue;
307
308     if (status < 0)
309       return (-1);
310
311     if (status == 0)
312       return (0);
313
314     assert ((0 > status) || (buffer_free >= (size_t) status));
315
316     buffer_free = buffer_free - status;
317     buffer_used = buffer_used + status;
318
319     if (buffer[buffer_used - 1] == '\n')
320       break;
321   }
322
323   assert (buffer_used > 0);
324
325   if (buffer[buffer_used - 1] != '\n')
326   {
327     errno = ENOBUFS;
328     return (-1);
329   }
330
331   buffer[buffer_used - 1] = 0;
332
333   /* Fix network line endings. */
334   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
335   {
336     buffer_used--;
337     buffer[buffer_used - 1] = 0;
338   }
339
340   return (buffer_used);
341 } /* }}} ssize_t sread */
342
343 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
344 {
345   const char *ptr;
346   size_t      nleft;
347   ssize_t     status;
348
349   /* special case for journal replay */
350   if (fd < 0) return 0;
351
352   ptr   = (const char *) buf;
353   nleft = count;
354
355   while (nleft > 0)
356   {
357     status = write (fd, (const void *) ptr, nleft);
358
359     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
360       continue;
361
362     if (status < 0)
363       return (status);
364
365     nleft -= status;
366     ptr   += status;
367   }
368
369   return (0);
370 } /* }}} ssize_t swrite */
371
372 static void _wipe_ci_values(cache_item_t *ci, time_t when)
373 {
374   ci->values = NULL;
375   ci->values_num = 0;
376
377   ci->last_flush_time = when;
378   if (config_write_jitter > 0)
379     ci->last_flush_time += (random() % config_write_jitter);
380
381   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
382 }
383
384 /*
385  * enqueue_cache_item:
386  * `cache_lock' must be acquired before calling this function!
387  */
388 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
389     queue_side_t side)
390 {
391   int did_insert = 0;
392
393   if (ci == NULL)
394     return (-1);
395
396   if (ci->values_num == 0)
397     return (0);
398
399   if (side == HEAD)
400   {
401     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
402     {
403       assert (ci->next == NULL);
404       ci->next = cache_queue_head;
405       cache_queue_head = ci;
406
407       if (cache_queue_tail == NULL)
408         cache_queue_tail = cache_queue_head;
409
410       did_insert = 1;
411     }
412     else if (cache_queue_head == ci)
413     {
414       /* do nothing */
415     }
416     else /* enqueued, but not first entry */
417     {
418       cache_item_t *prev;
419
420       /* find previous entry */
421       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
422         if (prev->next == ci)
423           break;
424       assert (prev != NULL);
425
426       /* move to the front */
427       prev->next = ci->next;
428       ci->next = cache_queue_head;
429       cache_queue_head = ci;
430
431       /* check if we need to adapt the tail */
432       if (cache_queue_tail == ci)
433         cache_queue_tail = prev;
434     }
435   }
436   else /* (side == TAIL) */
437   {
438     /* We don't move values back in the list.. */
439     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
440       return (0);
441
442     assert (ci->next == NULL);
443
444     if (cache_queue_tail == NULL)
445       cache_queue_head = ci;
446     else
447       cache_queue_tail->next = ci;
448     cache_queue_tail = ci;
449
450     did_insert = 1;
451   }
452
453   ci->flags |= CI_FLAGS_IN_QUEUE;
454
455   if (did_insert)
456   {
457     pthread_cond_broadcast(&cache_cond);
458     pthread_mutex_lock (&stats_lock);
459     stats_queue_length++;
460     pthread_mutex_unlock (&stats_lock);
461   }
462
463   return (0);
464 } /* }}} int enqueue_cache_item */
465
466 /*
467  * tree_callback_flush:
468  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
469  * while this is in progress.
470  */
471 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
472     gpointer data)
473 {
474   cache_item_t *ci;
475   callback_flush_data_t *cfd;
476
477   ci = (cache_item_t *) value;
478   cfd = (callback_flush_data_t *) data;
479
480   if ((ci->last_flush_time <= cfd->abs_timeout)
481       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
482       && (ci->values_num > 0))
483   {
484     enqueue_cache_item (ci, TAIL);
485   }
486   else if ((do_shutdown != 0)
487       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
488       && (ci->values_num > 0))
489   {
490     enqueue_cache_item (ci, TAIL);
491   }
492   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
493       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
494       && (ci->values_num <= 0))
495   {
496     char **temp;
497
498     temp = (char **) realloc (cfd->keys,
499         sizeof (char *) * (cfd->keys_num + 1));
500     if (temp == NULL)
501     {
502       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
503       return (FALSE);
504     }
505     cfd->keys = temp;
506     /* Make really sure this points to the _same_ place */
507     assert ((char *) key == ci->file);
508     cfd->keys[cfd->keys_num] = (char *) key;
509     cfd->keys_num++;
510   }
511
512   return (FALSE);
513 } /* }}} gboolean tree_callback_flush */
514
515 static int flush_old_values (int max_age)
516 {
517   callback_flush_data_t cfd;
518   size_t k;
519
520   memset (&cfd, 0, sizeof (cfd));
521   /* Pass the current time as user data so that we don't need to call
522    * `time' for each node. */
523   cfd.now = time (NULL);
524   cfd.keys = NULL;
525   cfd.keys_num = 0;
526
527   if (max_age > 0)
528     cfd.abs_timeout = cfd.now - max_age;
529   else
530     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
531
532   /* `tree_callback_flush' will return the keys of all values that haven't
533    * been touched in the last `config_flush_interval' seconds in `cfd'.
534    * The char*'s in this array point to the same memory as ci->file, so we
535    * don't need to free them separately. */
536   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
537
538   for (k = 0; k < cfd.keys_num; k++)
539   {
540     cache_item_t *ci;
541
542     /* This must not fail. */
543     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
544     assert (ci != NULL);
545
546     /* If we end up here with values available, something's seriously
547      * messed up. */
548     assert (ci->values_num == 0);
549
550     /* Remove the node from the tree */
551     g_tree_remove (cache_tree, cfd.keys[k]);
552     cfd.keys[k] = NULL;
553
554     /* Now free and clean up `ci'. */
555     free (ci->file);
556     ci->file = NULL;
557     free (ci);
558     ci = NULL;
559   } /* for (k = 0; k < cfd.keys_num; k++) */
560
561   if (cfd.keys != NULL)
562   {
563     free (cfd.keys);
564     cfd.keys = NULL;
565   }
566
567   return (0);
568 } /* int flush_old_values */
569
570 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
571 {
572   struct timeval now;
573   struct timespec next_flush;
574
575   gettimeofday (&now, NULL);
576   next_flush.tv_sec = now.tv_sec + config_flush_interval;
577   next_flush.tv_nsec = 1000 * now.tv_usec;
578
579   pthread_mutex_lock (&cache_lock);
580   while ((do_shutdown == 0) || (cache_queue_head != NULL))
581   {
582     cache_item_t *ci;
583     char *file;
584     char **values;
585     int values_num;
586     int status;
587     int i;
588
589     /* First, check if it's time to do the cache flush. */
590     gettimeofday (&now, NULL);
591     if ((now.tv_sec > next_flush.tv_sec)
592         || ((now.tv_sec == next_flush.tv_sec)
593           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
594     {
595       /* Flush all values that haven't been written in the last
596        * `config_write_interval' seconds. */
597       flush_old_values (config_write_interval);
598
599       /* Determine the time of the next cache flush. */
600       while (next_flush.tv_sec <= now.tv_sec)
601         next_flush.tv_sec += config_flush_interval;
602
603       /* unlock the cache while we rotate so we don't block incoming
604        * updates if the fsync() blocks on disk I/O */
605       pthread_mutex_unlock(&cache_lock);
606       journal_rotate();
607       pthread_mutex_lock(&cache_lock);
608     }
609
610     /* Now, check if there's something to store away. If not, wait until
611      * something comes in or it's time to do the cache flush. */
612     if (cache_queue_head == NULL)
613     {
614       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
615       if ((status != 0) && (status != ETIMEDOUT))
616       {
617         RRDD_LOG (LOG_ERR, "queue_thread_main: "
618             "pthread_cond_timedwait returned %i.", status);
619       }
620     }
621
622     /* We're about to shut down, so lets flush the entire tree. */
623     if ((do_shutdown != 0) && (cache_queue_head == NULL))
624       flush_old_values (/* max age = */ -1);
625
626     /* Check if a value has arrived. This may be NULL if we timed out or there
627      * was an interrupt such as a signal. */
628     if (cache_queue_head == NULL)
629       continue;
630
631     ci = cache_queue_head;
632
633     /* copy the relevant parts */
634     file = strdup (ci->file);
635     if (file == NULL)
636     {
637       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
638       continue;
639     }
640
641     assert(ci->values != NULL);
642     assert(ci->values_num > 0);
643
644     values = ci->values;
645     values_num = ci->values_num;
646
647     _wipe_ci_values(ci, time(NULL));
648
649     cache_queue_head = ci->next;
650     if (cache_queue_head == NULL)
651       cache_queue_tail = NULL;
652     ci->next = NULL;
653
654     pthread_mutex_lock (&stats_lock);
655     assert (stats_queue_length > 0);
656     stats_queue_length--;
657     pthread_mutex_unlock (&stats_lock);
658
659     pthread_mutex_unlock (&cache_lock);
660
661     rrd_clear_error ();
662     status = rrd_update_r (file, NULL, values_num, (void *) values);
663     if (status != 0)
664     {
665       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
666           "rrd_update_r (%s) failed with status %i. (%s)",
667           file, status, rrd_get_error());
668     }
669
670     journal_write("wrote", file);
671     pthread_cond_broadcast(&ci->flushed);
672
673     for (i = 0; i < values_num; i++)
674       free (values[i]);
675
676     free(values);
677     free(file);
678
679     if (status == 0)
680     {
681       pthread_mutex_lock (&stats_lock);
682       stats_updates_written++;
683       stats_data_sets_written += values_num;
684       pthread_mutex_unlock (&stats_lock);
685     }
686
687     pthread_mutex_lock (&cache_lock);
688
689     /* We're about to shut down, so lets flush the entire tree. */
690     if ((do_shutdown != 0) && (cache_queue_head == NULL))
691       flush_old_values (/* max age = */ -1);
692   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
693   pthread_mutex_unlock (&cache_lock);
694
695   assert(cache_queue_head == NULL);
696   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
697   journal_done();
698
699   return (NULL);
700 } /* }}} void *queue_thread_main */
701
702 static int buffer_get_field (char **buffer_ret, /* {{{ */
703     size_t *buffer_size_ret, char **field_ret)
704 {
705   char *buffer;
706   size_t buffer_pos;
707   size_t buffer_size;
708   char *field;
709   size_t field_size;
710   int status;
711
712   buffer = *buffer_ret;
713   buffer_pos = 0;
714   buffer_size = *buffer_size_ret;
715   field = *buffer_ret;
716   field_size = 0;
717
718   if (buffer_size <= 0)
719     return (-1);
720
721   /* This is ensured by `handle_request'. */
722   assert (buffer[buffer_size - 1] == '\0');
723
724   status = -1;
725   while (buffer_pos < buffer_size)
726   {
727     /* Check for end-of-field or end-of-buffer */
728     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
729     {
730       field[field_size] = 0;
731       field_size++;
732       buffer_pos++;
733       status = 0;
734       break;
735     }
736     /* Handle escaped characters. */
737     else if (buffer[buffer_pos] == '\\')
738     {
739       if (buffer_pos >= (buffer_size - 1))
740         break;
741       buffer_pos++;
742       field[field_size] = buffer[buffer_pos];
743       field_size++;
744       buffer_pos++;
745     }
746     /* Normal operation */ 
747     else
748     {
749       field[field_size] = buffer[buffer_pos];
750       field_size++;
751       buffer_pos++;
752     }
753   } /* while (buffer_pos < buffer_size) */
754
755   if (status != 0)
756     return (status);
757
758   *buffer_ret = buffer + buffer_pos;
759   *buffer_size_ret = buffer_size - buffer_pos;
760   *field_ret = field;
761
762   return (0);
763 } /* }}} int buffer_get_field */
764
765 static int flush_file (const char *filename) /* {{{ */
766 {
767   cache_item_t *ci;
768
769   pthread_mutex_lock (&cache_lock);
770
771   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
772   if (ci == NULL)
773   {
774     pthread_mutex_unlock (&cache_lock);
775     return (ENOENT);
776   }
777
778   /* Enqueue at head */
779   enqueue_cache_item (ci, HEAD);
780
781   pthread_cond_wait(&ci->flushed, &cache_lock);
782   pthread_mutex_unlock(&cache_lock);
783
784   return (0);
785 } /* }}} int flush_file */
786
787 static int handle_request_help (int fd, /* {{{ */
788     char *buffer, size_t buffer_size)
789 {
790   int status;
791   char **help_text;
792   size_t help_text_len;
793   char *command;
794   size_t i;
795
796   char *help_help[] =
797   {
798     "5 Command overview\n",
799     "FLUSH <filename>\n",
800     "FLUSHALL\n",
801     "HELP [<command>]\n",
802     "UPDATE <filename> <values> [<values> ...]\n",
803     "STATS\n"
804   };
805   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
806
807   char *help_flush[] =
808   {
809     "4 Help for FLUSH\n",
810     "Usage: FLUSH <filename>\n",
811     "\n",
812     "Adds the given filename to the head of the update queue and returns\n",
813     "after is has been dequeued.\n"
814   };
815   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
816
817   char *help_flushall[] =
818   {
819     "3 Help for FLUSHALL\n",
820     "Usage: FLUSHALL\n",
821     "\n",
822     "Triggers writing of all pending updates.  Returns immediately.\n"
823   };
824   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
825
826   char *help_update[] =
827   {
828     "9 Help for UPDATE\n",
829     "Usage: UPDATE <filename> <values> [<values> ...]\n"
830     "\n",
831     "Adds the given file to the internal cache if it is not yet known and\n",
832     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
833     "for details.\n",
834     "\n",
835     "Each <values> has the following form:\n",
836     "  <values> = <time>:<value>[:<value>[...]]\n",
837     "See the rrdupdate(1) manpage for details.\n"
838   };
839   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
840
841   char *help_stats[] =
842   {
843     "4 Help for STATS\n",
844     "Usage: STATS\n",
845     "\n",
846     "Returns some performance counters, see the rrdcached(1) manpage for\n",
847     "a description of the values.\n"
848   };
849   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
850
851   status = buffer_get_field (&buffer, &buffer_size, &command);
852   if (status != 0)
853   {
854     help_text = help_help;
855     help_text_len = help_help_len;
856   }
857   else
858   {
859     if (strcasecmp (command, "update") == 0)
860     {
861       help_text = help_update;
862       help_text_len = help_update_len;
863     }
864     else if (strcasecmp (command, "flush") == 0)
865     {
866       help_text = help_flush;
867       help_text_len = help_flush_len;
868     }
869     else if (strcasecmp (command, "flushall") == 0)
870     {
871       help_text = help_flushall;
872       help_text_len = help_flushall_len;
873     }
874     else if (strcasecmp (command, "stats") == 0)
875     {
876       help_text = help_stats;
877       help_text_len = help_stats_len;
878     }
879     else
880     {
881       help_text = help_help;
882       help_text_len = help_help_len;
883     }
884   }
885
886   for (i = 0; i < help_text_len; i++)
887   {
888     status = swrite (fd, help_text[i], strlen (help_text[i]));
889     if (status < 0)
890     {
891       status = errno;
892       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
893       return (status);
894     }
895   }
896
897   return (0);
898 } /* }}} int handle_request_help */
899
900 static int handle_request_stats (int fd, /* {{{ */
901     char *buffer __attribute__((unused)),
902     size_t buffer_size __attribute__((unused)))
903 {
904   int status;
905   char outbuf[CMD_MAX];
906
907   uint64_t copy_queue_length;
908   uint64_t copy_updates_received;
909   uint64_t copy_flush_received;
910   uint64_t copy_updates_written;
911   uint64_t copy_data_sets_written;
912   uint64_t copy_journal_bytes;
913   uint64_t copy_journal_rotate;
914
915   uint64_t tree_nodes_number;
916   uint64_t tree_depth;
917
918   pthread_mutex_lock (&stats_lock);
919   copy_queue_length       = stats_queue_length;
920   copy_updates_received   = stats_updates_received;
921   copy_flush_received     = stats_flush_received;
922   copy_updates_written    = stats_updates_written;
923   copy_data_sets_written  = stats_data_sets_written;
924   copy_journal_bytes      = stats_journal_bytes;
925   copy_journal_rotate     = stats_journal_rotate;
926   pthread_mutex_unlock (&stats_lock);
927
928   pthread_mutex_lock (&cache_lock);
929   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
930   tree_depth        = (uint64_t) g_tree_height (cache_tree);
931   pthread_mutex_unlock (&cache_lock);
932
933 #define RRDD_STATS_SEND \
934   outbuf[sizeof (outbuf) - 1] = 0; \
935   status = swrite (fd, outbuf, strlen (outbuf)); \
936   if (status < 0) \
937   { \
938     status = errno; \
939     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
940     return (status); \
941   }
942
943   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
944   RRDD_STATS_SEND;
945
946   snprintf (outbuf, sizeof (outbuf),
947       "QueueLength: %"PRIu64"\n", copy_queue_length);
948   RRDD_STATS_SEND;
949
950   snprintf (outbuf, sizeof (outbuf),
951       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
952   RRDD_STATS_SEND;
953
954   snprintf (outbuf, sizeof (outbuf),
955       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
956   RRDD_STATS_SEND;
957
958   snprintf (outbuf, sizeof (outbuf),
959       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
960   RRDD_STATS_SEND;
961
962   snprintf (outbuf, sizeof (outbuf),
963       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
964   RRDD_STATS_SEND;
965
966   snprintf (outbuf, sizeof (outbuf),
967       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
968   RRDD_STATS_SEND;
969
970   snprintf (outbuf, sizeof (outbuf),
971       "TreeDepth: %"PRIu64"\n", tree_depth);
972   RRDD_STATS_SEND;
973
974   snprintf (outbuf, sizeof(outbuf),
975       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
976   RRDD_STATS_SEND;
977
978   snprintf (outbuf, sizeof(outbuf),
979       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
980   RRDD_STATS_SEND;
981
982   return (0);
983 #undef RRDD_STATS_SEND
984 } /* }}} int handle_request_stats */
985
986 static int handle_request_flush (int fd, /* {{{ */
987     char *buffer, size_t buffer_size)
988 {
989   char *file;
990   int status;
991   char result[CMD_MAX];
992
993   status = buffer_get_field (&buffer, &buffer_size, &file);
994   if (status != 0)
995   {
996     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
997   }
998   else
999   {
1000     pthread_mutex_lock(&stats_lock);
1001     stats_flush_received++;
1002     pthread_mutex_unlock(&stats_lock);
1003
1004     status = flush_file (file);
1005     if (status == 0)
1006       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1007     else if (status == ENOENT)
1008     {
1009       /* no file in our tree; see whether it exists at all */
1010       struct stat statbuf;
1011
1012       memset(&statbuf, 0, sizeof(statbuf));
1013       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1014         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1015       else
1016         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1017     }
1018     else if (status < 0)
1019       strncpy (result, "-1 Internal error.\n", sizeof (result));
1020     else
1021       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1022   }
1023   result[sizeof (result) - 1] = 0;
1024
1025   status = swrite (fd, result, strlen (result));
1026   if (status < 0)
1027   {
1028     status = errno;
1029     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1030     return (status);
1031   }
1032
1033   return (0);
1034 } /* }}} int handle_request_flush */
1035
1036 static int handle_request_flushall(int fd) /* {{{ */
1037 {
1038   int status;
1039   char answer[] ="0 Started flush.\n";
1040
1041   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1042
1043   pthread_mutex_lock(&cache_lock);
1044   flush_old_values(-1);
1045   pthread_mutex_unlock(&cache_lock);
1046
1047   status = swrite(fd, answer, strlen(answer));
1048   if (status < 0)
1049   {
1050     status = errno;
1051     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1052   }
1053
1054   return (status);
1055 }
1056
1057 static int handle_request_update (int fd, /* {{{ */
1058     char *buffer, size_t buffer_size)
1059 {
1060   char *file;
1061   int values_num = 0;
1062   int status;
1063
1064   time_t now;
1065
1066   cache_item_t *ci;
1067   char answer[CMD_MAX];
1068
1069 #define RRDD_UPDATE_SEND \
1070   answer[sizeof (answer) - 1] = 0; \
1071   status = swrite (fd, answer, strlen (answer)); \
1072   if (status < 0) \
1073   { \
1074     status = errno; \
1075     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1076     return (status); \
1077   }
1078
1079   now = time (NULL);
1080
1081   status = buffer_get_field (&buffer, &buffer_size, &file);
1082   if (status != 0)
1083   {
1084     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1085         sizeof (answer));
1086     RRDD_UPDATE_SEND;
1087     return (0);
1088   }
1089
1090   pthread_mutex_lock(&stats_lock);
1091   stats_updates_received++;
1092   pthread_mutex_unlock(&stats_lock);
1093
1094   pthread_mutex_lock (&cache_lock);
1095   ci = g_tree_lookup (cache_tree, file);
1096
1097   if (ci == NULL) /* {{{ */
1098   {
1099     struct stat statbuf;
1100
1101     /* don't hold the lock while we setup; stat(2) might block */
1102     pthread_mutex_unlock(&cache_lock);
1103
1104     memset (&statbuf, 0, sizeof (statbuf));
1105     status = stat (file, &statbuf);
1106     if (status != 0)
1107     {
1108       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1109
1110       status = errno;
1111       if (status == ENOENT)
1112         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1113       else
1114         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1115             status);
1116       RRDD_UPDATE_SEND;
1117       return (0);
1118     }
1119     if (!S_ISREG (statbuf.st_mode))
1120     {
1121       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1122       RRDD_UPDATE_SEND;
1123       return (0);
1124     }
1125     if (access(file, R_OK|W_OK) != 0)
1126     {
1127       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1128                 file, rrd_strerror(errno));
1129       RRDD_UPDATE_SEND;
1130       return (0);
1131     }
1132
1133     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1134     if (ci == NULL)
1135     {
1136       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1137
1138       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1139       RRDD_UPDATE_SEND;
1140       return (0);
1141     }
1142     memset (ci, 0, sizeof (cache_item_t));
1143
1144     ci->file = strdup (file);
1145     if (ci->file == NULL)
1146     {
1147       free (ci);
1148       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1149
1150       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1151       RRDD_UPDATE_SEND;
1152       return (0);
1153     }
1154
1155     _wipe_ci_values(ci, now);
1156     ci->flags = CI_FLAGS_IN_TREE;
1157
1158     pthread_mutex_lock(&cache_lock);
1159     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1160   } /* }}} */
1161   assert (ci != NULL);
1162
1163   while (buffer_size > 0)
1164   {
1165     char **temp;
1166     char *value;
1167
1168     status = buffer_get_field (&buffer, &buffer_size, &value);
1169     if (status != 0)
1170     {
1171       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1172       break;
1173     }
1174
1175     temp = (char **) realloc (ci->values,
1176         sizeof (char *) * (ci->values_num + 1));
1177     if (temp == NULL)
1178     {
1179       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1180       continue;
1181     }
1182     ci->values = temp;
1183
1184     ci->values[ci->values_num] = strdup (value);
1185     if (ci->values[ci->values_num] == NULL)
1186     {
1187       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1188       continue;
1189     }
1190     ci->values_num++;
1191
1192     values_num++;
1193   }
1194
1195   if (((now - ci->last_flush_time) >= config_write_interval)
1196       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1197       && (ci->values_num > 0))
1198   {
1199     enqueue_cache_item (ci, TAIL);
1200   }
1201
1202   pthread_mutex_unlock (&cache_lock);
1203
1204   if (values_num < 1)
1205   {
1206     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1207   }
1208   else
1209   {
1210     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1211         (values_num == 1) ? "" : "s");
1212   }
1213   RRDD_UPDATE_SEND;
1214   return (0);
1215 #undef RRDD_UPDATE_SEND
1216 } /* }}} int handle_request_update */
1217
1218 /* we came across a "WROTE" entry during journal replay.
1219  * throw away any values that we have accumulated for this file
1220  */
1221 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1222                                  const char *buffer,
1223                                  size_t buffer_size __attribute__((unused)))
1224 {
1225   int i;
1226   cache_item_t *ci;
1227   const char *file = buffer;
1228
1229   pthread_mutex_lock(&cache_lock);
1230
1231   ci = g_tree_lookup(cache_tree, file);
1232   if (ci == NULL)
1233   {
1234     pthread_mutex_unlock(&cache_lock);
1235     return (0);
1236   }
1237
1238   if (ci->values)
1239   {
1240     for (i=0; i < ci->values_num; i++)
1241       free(ci->values[i]);
1242
1243     free(ci->values);
1244   }
1245
1246   _wipe_ci_values(ci, time(NULL));
1247
1248   pthread_mutex_unlock(&cache_lock);
1249   return (0);
1250 } /* }}} int handle_request_wrote */
1251
1252 /* if fd < 0, we are in journal replay mode */
1253 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1254 {
1255   char *buffer_ptr;
1256   char *command;
1257   int status;
1258
1259   assert (buffer[buffer_size - 1] == '\0');
1260
1261   buffer_ptr = buffer;
1262   command = NULL;
1263   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1264   if (status != 0)
1265   {
1266     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1267     return (-1);
1268   }
1269
1270   if (strcasecmp (command, "update") == 0)
1271   {
1272     /* don't re-write updates in replay mode */
1273     if (fd >= 0)
1274       journal_write(command, buffer_ptr);
1275
1276     return (handle_request_update (fd, buffer_ptr, buffer_size));
1277   }
1278   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1279   {
1280     /* this is only valid in replay mode */
1281     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1282   }
1283   else if (strcasecmp (command, "flush") == 0)
1284   {
1285     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1286   }
1287   else if (strcasecmp (command, "flushall") == 0)
1288   {
1289     return (handle_request_flushall(fd));
1290   }
1291   else if (strcasecmp (command, "stats") == 0)
1292   {
1293     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1294   }
1295   else if (strcasecmp (command, "help") == 0)
1296   {
1297     return (handle_request_help (fd, buffer_ptr, buffer_size));
1298   }
1299   else
1300   {
1301     char result[CMD_MAX];
1302
1303     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1304     result[sizeof (result) - 1] = 0;
1305
1306     status = swrite (fd, result, strlen (result));
1307     if (status < 0)
1308     {
1309       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1310       return (-1);
1311     }
1312   }
1313
1314   return (0);
1315 } /* }}} int handle_request */
1316
1317 /* MUST NOT hold journal_lock before calling this */
1318 static void journal_rotate(void) /* {{{ */
1319 {
1320   FILE *old_fh = NULL;
1321
1322   if (journal_cur == NULL || journal_old == NULL)
1323     return;
1324
1325   pthread_mutex_lock(&journal_lock);
1326
1327   /* we rotate this way (rename before close) so that the we can release
1328    * the journal lock as fast as possible.  Journal writes to the new
1329    * journal can proceed immediately after the new file is opened.  The
1330    * fclose can then block without affecting new updates.
1331    */
1332   if (journal_fh != NULL)
1333   {
1334     old_fh = journal_fh;
1335     rename(journal_cur, journal_old);
1336     ++stats_journal_rotate;
1337   }
1338
1339   journal_fh = fopen(journal_cur, "a");
1340   pthread_mutex_unlock(&journal_lock);
1341
1342   if (old_fh != NULL)
1343     fclose(old_fh);
1344
1345   if (journal_fh == NULL)
1346     RRDD_LOG(LOG_CRIT,
1347              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1348              journal_cur, rrd_strerror(errno));
1349
1350 } /* }}} static void journal_rotate */
1351
1352 static void journal_done(void) /* {{{ */
1353 {
1354   if (journal_cur == NULL)
1355     return;
1356
1357   pthread_mutex_lock(&journal_lock);
1358   if (journal_fh != NULL)
1359   {
1360     fclose(journal_fh);
1361     journal_fh = NULL;
1362   }
1363
1364   RRDD_LOG(LOG_INFO, "removing journals");
1365
1366   unlink(journal_old);
1367   unlink(journal_cur);
1368   pthread_mutex_unlock(&journal_lock);
1369
1370 } /* }}} static void journal_done */
1371
1372 static int journal_write(char *cmd, char *args) /* {{{ */
1373 {
1374   int chars;
1375
1376   if (journal_fh == NULL)
1377     return 0;
1378
1379   pthread_mutex_lock(&journal_lock);
1380   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1381   pthread_mutex_unlock(&journal_lock);
1382
1383   if (chars > 0)
1384   {
1385     pthread_mutex_lock(&stats_lock);
1386     stats_journal_bytes += chars;
1387     pthread_mutex_unlock(&stats_lock);
1388   }
1389
1390   return chars;
1391 } /* }}} static int journal_write */
1392
1393 static int journal_replay (const char *file) /* {{{ */
1394 {
1395   FILE *fh;
1396   int entry_cnt = 0;
1397   int fail_cnt = 0;
1398   uint64_t line = 0;
1399   char entry[CMD_MAX];
1400
1401   if (file == NULL) return 0;
1402
1403   fh = fopen(file, "r");
1404   if (fh == NULL)
1405   {
1406     if (errno != ENOENT)
1407       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1408                file, rrd_strerror(errno));
1409     return 0;
1410   }
1411   else
1412     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1413
1414   while(!feof(fh))
1415   {
1416     size_t entry_len;
1417
1418     ++line;
1419     fgets(entry, sizeof(entry), fh);
1420     entry_len = strlen(entry);
1421
1422     /* check \n termination in case journal writing crashed mid-line */
1423     if (entry_len == 0)
1424       continue;
1425     else if (entry[entry_len - 1] != '\n')
1426     {
1427       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1428       ++fail_cnt;
1429       continue;
1430     }
1431
1432     entry[entry_len - 1] = '\0';
1433
1434     if (handle_request(-1, entry, entry_len) == 0)
1435       ++entry_cnt;
1436     else
1437       ++fail_cnt;
1438   }
1439
1440   fclose(fh);
1441
1442   if (entry_cnt > 0)
1443   {
1444     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1445              entry_cnt, fail_cnt);
1446     return 1;
1447   }
1448   else
1449     return 0;
1450
1451 } /* }}} static int journal_replay */
1452
1453 static void *connection_thread_main (void *args) /* {{{ */
1454 {
1455   pthread_t self;
1456   int i;
1457   int fd;
1458   
1459   fd = *((int *) args);
1460   free (args);
1461
1462   pthread_mutex_lock (&connection_threads_lock);
1463   {
1464     pthread_t *temp;
1465
1466     temp = (pthread_t *) realloc (connection_threads,
1467         sizeof (pthread_t) * (connection_threads_num + 1));
1468     if (temp == NULL)
1469     {
1470       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1471     }
1472     else
1473     {
1474       connection_threads = temp;
1475       connection_threads[connection_threads_num] = pthread_self ();
1476       connection_threads_num++;
1477     }
1478   }
1479   pthread_mutex_unlock (&connection_threads_lock);
1480
1481   while (do_shutdown == 0)
1482   {
1483     char buffer[CMD_MAX];
1484
1485     struct pollfd pollfd;
1486     int status;
1487
1488     pollfd.fd = fd;
1489     pollfd.events = POLLIN | POLLPRI;
1490     pollfd.revents = 0;
1491
1492     status = poll (&pollfd, 1, /* timeout = */ 500);
1493     if (status == 0) /* timeout */
1494       continue;
1495     else if (status < 0) /* error */
1496     {
1497       status = errno;
1498       if (status == EINTR)
1499         continue;
1500       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1501       continue;
1502     }
1503
1504     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1505     {
1506       close (fd);
1507       break;
1508     }
1509     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1510     {
1511       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1512           "poll(2) returned something unexpected: %#04hx",
1513           pollfd.revents);
1514       close (fd);
1515       break;
1516     }
1517
1518     status = (int) sread (fd, buffer, sizeof (buffer));
1519     if (status <= 0)
1520     {
1521       close (fd);
1522
1523       if (status < 0)
1524         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1525
1526       break;
1527     }
1528
1529     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1530     if (status != 0)
1531       break;
1532   }
1533
1534   close(fd);
1535
1536   self = pthread_self ();
1537   /* Remove this thread from the connection threads list */
1538   pthread_mutex_lock (&connection_threads_lock);
1539   /* Find out own index in the array */
1540   for (i = 0; i < connection_threads_num; i++)
1541     if (pthread_equal (connection_threads[i], self) != 0)
1542       break;
1543   assert (i < connection_threads_num);
1544
1545   /* Move the trailing threads forward. */
1546   if (i < (connection_threads_num - 1))
1547   {
1548     memmove (connection_threads + i,
1549         connection_threads + i + 1,
1550         sizeof (pthread_t) * (connection_threads_num - i - 1));
1551   }
1552
1553   connection_threads_num--;
1554   pthread_mutex_unlock (&connection_threads_lock);
1555
1556   return (NULL);
1557 } /* }}} void *connection_thread_main */
1558
1559 static int open_listen_socket_unix (const char *path) /* {{{ */
1560 {
1561   int fd;
1562   struct sockaddr_un sa;
1563   listen_socket_t *temp;
1564   int status;
1565
1566   temp = (listen_socket_t *) realloc (listen_fds,
1567       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1568   if (temp == NULL)
1569   {
1570     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1571     return (-1);
1572   }
1573   listen_fds = temp;
1574   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1575
1576   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1577   if (fd < 0)
1578   {
1579     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1580     return (-1);
1581   }
1582
1583   memset (&sa, 0, sizeof (sa));
1584   sa.sun_family = AF_UNIX;
1585   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1586
1587   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1588   if (status != 0)
1589   {
1590     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1591     close (fd);
1592     unlink (path);
1593     return (-1);
1594   }
1595
1596   status = listen (fd, /* backlog = */ 10);
1597   if (status != 0)
1598   {
1599     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1600     close (fd);
1601     unlink (path);
1602     return (-1);
1603   }
1604   
1605   listen_fds[listen_fds_num].fd = fd;
1606   snprintf (listen_fds[listen_fds_num].path,
1607       sizeof (listen_fds[listen_fds_num].path) - 1,
1608       "unix:%s", path);
1609   listen_fds_num++;
1610
1611   return (0);
1612 } /* }}} int open_listen_socket_unix */
1613
1614 static int open_listen_socket (const char *addr_orig) /* {{{ */
1615 {
1616   struct addrinfo ai_hints;
1617   struct addrinfo *ai_res;
1618   struct addrinfo *ai_ptr;
1619   char addr_copy[NI_MAXHOST];
1620   char *addr;
1621   char *port;
1622   int status;
1623
1624   assert (addr_orig != NULL);
1625
1626   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1627   addr_copy[sizeof (addr_copy) - 1] = 0;
1628   addr = addr_copy;
1629
1630   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1631     return (open_listen_socket_unix (addr + strlen ("unix:")));
1632   else if (addr[0] == '/')
1633     return (open_listen_socket_unix (addr));
1634
1635   memset (&ai_hints, 0, sizeof (ai_hints));
1636   ai_hints.ai_flags = 0;
1637 #ifdef AI_ADDRCONFIG
1638   ai_hints.ai_flags |= AI_ADDRCONFIG;
1639 #endif
1640   ai_hints.ai_family = AF_UNSPEC;
1641   ai_hints.ai_socktype = SOCK_STREAM;
1642
1643   port = NULL;
1644  if (*addr == '[') /* IPv6+port format */
1645   {
1646     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1647     addr++;
1648
1649     port = strchr (addr, ']');
1650     if (port == NULL)
1651     {
1652       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1653           addr_orig);
1654       return (-1);
1655     }
1656     *port = 0;
1657     port++;
1658
1659     if (*port == ':')
1660       port++;
1661     else if (*port == 0)
1662       port = NULL;
1663     else
1664     {
1665       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1666           port);
1667       return (-1);
1668     }
1669   } /* if (*addr = ']') */
1670   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1671   {
1672     port = rindex(addr, ':');
1673     if (port != NULL)
1674     {
1675       *port = 0;
1676       port++;
1677     }
1678   }
1679   ai_res = NULL;
1680   status = getaddrinfo (addr,
1681                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1682                         &ai_hints, &ai_res);
1683   if (status != 0)
1684   {
1685     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1686         "%s", addr, gai_strerror (status));
1687     return (-1);
1688   }
1689
1690   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1691   {
1692     int fd;
1693     listen_socket_t *temp;
1694     int one = 1;
1695
1696     temp = (listen_socket_t *) realloc (listen_fds,
1697         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1698     if (temp == NULL)
1699     {
1700       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1701       continue;
1702     }
1703     listen_fds = temp;
1704     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1705
1706     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1707     if (fd < 0)
1708     {
1709       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1710       continue;
1711     }
1712
1713     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1714
1715     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1716     if (status != 0)
1717     {
1718       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1719       close (fd);
1720       continue;
1721     }
1722
1723     status = listen (fd, /* backlog = */ 10);
1724     if (status != 0)
1725     {
1726       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1727       close (fd);
1728       return (-1);
1729     }
1730
1731     listen_fds[listen_fds_num].fd = fd;
1732     strncpy (listen_fds[listen_fds_num].path, addr,
1733         sizeof (listen_fds[listen_fds_num].path) - 1);
1734     listen_fds_num++;
1735   } /* for (ai_ptr) */
1736
1737   return (0);
1738 } /* }}} int open_listen_socket */
1739
1740 static int close_listen_sockets (void) /* {{{ */
1741 {
1742   size_t i;
1743
1744   for (i = 0; i < listen_fds_num; i++)
1745   {
1746     close (listen_fds[i].fd);
1747     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1748       unlink (listen_fds[i].path + strlen ("unix:"));
1749   }
1750
1751   free (listen_fds);
1752   listen_fds = NULL;
1753   listen_fds_num = 0;
1754
1755   return (0);
1756 } /* }}} int close_listen_sockets */
1757
1758 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1759 {
1760   struct pollfd *pollfds;
1761   int pollfds_num;
1762   int status;
1763   int i;
1764
1765   for (i = 0; i < config_listen_address_list_len; i++)
1766     open_listen_socket (config_listen_address_list[i]);
1767
1768   if (config_listen_address_list_len < 1)
1769     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1770
1771   if (listen_fds_num < 1)
1772   {
1773     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1774         "could be opened. Sorry.");
1775     return (NULL);
1776   }
1777
1778   pollfds_num = listen_fds_num;
1779   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1780   if (pollfds == NULL)
1781   {
1782     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1783     return (NULL);
1784   }
1785   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1786
1787   RRDD_LOG(LOG_INFO, "listening for connections");
1788
1789   while (do_shutdown == 0)
1790   {
1791     assert (pollfds_num == ((int) listen_fds_num));
1792     for (i = 0; i < pollfds_num; i++)
1793     {
1794       pollfds[i].fd = listen_fds[i].fd;
1795       pollfds[i].events = POLLIN | POLLPRI;
1796       pollfds[i].revents = 0;
1797     }
1798
1799     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1800     if (status == 0)
1801     {
1802       continue; /* timeout */
1803     }
1804     else if (status < 0)
1805     {
1806       status = errno;
1807       if (status != EINTR)
1808       {
1809         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1810       }
1811       continue;
1812     }
1813
1814     for (i = 0; i < pollfds_num; i++)
1815     {
1816       int *client_sd;
1817       struct sockaddr_storage client_sa;
1818       socklen_t client_sa_size;
1819       pthread_t tid;
1820       pthread_attr_t attr;
1821
1822       if (pollfds[i].revents == 0)
1823         continue;
1824
1825       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1826       {
1827         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1828             "poll(2) returned something unexpected for listen FD #%i.",
1829             pollfds[i].fd);
1830         continue;
1831       }
1832
1833       client_sd = (int *) malloc (sizeof (int));
1834       if (client_sd == NULL)
1835       {
1836         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1837         continue;
1838       }
1839
1840       client_sa_size = sizeof (client_sa);
1841       *client_sd = accept (pollfds[i].fd,
1842           (struct sockaddr *) &client_sa, &client_sa_size);
1843       if (*client_sd < 0)
1844       {
1845         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1846         continue;
1847       }
1848
1849       pthread_attr_init (&attr);
1850       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1851
1852       status = pthread_create (&tid, &attr, connection_thread_main,
1853           /* args = */ (void *) client_sd);
1854       if (status != 0)
1855       {
1856         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1857         close (*client_sd);
1858         free (client_sd);
1859         continue;
1860       }
1861     } /* for (pollfds_num) */
1862   } /* while (do_shutdown == 0) */
1863
1864   RRDD_LOG(LOG_INFO, "starting shutdown");
1865
1866   close_listen_sockets ();
1867
1868   pthread_mutex_lock (&connection_threads_lock);
1869   while (connection_threads_num > 0)
1870   {
1871     pthread_t wait_for;
1872
1873     wait_for = connection_threads[0];
1874
1875     pthread_mutex_unlock (&connection_threads_lock);
1876     pthread_join (wait_for, /* retval = */ NULL);
1877     pthread_mutex_lock (&connection_threads_lock);
1878   }
1879   pthread_mutex_unlock (&connection_threads_lock);
1880
1881   return (NULL);
1882 } /* }}} void *listen_thread_main */
1883
1884 static int daemonize (void) /* {{{ */
1885 {
1886   int status;
1887   int fd;
1888
1889   fd = open_pidfile();
1890   if (fd < 0) return fd;
1891
1892   if (!stay_foreground)
1893   {
1894     pid_t child;
1895     char *base_dir;
1896
1897     child = fork ();
1898     if (child < 0)
1899     {
1900       fprintf (stderr, "daemonize: fork(2) failed.\n");
1901       return (-1);
1902     }
1903     else if (child > 0)
1904     {
1905       return (1);
1906     }
1907
1908     /* Change into the /tmp directory. */
1909     base_dir = (config_base_dir != NULL)
1910       ? config_base_dir
1911       : "/tmp";
1912     status = chdir (base_dir);
1913     if (status != 0)
1914     {
1915       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1916       return (-1);
1917     }
1918
1919     /* Become session leader */
1920     setsid ();
1921
1922     /* Open the first three file descriptors to /dev/null */
1923     close (2);
1924     close (1);
1925     close (0);
1926
1927     open ("/dev/null", O_RDWR);
1928     dup (0);
1929     dup (0);
1930   } /* if (!stay_foreground) */
1931
1932   install_signal_handlers();
1933
1934   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1935   RRDD_LOG(LOG_INFO, "starting up");
1936
1937   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1938   if (cache_tree == NULL)
1939   {
1940     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1941     return (-1);
1942   }
1943
1944   status = write_pidfile (fd);
1945   return status;
1946 } /* }}} int daemonize */
1947
1948 static int cleanup (void) /* {{{ */
1949 {
1950   do_shutdown++;
1951
1952   pthread_cond_signal (&cache_cond);
1953   pthread_join (queue_thread, /* return = */ NULL);
1954
1955   remove_pidfile ();
1956
1957   RRDD_LOG(LOG_INFO, "goodbye");
1958   closelog ();
1959
1960   return (0);
1961 } /* }}} int cleanup */
1962
1963 static int read_options (int argc, char **argv) /* {{{ */
1964 {
1965   int option;
1966   int status = 0;
1967
1968   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1969   {
1970     switch (option)
1971     {
1972       case 'g':
1973         stay_foreground=1;
1974         break;
1975
1976       case 'l':
1977       {
1978         char **temp;
1979
1980         temp = (char **) realloc (config_listen_address_list,
1981             sizeof (char *) * (config_listen_address_list_len + 1));
1982         if (temp == NULL)
1983         {
1984           fprintf (stderr, "read_options: realloc failed.\n");
1985           return (2);
1986         }
1987         config_listen_address_list = temp;
1988
1989         temp[config_listen_address_list_len] = strdup (optarg);
1990         if (temp[config_listen_address_list_len] == NULL)
1991         {
1992           fprintf (stderr, "read_options: strdup failed.\n");
1993           return (2);
1994         }
1995         config_listen_address_list_len++;
1996       }
1997       break;
1998
1999       case 'f':
2000       {
2001         int temp;
2002
2003         temp = atoi (optarg);
2004         if (temp > 0)
2005           config_flush_interval = temp;
2006         else
2007         {
2008           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2009           status = 3;
2010         }
2011       }
2012       break;
2013
2014       case 'w':
2015       {
2016         int temp;
2017
2018         temp = atoi (optarg);
2019         if (temp > 0)
2020           config_write_interval = temp;
2021         else
2022         {
2023           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2024           status = 2;
2025         }
2026       }
2027       break;
2028
2029       case 'z':
2030       {
2031         int temp;
2032
2033         temp = atoi(optarg);
2034         if (temp > 0)
2035           config_write_jitter = temp;
2036         else
2037         {
2038           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2039           status = 2;
2040         }
2041
2042         break;
2043       }
2044
2045       case 'b':
2046       {
2047         size_t len;
2048
2049         if (config_base_dir != NULL)
2050           free (config_base_dir);
2051         config_base_dir = strdup (optarg);
2052         if (config_base_dir == NULL)
2053         {
2054           fprintf (stderr, "read_options: strdup failed.\n");
2055           return (3);
2056         }
2057
2058         len = strlen (config_base_dir);
2059         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2060         {
2061           config_base_dir[len - 1] = 0;
2062           len--;
2063         }
2064
2065         if (len < 1)
2066         {
2067           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2068           return (4);
2069         }
2070       }
2071       break;
2072
2073       case 'p':
2074       {
2075         if (config_pid_file != NULL)
2076           free (config_pid_file);
2077         config_pid_file = strdup (optarg);
2078         if (config_pid_file == NULL)
2079         {
2080           fprintf (stderr, "read_options: strdup failed.\n");
2081           return (3);
2082         }
2083       }
2084       break;
2085
2086       case 'j':
2087       {
2088         struct stat statbuf;
2089         const char *dir = optarg;
2090
2091         status = stat(dir, &statbuf);
2092         if (status != 0)
2093         {
2094           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2095           return 6;
2096         }
2097
2098         if (!S_ISDIR(statbuf.st_mode)
2099             || access(dir, R_OK|W_OK|X_OK) != 0)
2100         {
2101           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2102                   errno ? rrd_strerror(errno) : "");
2103           return 6;
2104         }
2105
2106         journal_cur = malloc(PATH_MAX + 1);
2107         journal_old = malloc(PATH_MAX + 1);
2108         if (journal_cur == NULL || journal_old == NULL)
2109         {
2110           fprintf(stderr, "malloc failure for journal files\n");
2111           return 6;
2112         }
2113         else 
2114         {
2115           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2116           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2117         }
2118       }
2119       break;
2120
2121       case 'h':
2122       case '?':
2123         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2124             "\n"
2125             "Usage: rrdcached [options]\n"
2126             "\n"
2127             "Valid options are:\n"
2128             "  -l <address>  Socket address to listen to.\n"
2129             "  -w <seconds>  Interval in which to write data.\n"
2130             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2131             "  -f <seconds>  Interval in which to flush dead data.\n"
2132             "  -p <file>     Location of the PID-file.\n"
2133             "  -b <dir>      Base directory to change to.\n"
2134             "  -g            Do not fork and run in the foreground.\n"
2135             "  -j <dir>      Directory in which to create the journal files.\n"
2136             "\n"
2137             "For more information and a detailed description of all options "
2138             "please refer\n"
2139             "to the rrdcached(1) manual page.\n",
2140             VERSION);
2141         status = -1;
2142         break;
2143     } /* switch (option) */
2144   } /* while (getopt) */
2145
2146   /* advise the user when values are not sane */
2147   if (config_flush_interval < 2 * config_write_interval)
2148     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2149             " 2x write interval (-w) !\n");
2150   if (config_write_jitter > config_write_interval)
2151     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2152             " write interval (-w) !\n");
2153
2154   return (status);
2155 } /* }}} int read_options */
2156
2157 int main (int argc, char **argv)
2158 {
2159   int status;
2160
2161   status = read_options (argc, argv);
2162   if (status != 0)
2163   {
2164     if (status < 0)
2165       status = 0;
2166     return (status);
2167   }
2168
2169   status = daemonize ();
2170   if (status == 1)
2171   {
2172     struct sigaction sigchld;
2173
2174     memset (&sigchld, 0, sizeof (sigchld));
2175     sigchld.sa_handler = SIG_IGN;
2176     sigaction (SIGCHLD, &sigchld, NULL);
2177
2178     return (0);
2179   }
2180   else if (status != 0)
2181   {
2182     fprintf (stderr, "daemonize failed, exiting.\n");
2183     return (1);
2184   }
2185
2186   if (journal_cur != NULL)
2187   {
2188     int had_journal = 0;
2189
2190     pthread_mutex_lock(&journal_lock);
2191
2192     RRDD_LOG(LOG_INFO, "checking for journal files");
2193
2194     had_journal += journal_replay(journal_old);
2195     had_journal += journal_replay(journal_cur);
2196
2197     if (had_journal)
2198       flush_old_values(-1);
2199
2200     pthread_mutex_unlock(&journal_lock);
2201     journal_rotate();
2202
2203     RRDD_LOG(LOG_INFO, "journal processing complete");
2204   }
2205
2206   /* start the queue thread */
2207   memset (&queue_thread, 0, sizeof (queue_thread));
2208   status = pthread_create (&queue_thread,
2209                            NULL, /* attr */
2210                            queue_thread_main,
2211                            NULL); /* args */
2212   if (status != 0)
2213   {
2214     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2215     cleanup();
2216     return (1);
2217   }
2218
2219   listen_thread_main (NULL);
2220   cleanup ();
2221
2222   return (0);
2223 } /* int main */
2224
2225 /*
2226  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2227  */