Big bunch of improvements for the caching daemon.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  *   kevin brintnall <kbrint@rufus.net>
21  **/
22
23 #ifndef _REENTRANT
24 # define _REENTRANT
25 #endif
26
27 #ifndef _THREAD_SAFE
28 # define _THREAD_SAFE
29 #endif
30
31 /* }}} */
32
33 /*
34  * Now for some includes..
35  */
36 #include "rrd.h" /* {{{ */
37 #include "rrd_client.h"
38
39 #include <stdlib.h>
40 #include <stdint.h>
41 #include <stdio.h>
42 #include <unistd.h>
43 #include <string.h>
44 #include <strings.h>
45 #include <stdint.h>
46 #include <inttypes.h>
47
48 #include <sys/types.h>
49 #include <sys/stat.h>
50 #include <fcntl.h>
51 #include <signal.h>
52 #include <sys/socket.h>
53 #include <sys/un.h>
54 #include <netdb.h>
55 #include <poll.h>
56 #include <syslog.h>
57 #include <pthread.h>
58 #include <errno.h>
59 #include <assert.h>
60 #include <sys/time.h>
61 #include <time.h>
62
63 #include <glib-2.0/glib.h>
64 /* }}} */
65
66 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
67
68 #ifndef __GNUC__
69 # define __attribute__(x) /**/
70 #endif
71
72 /*
73  * Types
74  */
75 struct listen_socket_s
76 {
77   int fd;
78   char path[PATH_MAX + 1];
79 };
80 typedef struct listen_socket_s listen_socket_t;
81
82 struct cache_item_s;
83 typedef struct cache_item_s cache_item_t;
84 struct cache_item_s
85 {
86   char *file;
87   char **values;
88   int values_num;
89   time_t last_flush_time;
90 #define CI_FLAGS_IN_TREE  (1<<0)
91 #define CI_FLAGS_IN_QUEUE (1<<1)
92   int flags;
93
94   cache_item_t *next;
95 };
96
97 struct callback_flush_data_s
98 {
99   time_t now;
100   time_t abs_timeout;
101   char **keys;
102   size_t keys_num;
103 };
104 typedef struct callback_flush_data_s callback_flush_data_t;
105
106 enum queue_side_e
107 {
108   HEAD,
109   TAIL
110 };
111 typedef enum queue_side_e queue_side_t;
112
113 /* max length of socket command or response */
114 #define CMD_MAX 4096
115
116 /*
117  * Variables
118  */
119 static int stay_foreground = 0;
120
121 static listen_socket_t *listen_fds = NULL;
122 static size_t listen_fds_num = 0;
123
124 static int do_shutdown = 0;
125
126 static pthread_t queue_thread;
127
128 static pthread_t *connection_threads = NULL;
129 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
130 static int connection_threads_num = 0;
131
132 /* Cache stuff */
133 static GTree          *cache_tree = NULL;
134 static cache_item_t   *cache_queue_head = NULL;
135 static cache_item_t   *cache_queue_tail = NULL;
136 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
137 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
138
139 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
140
141 static int config_write_interval = 300;
142 static int config_write_jitter   = 0;
143 static int config_flush_interval = 3600;
144 static char *config_pid_file = NULL;
145 static char *config_base_dir = NULL;
146
147 static char **config_listen_address_list = NULL;
148 static int config_listen_address_list_len = 0;
149
150 static uint64_t stats_queue_length = 0;
151 static uint64_t stats_updates_received = 0;
152 static uint64_t stats_flush_received = 0;
153 static uint64_t stats_updates_written = 0;
154 static uint64_t stats_data_sets_written = 0;
155 static uint64_t stats_journal_bytes = 0;
156 static uint64_t stats_journal_rotate = 0;
157 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
158
159 /* Journaled updates */
160 static char *journal_cur = NULL;
161 static char *journal_old = NULL;
162 static FILE *journal_fh = NULL;
163 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
164 static int journal_write(char *cmd, char *args);
165 static void journal_done(void);
166 static void journal_rotate(void);
167
168 /* 
169  * Functions
170  */
171 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
172 {
173   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
174   do_shutdown++;
175   pthread_cond_broadcast(&cache_cond);
176 } /* }}} void sig_int_handler */
177
178 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
179 {
180   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
181   do_shutdown++;
182   pthread_cond_broadcast(&cache_cond);
183 } /* }}} void sig_term_handler */
184
185 static int write_pidfile (void) /* {{{ */
186 {
187   pid_t pid;
188   char *file;
189   int fd;
190   FILE *fh;
191
192   pid = getpid ();
193   
194   file = (config_pid_file != NULL)
195     ? config_pid_file
196     : LOCALSTATEDIR "/run/rrdcached.pid";
197
198   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
199   if (fd < 0)
200   {
201     RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
202              file, rrd_strerror(errno));
203     return (-1);
204   }
205
206   fh = fdopen (fd, "w");
207   if (fh == NULL)
208   {
209     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
210     close(fd);
211     return (-1);
212   }
213
214   fprintf (fh, "%i\n", (int) pid);
215   fclose (fh);
216
217   return (0);
218 } /* }}} int write_pidfile */
219
220 static int remove_pidfile (void) /* {{{ */
221 {
222   char *file;
223   int status;
224
225   file = (config_pid_file != NULL)
226     ? config_pid_file
227     : LOCALSTATEDIR "/run/rrdcached.pid";
228
229   status = unlink (file);
230   if (status == 0)
231     return (0);
232   return (errno);
233 } /* }}} int remove_pidfile */
234
235 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
236 {
237   char    *buffer;
238   size_t   buffer_used;
239   size_t   buffer_free;
240   ssize_t  status;
241
242   buffer       = (char *) buffer_void;
243   buffer_used  = 0;
244   buffer_free  = buffer_size;
245
246   while (buffer_free > 0)
247   {
248     status = read (fd, buffer + buffer_used, buffer_free);
249     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
250       continue;
251
252     if (status < 0)
253       return (-1);
254
255     if (status == 0)
256       return (0);
257
258     assert ((0 > status) || (buffer_free >= (size_t) status));
259
260     buffer_free = buffer_free - status;
261     buffer_used = buffer_used + status;
262
263     if (buffer[buffer_used - 1] == '\n')
264       break;
265   }
266
267   assert (buffer_used > 0);
268
269   if (buffer[buffer_used - 1] != '\n')
270   {
271     errno = ENOBUFS;
272     return (-1);
273   }
274
275   buffer[buffer_used - 1] = 0;
276
277   /* Fix network line endings. */
278   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
279   {
280     buffer_used--;
281     buffer[buffer_used - 1] = 0;
282   }
283
284   return (buffer_used);
285 } /* }}} ssize_t sread */
286
287 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
288 {
289   const char *ptr;
290   size_t      nleft;
291   ssize_t     status;
292
293   /* special case for journal replay */
294   if (fd < 0) return 0;
295
296   ptr   = (const char *) buf;
297   nleft = count;
298
299   while (nleft > 0)
300   {
301     status = write (fd, (const void *) ptr, nleft);
302
303     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
304       continue;
305
306     if (status < 0)
307       return (status);
308
309     nleft -= status;
310     ptr   += status;
311   }
312
313   return (0);
314 } /* }}} ssize_t swrite */
315
316 static void _wipe_ci_values(cache_item_t *ci, time_t when)
317 {
318   ci->values = NULL;
319   ci->values_num = 0;
320
321   ci->last_flush_time = when;
322   if (config_write_jitter > 0)
323     ci->last_flush_time += (random() % config_write_jitter);
324
325   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
326 }
327
328 /*
329  * enqueue_cache_item:
330  * `cache_lock' must be acquired before calling this function!
331  */
332 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
333     queue_side_t side)
334 {
335   int did_insert = 0;
336
337   if (ci == NULL)
338     return (-1);
339
340   if (ci->values_num == 0)
341     return (0);
342
343   if (side == HEAD)
344   {
345     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
346     {
347       assert (ci->next == NULL);
348       ci->next = cache_queue_head;
349       cache_queue_head = ci;
350
351       if (cache_queue_tail == NULL)
352         cache_queue_tail = cache_queue_head;
353
354       did_insert = 1;
355     }
356     else if (cache_queue_head == ci)
357     {
358       /* do nothing */
359     }
360     else /* enqueued, but not first entry */
361     {
362       cache_item_t *prev;
363
364       /* find previous entry */
365       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
366         if (prev->next == ci)
367           break;
368       assert (prev != NULL);
369
370       /* move to the front */
371       prev->next = ci->next;
372       ci->next = cache_queue_head;
373       cache_queue_head = ci;
374
375       /* check if we need to adapt the tail */
376       if (cache_queue_tail == ci)
377         cache_queue_tail = prev;
378     }
379   }
380   else /* (side == TAIL) */
381   {
382     /* We don't move values back in the list.. */
383     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
384       return (0);
385
386     assert (ci->next == NULL);
387
388     if (cache_queue_tail == NULL)
389       cache_queue_head = ci;
390     else
391       cache_queue_tail->next = ci;
392     cache_queue_tail = ci;
393
394     did_insert = 1;
395   }
396
397   ci->flags |= CI_FLAGS_IN_QUEUE;
398
399   if (did_insert)
400   {
401     pthread_mutex_lock (&stats_lock);
402     stats_queue_length++;
403     pthread_mutex_unlock (&stats_lock);
404   }
405
406   return (0);
407 } /* }}} int enqueue_cache_item */
408
409 /*
410  * tree_callback_flush:
411  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
412  * while this is in progress.
413  */
414 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
415     gpointer data)
416 {
417   cache_item_t *ci;
418   callback_flush_data_t *cfd;
419
420   ci = (cache_item_t *) value;
421   cfd = (callback_flush_data_t *) data;
422
423   if ((ci->last_flush_time <= cfd->abs_timeout)
424       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
425       && (ci->values_num > 0))
426   {
427     enqueue_cache_item (ci, TAIL);
428   }
429   else if ((do_shutdown != 0)
430       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
431       && (ci->values_num > 0))
432   {
433     enqueue_cache_item (ci, TAIL);
434   }
435   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
436       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
437       && (ci->values_num <= 0))
438   {
439     char **temp;
440
441     temp = (char **) realloc (cfd->keys,
442         sizeof (char *) * (cfd->keys_num + 1));
443     if (temp == NULL)
444     {
445       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
446       return (FALSE);
447     }
448     cfd->keys = temp;
449     /* Make really sure this points to the _same_ place */
450     assert ((char *) key == ci->file);
451     cfd->keys[cfd->keys_num] = (char *) key;
452     cfd->keys_num++;
453   }
454
455   return (FALSE);
456 } /* }}} gboolean tree_callback_flush */
457
458 static int flush_old_values (int max_age)
459 {
460   callback_flush_data_t cfd;
461   size_t k;
462
463   memset (&cfd, 0, sizeof (cfd));
464   /* Pass the current time as user data so that we don't need to call
465    * `time' for each node. */
466   cfd.now = time (NULL);
467   cfd.keys = NULL;
468   cfd.keys_num = 0;
469
470   if (max_age > 0)
471     cfd.abs_timeout = cfd.now - max_age;
472   else
473     cfd.abs_timeout = cfd.now + 1;
474
475   /* `tree_callback_flush' will return the keys of all values that haven't
476    * been touched in the last `config_flush_interval' seconds in `cfd'.
477    * The char*'s in this array point to the same memory as ci->file, so we
478    * don't need to free them separately. */
479   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
480
481   for (k = 0; k < cfd.keys_num; k++)
482   {
483     cache_item_t *ci;
484
485     /* This must not fail. */
486     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
487     assert (ci != NULL);
488
489     /* If we end up here with values available, something's seriously
490      * messed up. */
491     assert (ci->values_num == 0);
492
493     /* Remove the node from the tree */
494     g_tree_remove (cache_tree, cfd.keys[k]);
495     cfd.keys[k] = NULL;
496
497     /* Now free and clean up `ci'. */
498     free (ci->file);
499     ci->file = NULL;
500     free (ci);
501     ci = NULL;
502   } /* for (k = 0; k < cfd.keys_num; k++) */
503
504   if (cfd.keys != NULL)
505   {
506     free (cfd.keys);
507     cfd.keys = NULL;
508   }
509
510   return (0);
511 } /* int flush_old_values */
512
513 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
514 {
515   struct timeval now;
516   struct timespec next_flush;
517
518   gettimeofday (&now, NULL);
519   next_flush.tv_sec = now.tv_sec + config_flush_interval;
520   next_flush.tv_nsec = 1000 * now.tv_usec;
521
522   pthread_mutex_lock (&cache_lock);
523   while ((do_shutdown == 0) || (cache_queue_head != NULL))
524   {
525     cache_item_t *ci;
526     char *file;
527     char **values;
528     int values_num;
529     int status;
530     int i;
531
532     /* First, check if it's time to do the cache flush. */
533     gettimeofday (&now, NULL);
534     if ((now.tv_sec > next_flush.tv_sec)
535         || ((now.tv_sec == next_flush.tv_sec)
536           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
537     {
538       /* Flush all values that haven't been written in the last
539        * `config_write_interval' seconds. */
540       flush_old_values (config_write_interval);
541
542       /* Determine the time of the next cache flush. */
543       while (next_flush.tv_sec <= now.tv_sec)
544         next_flush.tv_sec += config_flush_interval;
545
546       /* unlock the cache while we rotate so we don't block incoming
547        * updates if the fsync() blocks on disk I/O */
548       pthread_mutex_unlock(&cache_lock);
549       journal_rotate();
550       pthread_mutex_lock(&cache_lock);
551     }
552
553     /* Now, check if there's something to store away. If not, wait until
554      * something comes in or it's time to do the cache flush. */
555     if (cache_queue_head == NULL)
556     {
557       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
558       if ((status != 0) && (status != ETIMEDOUT))
559       {
560         RRDD_LOG (LOG_ERR, "queue_thread_main: "
561             "pthread_cond_timedwait returned %i.", status);
562       }
563     }
564
565     /* We're about to shut down, so lets flush the entire tree. */
566     if ((do_shutdown != 0) && (cache_queue_head == NULL))
567       flush_old_values (/* max age = */ -1);
568
569     /* Check if a value has arrived. This may be NULL if we timed out or there
570      * was an interrupt such as a signal. */
571     if (cache_queue_head == NULL)
572       continue;
573
574     ci = cache_queue_head;
575
576     /* copy the relevant parts */
577     file = strdup (ci->file);
578     if (file == NULL)
579     {
580       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
581       continue;
582     }
583
584     assert(ci->values != NULL);
585     assert(ci->values_num > 0);
586
587     values = ci->values;
588     values_num = ci->values_num;
589
590     _wipe_ci_values(ci, time(NULL));
591
592     cache_queue_head = ci->next;
593     if (cache_queue_head == NULL)
594       cache_queue_tail = NULL;
595     ci->next = NULL;
596
597     pthread_mutex_lock (&stats_lock);
598     assert (stats_queue_length > 0);
599     stats_queue_length--;
600     pthread_mutex_unlock (&stats_lock);
601
602     pthread_mutex_unlock (&cache_lock);
603
604     rrd_clear_error ();
605     status = rrd_update_r (file, NULL, values_num, (void *) values);
606     if (status != 0)
607     {
608       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
609           "rrd_update_r (%s) failed with status %i. (%s)",
610           file, status, rrd_get_error());
611     }
612
613     journal_write("wrote", file);
614
615     for (i = 0; i < values_num; i++)
616       free (values[i]);
617
618     free(values);
619     free(file);
620
621     if (status == 0)
622     {
623       pthread_mutex_lock (&stats_lock);
624       stats_updates_written++;
625       stats_data_sets_written += values_num;
626       pthread_mutex_unlock (&stats_lock);
627     }
628
629     pthread_mutex_lock (&cache_lock);
630     pthread_cond_broadcast (&flush_cond);
631
632     /* We're about to shut down, so lets flush the entire tree. */
633     if ((do_shutdown != 0) && (cache_queue_head == NULL))
634       flush_old_values (/* max age = */ -1);
635   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
636   pthread_mutex_unlock (&cache_lock);
637
638   assert(cache_queue_head == NULL);
639   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
640   journal_done();
641
642   return (NULL);
643 } /* }}} void *queue_thread_main */
644
645 static int buffer_get_field (char **buffer_ret, /* {{{ */
646     size_t *buffer_size_ret, char **field_ret)
647 {
648   char *buffer;
649   size_t buffer_pos;
650   size_t buffer_size;
651   char *field;
652   size_t field_size;
653   int status;
654
655   buffer = *buffer_ret;
656   buffer_pos = 0;
657   buffer_size = *buffer_size_ret;
658   field = *buffer_ret;
659   field_size = 0;
660
661   if (buffer_size <= 0)
662     return (-1);
663
664   /* This is ensured by `handle_request'. */
665   assert (buffer[buffer_size - 1] == '\0');
666
667   status = -1;
668   while (buffer_pos < buffer_size)
669   {
670     /* Check for end-of-field or end-of-buffer */
671     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
672     {
673       field[field_size] = 0;
674       field_size++;
675       buffer_pos++;
676       status = 0;
677       break;
678     }
679     /* Handle escaped characters. */
680     else if (buffer[buffer_pos] == '\\')
681     {
682       if (buffer_pos >= (buffer_size - 1))
683         break;
684       buffer_pos++;
685       field[field_size] = buffer[buffer_pos];
686       field_size++;
687       buffer_pos++;
688     }
689     /* Normal operation */ 
690     else
691     {
692       field[field_size] = buffer[buffer_pos];
693       field_size++;
694       buffer_pos++;
695     }
696   } /* while (buffer_pos < buffer_size) */
697
698   if (status != 0)
699     return (status);
700
701   *buffer_ret = buffer + buffer_pos;
702   *buffer_size_ret = buffer_size - buffer_pos;
703   *field_ret = field;
704
705   return (0);
706 } /* }}} int buffer_get_field */
707
708 static int flush_file (const char *filename) /* {{{ */
709 {
710   cache_item_t *ci;
711
712   pthread_mutex_lock (&cache_lock);
713
714   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
715   if (ci == NULL)
716   {
717     pthread_mutex_unlock (&cache_lock);
718     return (ENOENT);
719   }
720
721   /* Enqueue at head */
722   enqueue_cache_item (ci, HEAD);
723   pthread_cond_signal (&cache_cond);
724
725   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
726   {
727     ci = NULL;
728
729     pthread_cond_wait (&flush_cond, &cache_lock);
730
731     ci = g_tree_lookup (cache_tree, filename);
732     if (ci == NULL)
733     {
734       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
735           "while waiting for flush.");
736       pthread_mutex_unlock (&cache_lock);
737       return (-1);
738     }
739   }
740
741   pthread_mutex_unlock (&cache_lock);
742   return (0);
743 } /* }}} int flush_file */
744
745 static int handle_request_help (int fd, /* {{{ */
746     char *buffer, size_t buffer_size)
747 {
748   int status;
749   char **help_text;
750   size_t help_text_len;
751   char *command;
752   size_t i;
753
754   char *help_help[] =
755   {
756     "4 Command overview\n",
757     "FLUSH <filename>\n",
758     "HELP [<command>]\n",
759     "UPDATE <filename> <values> [<values> ...]\n",
760     "STATS\n"
761   };
762   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
763
764   char *help_flush[] =
765   {
766     "4 Help for FLUSH\n",
767     "Usage: FLUSH <filename>\n",
768     "\n",
769     "Adds the given filename to the head of the update queue and returns\n",
770     "after is has been dequeued.\n"
771   };
772   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
773
774   char *help_update[] =
775   {
776     "9 Help for UPDATE\n",
777     "Usage: UPDATE <filename> <values> [<values> ...]\n"
778     "\n",
779     "Adds the given file to the internal cache if it is not yet known and\n",
780     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
781     "for details.\n",
782     "\n",
783     "Each <values> has the following form:\n",
784     "  <values> = <time>:<value>[:<value>[...]]\n",
785     "See the rrdupdate(1) manpage for details.\n"
786   };
787   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
788
789   char *help_stats[] =
790   {
791     "4 Help for STATS\n",
792     "Usage: STATS\n",
793     "\n",
794     "Returns some performance counters, see the rrdcached(1) manpage for\n",
795     "a description of the values.\n"
796   };
797   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
798
799   status = buffer_get_field (&buffer, &buffer_size, &command);
800   if (status != 0)
801   {
802     help_text = help_help;
803     help_text_len = help_help_len;
804   }
805   else
806   {
807     if (strcasecmp (command, "update") == 0)
808     {
809       help_text = help_update;
810       help_text_len = help_update_len;
811     }
812     else if (strcasecmp (command, "flush") == 0)
813     {
814       help_text = help_flush;
815       help_text_len = help_flush_len;
816     }
817     else if (strcasecmp (command, "stats") == 0)
818     {
819       help_text = help_stats;
820       help_text_len = help_stats_len;
821     }
822     else
823     {
824       help_text = help_help;
825       help_text_len = help_help_len;
826     }
827   }
828
829   for (i = 0; i < help_text_len; i++)
830   {
831     status = swrite (fd, help_text[i], strlen (help_text[i]));
832     if (status < 0)
833     {
834       status = errno;
835       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
836       return (status);
837     }
838   }
839
840   return (0);
841 } /* }}} int handle_request_help */
842
843 static int handle_request_stats (int fd, /* {{{ */
844     char *buffer __attribute__((unused)),
845     size_t buffer_size __attribute__((unused)))
846 {
847   int status;
848   char outbuf[CMD_MAX];
849
850   uint64_t copy_queue_length;
851   uint64_t copy_updates_received;
852   uint64_t copy_flush_received;
853   uint64_t copy_updates_written;
854   uint64_t copy_data_sets_written;
855   uint64_t copy_journal_bytes;
856   uint64_t copy_journal_rotate;
857
858   uint64_t tree_nodes_number;
859   uint64_t tree_depth;
860
861   pthread_mutex_lock (&stats_lock);
862   copy_queue_length       = stats_queue_length;
863   copy_updates_received   = stats_updates_received;
864   copy_flush_received     = stats_flush_received;
865   copy_updates_written    = stats_updates_written;
866   copy_data_sets_written  = stats_data_sets_written;
867   copy_journal_bytes      = stats_journal_bytes;
868   copy_journal_rotate     = stats_journal_rotate;
869   pthread_mutex_unlock (&stats_lock);
870
871   pthread_mutex_lock (&cache_lock);
872   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
873   tree_depth        = (uint64_t) g_tree_height (cache_tree);
874   pthread_mutex_unlock (&cache_lock);
875
876 #define RRDD_STATS_SEND \
877   outbuf[sizeof (outbuf) - 1] = 0; \
878   status = swrite (fd, outbuf, strlen (outbuf)); \
879   if (status < 0) \
880   { \
881     status = errno; \
882     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
883     return (status); \
884   }
885
886   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
887   RRDD_STATS_SEND;
888
889   snprintf (outbuf, sizeof (outbuf),
890       "QueueLength: %"PRIu64"\n", copy_queue_length);
891   RRDD_STATS_SEND;
892
893   snprintf (outbuf, sizeof (outbuf),
894       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
895   RRDD_STATS_SEND;
896
897   snprintf (outbuf, sizeof (outbuf),
898       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
899   RRDD_STATS_SEND;
900
901   snprintf (outbuf, sizeof (outbuf),
902       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
903   RRDD_STATS_SEND;
904
905   snprintf (outbuf, sizeof (outbuf),
906       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
907   RRDD_STATS_SEND;
908
909   snprintf (outbuf, sizeof (outbuf),
910       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
911   RRDD_STATS_SEND;
912
913   snprintf (outbuf, sizeof (outbuf),
914       "TreeDepth: %"PRIu64"\n", tree_depth);
915   RRDD_STATS_SEND;
916
917   snprintf (outbuf, sizeof(outbuf),
918       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
919   RRDD_STATS_SEND;
920
921   snprintf (outbuf, sizeof(outbuf),
922       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
923   RRDD_STATS_SEND;
924
925   return (0);
926 #undef RRDD_STATS_SEND
927 } /* }}} int handle_request_stats */
928
929 static int handle_request_flush (int fd, /* {{{ */
930     char *buffer, size_t buffer_size)
931 {
932   char *file;
933   int status;
934   char result[CMD_MAX];
935
936   status = buffer_get_field (&buffer, &buffer_size, &file);
937   if (status != 0)
938   {
939     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
940   }
941   else
942   {
943     pthread_mutex_lock(&stats_lock);
944     stats_flush_received++;
945     pthread_mutex_unlock(&stats_lock);
946
947     status = flush_file (file);
948     if (status == 0)
949       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
950     else if (status == ENOENT)
951     {
952       /* no file in our tree; see whether it exists at all */
953       struct stat statbuf;
954
955       memset(&statbuf, 0, sizeof(statbuf));
956       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
957         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
958       else
959         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
960     }
961     else if (status < 0)
962       strncpy (result, "-1 Internal error.\n", sizeof (result));
963     else
964       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
965   }
966   result[sizeof (result) - 1] = 0;
967
968   status = swrite (fd, result, strlen (result));
969   if (status < 0)
970   {
971     status = errno;
972     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
973     return (status);
974   }
975
976   return (0);
977 } /* }}} int handle_request_flush */
978
979 static int handle_request_update (int fd, /* {{{ */
980     char *buffer, size_t buffer_size)
981 {
982   char *file;
983   int values_num = 0;
984   int status;
985
986   time_t now;
987
988   cache_item_t *ci;
989   char answer[CMD_MAX];
990
991 #define RRDD_UPDATE_SEND \
992   answer[sizeof (answer) - 1] = 0; \
993   status = swrite (fd, answer, strlen (answer)); \
994   if (status < 0) \
995   { \
996     status = errno; \
997     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
998     return (status); \
999   }
1000
1001   now = time (NULL);
1002
1003   status = buffer_get_field (&buffer, &buffer_size, &file);
1004   if (status != 0)
1005   {
1006     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1007         sizeof (answer));
1008     RRDD_UPDATE_SEND;
1009     return (0);
1010   }
1011
1012   pthread_mutex_lock(&stats_lock);
1013   stats_updates_received++;
1014   pthread_mutex_unlock(&stats_lock);
1015
1016   pthread_mutex_lock (&cache_lock);
1017
1018   ci = g_tree_lookup (cache_tree, file);
1019   if (ci == NULL) /* {{{ */
1020   {
1021     struct stat statbuf;
1022
1023     memset (&statbuf, 0, sizeof (statbuf));
1024     status = stat (file, &statbuf);
1025     if (status != 0)
1026     {
1027       pthread_mutex_unlock (&cache_lock);
1028       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1029
1030       status = errno;
1031       if (status == ENOENT)
1032         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1033       else
1034         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1035             status);
1036       RRDD_UPDATE_SEND;
1037       return (0);
1038     }
1039     if (!S_ISREG (statbuf.st_mode))
1040     {
1041       pthread_mutex_unlock (&cache_lock);
1042
1043       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1044       RRDD_UPDATE_SEND;
1045       return (0);
1046     }
1047     if (access(file, R_OK|W_OK) != 0)
1048     {
1049       pthread_mutex_unlock (&cache_lock);
1050
1051       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1052                 file, rrd_strerror(errno));
1053       RRDD_UPDATE_SEND;
1054       return (0);
1055     }
1056
1057     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1058     if (ci == NULL)
1059     {
1060       pthread_mutex_unlock (&cache_lock);
1061       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1062
1063       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1064       RRDD_UPDATE_SEND;
1065       return (0);
1066     }
1067     memset (ci, 0, sizeof (cache_item_t));
1068
1069     ci->file = strdup (file);
1070     if (ci->file == NULL)
1071     {
1072       pthread_mutex_unlock (&cache_lock);
1073       free (ci);
1074       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1075
1076       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1077       RRDD_UPDATE_SEND;
1078       return (0);
1079     }
1080
1081     _wipe_ci_values(ci, now);
1082     ci->flags = CI_FLAGS_IN_TREE;
1083
1084     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1085   } /* }}} */
1086   assert (ci != NULL);
1087
1088   while (buffer_size > 0)
1089   {
1090     char **temp;
1091     char *value;
1092
1093     status = buffer_get_field (&buffer, &buffer_size, &value);
1094     if (status != 0)
1095     {
1096       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1097       break;
1098     }
1099
1100     temp = (char **) realloc (ci->values,
1101         sizeof (char *) * (ci->values_num + 1));
1102     if (temp == NULL)
1103     {
1104       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1105       continue;
1106     }
1107     ci->values = temp;
1108
1109     ci->values[ci->values_num] = strdup (value);
1110     if (ci->values[ci->values_num] == NULL)
1111     {
1112       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1113       continue;
1114     }
1115     ci->values_num++;
1116
1117     values_num++;
1118   }
1119
1120   if (((now - ci->last_flush_time) >= config_write_interval)
1121       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1122       && (ci->values_num > 0))
1123   {
1124     enqueue_cache_item (ci, TAIL);
1125     pthread_cond_signal (&cache_cond);
1126   }
1127
1128   pthread_mutex_unlock (&cache_lock);
1129
1130   if (values_num < 1)
1131   {
1132     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1133   }
1134   else
1135   {
1136     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1137         (values_num == 1) ? "" : "s");
1138   }
1139   RRDD_UPDATE_SEND;
1140   return (0);
1141 #undef RRDD_UPDATE_SEND
1142 } /* }}} int handle_request_update */
1143
1144 /* we came across a "WROTE" entry during journal replay.
1145  * throw away any values that we have accumulated for this file
1146  */
1147 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1148                                  const char *buffer,
1149                                  size_t buffer_size __attribute__((unused)))
1150 {
1151   int i;
1152   cache_item_t *ci;
1153   const char *file = buffer;
1154
1155   pthread_mutex_lock(&cache_lock);
1156
1157   ci = g_tree_lookup(cache_tree, file);
1158   if (ci == NULL)
1159     goto out;
1160
1161   if (ci->values)
1162   {
1163     for (i=0; i < ci->values_num; i++)
1164       free(ci->values[i]);
1165
1166     free(ci->values);
1167   }
1168
1169   _wipe_ci_values(ci, time(NULL));
1170
1171 out:
1172   pthread_mutex_unlock(&cache_lock);
1173   return 0;
1174 }
1175
1176 /* if fd < 0, we are in journal replay mode */
1177 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1178 {
1179   char *buffer_ptr;
1180   char *command;
1181   int status;
1182
1183   assert (buffer[buffer_size - 1] == '\0');
1184
1185   buffer_ptr = buffer;
1186   command = NULL;
1187   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1188   if (status != 0)
1189   {
1190     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1191     return (-1);
1192   }
1193
1194   if (strcasecmp (command, "update") == 0)
1195   {
1196     /* don't re-write updates in replay mode */
1197     if (fd >= 0)
1198       journal_write(command, buffer_ptr);
1199
1200     return (handle_request_update (fd, buffer_ptr, buffer_size));
1201   }
1202   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1203   {
1204     /* this is only valid in replay mode */
1205     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1206   }
1207   else if (strcasecmp (command, "flush") == 0)
1208   {
1209     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1210   }
1211   else if (strcasecmp (command, "stats") == 0)
1212   {
1213     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1214   }
1215   else if (strcasecmp (command, "help") == 0)
1216   {
1217     return (handle_request_help (fd, buffer_ptr, buffer_size));
1218   }
1219   else
1220   {
1221     char result[CMD_MAX];
1222
1223     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1224     result[sizeof (result) - 1] = 0;
1225
1226     status = swrite (fd, result, strlen (result));
1227     if (status < 0)
1228     {
1229       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1230       return (-1);
1231     }
1232   }
1233
1234   return (0);
1235 } /* }}} int handle_request */
1236
1237 /* MUST NOT hold journal_lock before calling this */
1238 static void journal_rotate(void) /* {{{ */
1239 {
1240   FILE *old_fh = NULL;
1241
1242   if (journal_cur == NULL || journal_old == NULL)
1243     return;
1244
1245   pthread_mutex_lock(&journal_lock);
1246
1247   /* we rotate this way (rename before close) so that the we can release
1248    * the journal lock as fast as possible.  Journal writes to the new
1249    * journal can proceed immediately after the new file is opened.  The
1250    * fclose can then block without affecting new updates.
1251    */
1252   if (journal_fh != NULL)
1253   {
1254     old_fh = journal_fh;
1255     rename(journal_cur, journal_old);
1256     ++stats_journal_rotate;
1257   }
1258
1259   journal_fh = fopen(journal_cur, "a");
1260   pthread_mutex_unlock(&journal_lock);
1261
1262   if (old_fh != NULL)
1263     fclose(old_fh);
1264
1265   if (journal_fh == NULL)
1266     RRDD_LOG(LOG_CRIT,
1267              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1268              journal_cur, rrd_strerror(errno));
1269
1270 } /* }}} static void journal_rotate */
1271
1272 static void journal_done(void) /* {{{ */
1273 {
1274   if (journal_cur == NULL)
1275     return;
1276
1277   pthread_mutex_lock(&journal_lock);
1278   if (journal_fh != NULL)
1279   {
1280     fclose(journal_fh);
1281     journal_fh = NULL;
1282   }
1283
1284   RRDD_LOG(LOG_INFO, "removing journals");
1285
1286   unlink(journal_old);
1287   unlink(journal_cur);
1288   pthread_mutex_unlock(&journal_lock);
1289
1290 } /* }}} static void journal_done */
1291
1292 static int journal_write(char *cmd, char *args) /* {{{ */
1293 {
1294   int chars;
1295
1296   if (journal_fh == NULL)
1297     return 0;
1298
1299   pthread_mutex_lock(&journal_lock);
1300   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1301   pthread_mutex_unlock(&journal_lock);
1302
1303   if (chars > 0)
1304   {
1305     pthread_mutex_lock(&stats_lock);
1306     stats_journal_bytes += chars;
1307     pthread_mutex_unlock(&stats_lock);
1308   }
1309
1310   return chars;
1311 } /* }}} static int journal_write */
1312
1313 static int journal_replay (const char *file) /* {{{ */
1314 {
1315   FILE *fh;
1316   int entry_cnt = 0;
1317   int fail_cnt = 0;
1318   uint64_t line = 0;
1319   char entry[CMD_MAX];
1320
1321   if (file == NULL) return 0;
1322
1323   fh = fopen(file, "r");
1324   if (fh == NULL)
1325   {
1326     if (errno != ENOENT)
1327       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1328                file, rrd_strerror(errno));
1329     return 0;
1330   }
1331   else
1332     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1333
1334   while(!feof(fh))
1335   {
1336     size_t entry_len;
1337
1338     ++line;
1339     fgets(entry, sizeof(entry), fh);
1340     entry_len = strlen(entry);
1341
1342     /* check \n termination in case journal writing crashed mid-line */
1343     if (entry_len == 0)
1344       continue;
1345     else if (entry[entry_len - 1] != '\n')
1346     {
1347       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1348       ++fail_cnt;
1349       continue;
1350     }
1351
1352     entry[entry_len - 1] = '\0';
1353
1354     if (handle_request(-1, entry, entry_len) == 0)
1355       ++entry_cnt;
1356     else
1357       ++fail_cnt;
1358   }
1359
1360   fclose(fh);
1361
1362   if (entry_cnt > 0)
1363   {
1364     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1365              entry_cnt, fail_cnt);
1366     return 1;
1367   }
1368   else
1369     return 0;
1370
1371 } /* }}} static int journal_replay */
1372
1373 static void *connection_thread_main (void *args) /* {{{ */
1374 {
1375   pthread_t self;
1376   int i;
1377   int fd;
1378   
1379   fd = *((int *) args);
1380   free (args);
1381
1382   pthread_mutex_lock (&connection_threads_lock);
1383   {
1384     pthread_t *temp;
1385
1386     temp = (pthread_t *) realloc (connection_threads,
1387         sizeof (pthread_t) * (connection_threads_num + 1));
1388     if (temp == NULL)
1389     {
1390       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1391     }
1392     else
1393     {
1394       connection_threads = temp;
1395       connection_threads[connection_threads_num] = pthread_self ();
1396       connection_threads_num++;
1397     }
1398   }
1399   pthread_mutex_unlock (&connection_threads_lock);
1400
1401   while (do_shutdown == 0)
1402   {
1403     char buffer[CMD_MAX];
1404
1405     struct pollfd pollfd;
1406     int status;
1407
1408     pollfd.fd = fd;
1409     pollfd.events = POLLIN | POLLPRI;
1410     pollfd.revents = 0;
1411
1412     status = poll (&pollfd, 1, /* timeout = */ 500);
1413     if (status == 0) /* timeout */
1414       continue;
1415     else if (status < 0) /* error */
1416     {
1417       status = errno;
1418       if (status == EINTR)
1419         continue;
1420       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1421       continue;
1422     }
1423
1424     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1425     {
1426       close (fd);
1427       break;
1428     }
1429     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1430     {
1431       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1432           "poll(2) returned something unexpected: %#04hx",
1433           pollfd.revents);
1434       close (fd);
1435       break;
1436     }
1437
1438     status = (int) sread (fd, buffer, sizeof (buffer));
1439     if (status <= 0)
1440     {
1441       close (fd);
1442
1443       if (status < 0)
1444         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1445
1446       break;
1447     }
1448
1449     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1450     if (status != 0)
1451     {
1452       close (fd);
1453       break;
1454     }
1455   }
1456
1457   self = pthread_self ();
1458   /* Remove this thread from the connection threads list */
1459   pthread_mutex_lock (&connection_threads_lock);
1460   /* Find out own index in the array */
1461   for (i = 0; i < connection_threads_num; i++)
1462     if (pthread_equal (connection_threads[i], self) != 0)
1463       break;
1464   assert (i < connection_threads_num);
1465
1466   /* Move the trailing threads forward. */
1467   if (i < (connection_threads_num - 1))
1468   {
1469     memmove (connection_threads + i,
1470         connection_threads + i + 1,
1471         sizeof (pthread_t) * (connection_threads_num - i - 1));
1472   }
1473
1474   connection_threads_num--;
1475   pthread_mutex_unlock (&connection_threads_lock);
1476
1477   return (NULL);
1478 } /* }}} void *connection_thread_main */
1479
1480 static int open_listen_socket_unix (const char *path) /* {{{ */
1481 {
1482   int fd;
1483   struct sockaddr_un sa;
1484   listen_socket_t *temp;
1485   int status;
1486
1487   temp = (listen_socket_t *) realloc (listen_fds,
1488       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1489   if (temp == NULL)
1490   {
1491     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1492     return (-1);
1493   }
1494   listen_fds = temp;
1495   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1496
1497   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1498   if (fd < 0)
1499   {
1500     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1501     return (-1);
1502   }
1503
1504   memset (&sa, 0, sizeof (sa));
1505   sa.sun_family = AF_UNIX;
1506   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1507
1508   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1509   if (status != 0)
1510   {
1511     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1512     close (fd);
1513     unlink (path);
1514     return (-1);
1515   }
1516
1517   status = listen (fd, /* backlog = */ 10);
1518   if (status != 0)
1519   {
1520     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1521     close (fd);
1522     unlink (path);
1523     return (-1);
1524   }
1525   
1526   listen_fds[listen_fds_num].fd = fd;
1527   snprintf (listen_fds[listen_fds_num].path,
1528       sizeof (listen_fds[listen_fds_num].path) - 1,
1529       "unix:%s", path);
1530   listen_fds_num++;
1531
1532   return (0);
1533 } /* }}} int open_listen_socket_unix */
1534
1535 static int open_listen_socket (const char *addr) /* {{{ */
1536 {
1537   struct addrinfo ai_hints;
1538   struct addrinfo *ai_res;
1539   struct addrinfo *ai_ptr;
1540   int status;
1541
1542   assert (addr != NULL);
1543
1544   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1545     return (open_listen_socket_unix (addr + strlen ("unix:")));
1546   else if (addr[0] == '/')
1547     return (open_listen_socket_unix (addr));
1548
1549   memset (&ai_hints, 0, sizeof (ai_hints));
1550   ai_hints.ai_flags = 0;
1551 #ifdef AI_ADDRCONFIG
1552   ai_hints.ai_flags |= AI_ADDRCONFIG;
1553 #endif
1554   ai_hints.ai_family = AF_UNSPEC;
1555   ai_hints.ai_socktype = SOCK_STREAM;
1556
1557   ai_res = NULL;
1558   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
1559   if (status != 0)
1560   {
1561     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1562         "%s", addr, gai_strerror (status));
1563     return (-1);
1564   }
1565
1566   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1567   {
1568     int fd;
1569     listen_socket_t *temp;
1570
1571     temp = (listen_socket_t *) realloc (listen_fds,
1572         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1573     if (temp == NULL)
1574     {
1575       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1576       continue;
1577     }
1578     listen_fds = temp;
1579     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1580
1581     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1582     if (fd < 0)
1583     {
1584       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1585       continue;
1586     }
1587
1588     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1589     if (status != 0)
1590     {
1591       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1592       close (fd);
1593       continue;
1594     }
1595
1596     status = listen (fd, /* backlog = */ 10);
1597     if (status != 0)
1598     {
1599       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1600       close (fd);
1601       return (-1);
1602     }
1603
1604     listen_fds[listen_fds_num].fd = fd;
1605     strncpy (listen_fds[listen_fds_num].path, addr,
1606         sizeof (listen_fds[listen_fds_num].path) - 1);
1607     listen_fds_num++;
1608   } /* for (ai_ptr) */
1609
1610   return (0);
1611 } /* }}} int open_listen_socket */
1612
1613 static int close_listen_sockets (void) /* {{{ */
1614 {
1615   size_t i;
1616
1617   for (i = 0; i < listen_fds_num; i++)
1618   {
1619     close (listen_fds[i].fd);
1620     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1621       unlink (listen_fds[i].path + strlen ("unix:"));
1622   }
1623
1624   free (listen_fds);
1625   listen_fds = NULL;
1626   listen_fds_num = 0;
1627
1628   return (0);
1629 } /* }}} int close_listen_sockets */
1630
1631 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1632 {
1633   struct pollfd *pollfds;
1634   int pollfds_num;
1635   int status;
1636   int i;
1637
1638   for (i = 0; i < config_listen_address_list_len; i++)
1639     open_listen_socket (config_listen_address_list[i]);
1640
1641   if (config_listen_address_list_len < 1)
1642     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1643
1644   if (listen_fds_num < 1)
1645   {
1646     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1647         "could be opened. Sorry.");
1648     return (NULL);
1649   }
1650
1651   pollfds_num = listen_fds_num;
1652   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1653   if (pollfds == NULL)
1654   {
1655     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1656     return (NULL);
1657   }
1658   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1659
1660   RRDD_LOG(LOG_INFO, "listening for connections");
1661
1662   while (do_shutdown == 0)
1663   {
1664     assert (pollfds_num == ((int) listen_fds_num));
1665     for (i = 0; i < pollfds_num; i++)
1666     {
1667       pollfds[i].fd = listen_fds[i].fd;
1668       pollfds[i].events = POLLIN | POLLPRI;
1669       pollfds[i].revents = 0;
1670     }
1671
1672     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1673     if (status < 1)
1674     {
1675       status = errno;
1676       if (status != EINTR)
1677       {
1678         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1679       }
1680       continue;
1681     }
1682
1683     for (i = 0; i < pollfds_num; i++)
1684     {
1685       int *client_sd;
1686       struct sockaddr_storage client_sa;
1687       socklen_t client_sa_size;
1688       pthread_t tid;
1689       pthread_attr_t attr;
1690
1691       if (pollfds[i].revents == 0)
1692         continue;
1693
1694       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1695       {
1696         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1697             "poll(2) returned something unexpected for listen FD #%i.",
1698             pollfds[i].fd);
1699         continue;
1700       }
1701
1702       client_sd = (int *) malloc (sizeof (int));
1703       if (client_sd == NULL)
1704       {
1705         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1706         continue;
1707       }
1708
1709       client_sa_size = sizeof (client_sa);
1710       *client_sd = accept (pollfds[i].fd,
1711           (struct sockaddr *) &client_sa, &client_sa_size);
1712       if (*client_sd < 0)
1713       {
1714         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1715         continue;
1716       }
1717
1718       pthread_attr_init (&attr);
1719       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1720
1721       status = pthread_create (&tid, &attr, connection_thread_main,
1722           /* args = */ (void *) client_sd);
1723       if (status != 0)
1724       {
1725         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1726         close (*client_sd);
1727         free (client_sd);
1728         continue;
1729       }
1730     } /* for (pollfds_num) */
1731   } /* while (do_shutdown == 0) */
1732
1733   RRDD_LOG(LOG_INFO, "starting shutdown");
1734
1735   close_listen_sockets ();
1736
1737   pthread_mutex_lock (&connection_threads_lock);
1738   while (connection_threads_num > 0)
1739   {
1740     pthread_t wait_for;
1741
1742     wait_for = connection_threads[0];
1743
1744     pthread_mutex_unlock (&connection_threads_lock);
1745     pthread_join (wait_for, /* retval = */ NULL);
1746     pthread_mutex_lock (&connection_threads_lock);
1747   }
1748   pthread_mutex_unlock (&connection_threads_lock);
1749
1750   return (NULL);
1751 } /* }}} void *listen_thread_main */
1752
1753 static int daemonize (void) /* {{{ */
1754 {
1755   pid_t child;
1756   int status;
1757   char *base_dir;
1758
1759   /* These structures are static, because `sigaction' behaves weird if the are
1760    * overwritten.. */
1761   static struct sigaction sa_int;
1762   static struct sigaction sa_term;
1763   static struct sigaction sa_pipe;
1764
1765   if (stay_foreground)
1766     goto child_startup;
1767
1768   child = fork ();
1769   if (child < 0)
1770   {
1771     fprintf (stderr, "daemonize: fork(2) failed.\n");
1772     return (-1);
1773   }
1774   else if (child > 0)
1775   {
1776     return (1);
1777   }
1778
1779   /* Change into the /tmp directory. */
1780   base_dir = (config_base_dir != NULL)
1781     ? config_base_dir
1782     : "/tmp";
1783   status = chdir (base_dir);
1784   if (status != 0)
1785   {
1786     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1787     return (-1);
1788   }
1789
1790   /* Become session leader */
1791   setsid ();
1792
1793   /* Open the first three file descriptors to /dev/null */
1794   close (2);
1795   close (1);
1796   close (0);
1797
1798   open ("/dev/null", O_RDWR);
1799   dup (0);
1800   dup (0);
1801
1802 child_startup:
1803   /* Install signal handlers */
1804   memset (&sa_int, 0, sizeof (sa_int));
1805   sa_int.sa_handler = sig_int_handler;
1806   sigaction (SIGINT, &sa_int, NULL);
1807
1808   memset (&sa_term, 0, sizeof (sa_term));
1809   sa_term.sa_handler = sig_term_handler;
1810   sigaction (SIGTERM, &sa_term, NULL);
1811
1812   memset (&sa_pipe, 0, sizeof (sa_pipe));
1813   sa_pipe.sa_handler = SIG_IGN;
1814   sigaction (SIGPIPE, &sa_pipe, NULL);
1815
1816   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1817   RRDD_LOG(LOG_INFO, "starting up");
1818
1819   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1820   if (cache_tree == NULL)
1821   {
1822     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1823     return (-1);
1824   }
1825
1826   status = write_pidfile ();
1827   return status;
1828 } /* }}} int daemonize */
1829
1830 static int cleanup (void) /* {{{ */
1831 {
1832   do_shutdown++;
1833
1834   pthread_cond_signal (&cache_cond);
1835   pthread_join (queue_thread, /* return = */ NULL);
1836
1837   remove_pidfile ();
1838
1839   RRDD_LOG(LOG_INFO, "goodbye");
1840   closelog ();
1841
1842   return (0);
1843 } /* }}} int cleanup */
1844
1845 static int read_options (int argc, char **argv) /* {{{ */
1846 {
1847   int option;
1848   int status = 0;
1849
1850   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1851   {
1852     switch (option)
1853     {
1854       case 'g':
1855         stay_foreground=1;
1856         break;
1857
1858       case 'l':
1859       {
1860         char **temp;
1861
1862         temp = (char **) realloc (config_listen_address_list,
1863             sizeof (char *) * (config_listen_address_list_len + 1));
1864         if (temp == NULL)
1865         {
1866           fprintf (stderr, "read_options: realloc failed.\n");
1867           return (2);
1868         }
1869         config_listen_address_list = temp;
1870
1871         temp[config_listen_address_list_len] = strdup (optarg);
1872         if (temp[config_listen_address_list_len] == NULL)
1873         {
1874           fprintf (stderr, "read_options: strdup failed.\n");
1875           return (2);
1876         }
1877         config_listen_address_list_len++;
1878       }
1879       break;
1880
1881       case 'f':
1882       {
1883         int temp;
1884
1885         temp = atoi (optarg);
1886         if (temp > 0)
1887           config_flush_interval = temp;
1888         else
1889         {
1890           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1891           status = 3;
1892         }
1893       }
1894       break;
1895
1896       case 'w':
1897       {
1898         int temp;
1899
1900         temp = atoi (optarg);
1901         if (temp > 0)
1902           config_write_interval = temp;
1903         else
1904         {
1905           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1906           status = 2;
1907         }
1908       }
1909       break;
1910
1911       case 'z':
1912       {
1913         int temp;
1914
1915         temp = atoi(optarg);
1916         if (temp > 0)
1917           config_write_jitter = temp;
1918         else
1919         {
1920           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1921           status = 2;
1922         }
1923
1924         break;
1925       }
1926
1927       case 'b':
1928       {
1929         size_t len;
1930
1931         if (config_base_dir != NULL)
1932           free (config_base_dir);
1933         config_base_dir = strdup (optarg);
1934         if (config_base_dir == NULL)
1935         {
1936           fprintf (stderr, "read_options: strdup failed.\n");
1937           return (3);
1938         }
1939
1940         len = strlen (config_base_dir);
1941         while ((len > 0) && (config_base_dir[len - 1] == '/'))
1942         {
1943           config_base_dir[len - 1] = 0;
1944           len--;
1945         }
1946
1947         if (len < 1)
1948         {
1949           fprintf (stderr, "Invalid base directory: %s\n", optarg);
1950           return (4);
1951         }
1952       }
1953       break;
1954
1955       case 'p':
1956       {
1957         if (config_pid_file != NULL)
1958           free (config_pid_file);
1959         config_pid_file = strdup (optarg);
1960         if (config_pid_file == NULL)
1961         {
1962           fprintf (stderr, "read_options: strdup failed.\n");
1963           return (3);
1964         }
1965       }
1966       break;
1967
1968       case 'j':
1969       {
1970         struct stat statbuf;
1971         const char *dir = optarg;
1972
1973         status = stat(dir, &statbuf);
1974         if (status != 0)
1975         {
1976           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
1977           return 6;
1978         }
1979
1980         if (!S_ISDIR(statbuf.st_mode)
1981             || access(dir, R_OK|W_OK|X_OK) != 0)
1982         {
1983           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
1984                   errno ? rrd_strerror(errno) : "");
1985           return 6;
1986         }
1987
1988         journal_cur = malloc(PATH_MAX + 1);
1989         journal_old = malloc(PATH_MAX + 1);
1990         if (journal_cur == NULL || journal_old == NULL)
1991         {
1992           fprintf(stderr, "malloc failure for journal files\n");
1993           return 6;
1994         }
1995         else 
1996         {
1997           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
1998           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
1999         }
2000       }
2001       break;
2002
2003       case 'h':
2004       case '?':
2005         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
2006             "\n"
2007             "Usage: rrdcached [options]\n"
2008             "\n"
2009             "Valid options are:\n"
2010             "  -l <address>  Socket address to listen to.\n"
2011             "  -w <seconds>  Interval in which to write data.\n"
2012             "  -z <delay>    Delay writes up to <delay> seconds to spread load" \
2013             "  -f <seconds>  Interval in which to flush dead data.\n"
2014             "  -p <file>     Location of the PID-file.\n"
2015             "  -b <dir>      Base directory to change to.\n"
2016             "\n"
2017             "For more information and a detailed description of all options "
2018             "please refer\n"
2019             "to the rrdcached(1) manual page.\n",
2020             VERSION);
2021         status = -1;
2022         break;
2023     } /* switch (option) */
2024   } /* while (getopt) */
2025
2026   /* advise the user when values are not sane */
2027   if (config_flush_interval < 2 * config_write_interval)
2028     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2029             " 2x write interval (-w) !\n");
2030   if (config_write_jitter > config_write_interval)
2031     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2032             " write interval (-w) !\n");
2033
2034   return (status);
2035 } /* }}} int read_options */
2036
2037 int main (int argc, char **argv)
2038 {
2039   int status;
2040
2041   status = read_options (argc, argv);
2042   if (status != 0)
2043   {
2044     if (status < 0)
2045       status = 0;
2046     return (status);
2047   }
2048
2049   status = daemonize ();
2050   if (status == 1)
2051   {
2052     struct sigaction sigchld;
2053
2054     memset (&sigchld, 0, sizeof (sigchld));
2055     sigchld.sa_handler = SIG_IGN;
2056     sigaction (SIGCHLD, &sigchld, NULL);
2057
2058     return (0);
2059   }
2060   else if (status != 0)
2061   {
2062     fprintf (stderr, "daemonize failed, exiting.\n");
2063     return (1);
2064   }
2065
2066   if (journal_cur != NULL)
2067   {
2068     int had_journal = 0;
2069
2070     pthread_mutex_lock(&journal_lock);
2071
2072     RRDD_LOG(LOG_INFO, "checking for journal files");
2073
2074     had_journal += journal_replay(journal_old);
2075     had_journal += journal_replay(journal_cur);
2076
2077     if (had_journal)
2078       flush_old_values(-1);
2079
2080     pthread_mutex_unlock(&journal_lock);
2081     journal_rotate();
2082
2083     RRDD_LOG(LOG_INFO, "journal processing complete");
2084   }
2085
2086   /* start the queue thread */
2087   memset (&queue_thread, 0, sizeof (queue_thread));
2088   status = pthread_create (&queue_thread,
2089                            NULL, /* attr */
2090                            queue_thread_main,
2091                            NULL); /* args */
2092   if (status != 0)
2093   {
2094     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2095     cleanup();
2096     return (1);
2097   }
2098
2099   listen_thread_main (NULL);
2100   cleanup ();
2101
2102   return (0);
2103 } /* int main */
2104
2105 /*
2106  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2107  */