This patch ensures that the "FLUSH" command will write the updates out to
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 struct listen_socket_s
105 {
106   int fd;
107   char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
110
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115   char *file;
116   char **values;
117   int values_num;
118   time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE  (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121   int flags;
122   pthread_cond_t  flushed;
123   cache_item_t *next;
124 };
125
126 struct callback_flush_data_s
127 {
128   time_t now;
129   time_t abs_timeout;
130   char **keys;
131   size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
134
135 enum queue_side_e
136 {
137   HEAD,
138   TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
141
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
144
145 /*
146  * Variables
147  */
148 static int stay_foreground = 0;
149
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
152
153 static int do_shutdown = 0;
154
155 static pthread_t queue_thread;
156
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
160
161 /* Cache stuff */
162 static GTree          *cache_tree = NULL;
163 static cache_item_t   *cache_queue_head = NULL;
164 static cache_item_t   *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
167
168 static int config_write_interval = 300;
169 static int config_write_jitter   = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
173
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
176
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
185
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
194
195 /* 
196  * Functions
197  */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
199 {
200   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201   do_shutdown++;
202   pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
204
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
206 {
207   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208   do_shutdown++;
209   pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
211
212 static int write_pidfile (void) /* {{{ */
213 {
214   pid_t pid;
215   char *file;
216   int fd;
217   FILE *fh;
218
219   pid = getpid ();
220   
221   file = (config_pid_file != NULL)
222     ? config_pid_file
223     : LOCALSTATEDIR "/run/rrdcached.pid";
224
225   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
226   if (fd < 0)
227   {
228     RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
229              file, rrd_strerror(errno));
230     return (-1);
231   }
232
233   fh = fdopen (fd, "w");
234   if (fh == NULL)
235   {
236     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
237     close(fd);
238     return (-1);
239   }
240
241   fprintf (fh, "%i\n", (int) pid);
242   fclose (fh);
243
244   return (0);
245 } /* }}} int write_pidfile */
246
247 static int remove_pidfile (void) /* {{{ */
248 {
249   char *file;
250   int status;
251
252   file = (config_pid_file != NULL)
253     ? config_pid_file
254     : LOCALSTATEDIR "/run/rrdcached.pid";
255
256   status = unlink (file);
257   if (status == 0)
258     return (0);
259   return (errno);
260 } /* }}} int remove_pidfile */
261
262 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
263 {
264   char    *buffer;
265   size_t   buffer_used;
266   size_t   buffer_free;
267   ssize_t  status;
268
269   buffer       = (char *) buffer_void;
270   buffer_used  = 0;
271   buffer_free  = buffer_size;
272
273   while (buffer_free > 0)
274   {
275     status = read (fd, buffer + buffer_used, buffer_free);
276     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
277       continue;
278
279     if (status < 0)
280       return (-1);
281
282     if (status == 0)
283       return (0);
284
285     assert ((0 > status) || (buffer_free >= (size_t) status));
286
287     buffer_free = buffer_free - status;
288     buffer_used = buffer_used + status;
289
290     if (buffer[buffer_used - 1] == '\n')
291       break;
292   }
293
294   assert (buffer_used > 0);
295
296   if (buffer[buffer_used - 1] != '\n')
297   {
298     errno = ENOBUFS;
299     return (-1);
300   }
301
302   buffer[buffer_used - 1] = 0;
303
304   /* Fix network line endings. */
305   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
306   {
307     buffer_used--;
308     buffer[buffer_used - 1] = 0;
309   }
310
311   return (buffer_used);
312 } /* }}} ssize_t sread */
313
314 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
315 {
316   const char *ptr;
317   size_t      nleft;
318   ssize_t     status;
319
320   /* special case for journal replay */
321   if (fd < 0) return 0;
322
323   ptr   = (const char *) buf;
324   nleft = count;
325
326   while (nleft > 0)
327   {
328     status = write (fd, (const void *) ptr, nleft);
329
330     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
331       continue;
332
333     if (status < 0)
334       return (status);
335
336     nleft -= status;
337     ptr   += status;
338   }
339
340   return (0);
341 } /* }}} ssize_t swrite */
342
343 static void _wipe_ci_values(cache_item_t *ci, time_t when)
344 {
345   ci->values = NULL;
346   ci->values_num = 0;
347
348   ci->last_flush_time = when;
349   if (config_write_jitter > 0)
350     ci->last_flush_time += (random() % config_write_jitter);
351
352   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
353 }
354
355 /*
356  * enqueue_cache_item:
357  * `cache_lock' must be acquired before calling this function!
358  */
359 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
360     queue_side_t side)
361 {
362   int did_insert = 0;
363
364   if (ci == NULL)
365     return (-1);
366
367   if (ci->values_num == 0)
368     return (0);
369
370   if (side == HEAD)
371   {
372     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
373     {
374       assert (ci->next == NULL);
375       ci->next = cache_queue_head;
376       cache_queue_head = ci;
377
378       if (cache_queue_tail == NULL)
379         cache_queue_tail = cache_queue_head;
380
381       did_insert = 1;
382     }
383     else if (cache_queue_head == ci)
384     {
385       /* do nothing */
386     }
387     else /* enqueued, but not first entry */
388     {
389       cache_item_t *prev;
390
391       /* find previous entry */
392       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
393         if (prev->next == ci)
394           break;
395       assert (prev != NULL);
396
397       /* move to the front */
398       prev->next = ci->next;
399       ci->next = cache_queue_head;
400       cache_queue_head = ci;
401
402       /* check if we need to adapt the tail */
403       if (cache_queue_tail == ci)
404         cache_queue_tail = prev;
405     }
406   }
407   else /* (side == TAIL) */
408   {
409     /* We don't move values back in the list.. */
410     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
411       return (0);
412
413     assert (ci->next == NULL);
414
415     if (cache_queue_tail == NULL)
416       cache_queue_head = ci;
417     else
418       cache_queue_tail->next = ci;
419     cache_queue_tail = ci;
420
421     did_insert = 1;
422   }
423
424   ci->flags |= CI_FLAGS_IN_QUEUE;
425
426   if (did_insert)
427   {
428     pthread_mutex_lock (&stats_lock);
429     stats_queue_length++;
430     pthread_mutex_unlock (&stats_lock);
431   }
432
433   return (0);
434 } /* }}} int enqueue_cache_item */
435
436 /*
437  * tree_callback_flush:
438  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
439  * while this is in progress.
440  */
441 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
442     gpointer data)
443 {
444   cache_item_t *ci;
445   callback_flush_data_t *cfd;
446
447   ci = (cache_item_t *) value;
448   cfd = (callback_flush_data_t *) data;
449
450   if ((ci->last_flush_time <= cfd->abs_timeout)
451       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
452       && (ci->values_num > 0))
453   {
454     enqueue_cache_item (ci, TAIL);
455   }
456   else if ((do_shutdown != 0)
457       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
458       && (ci->values_num > 0))
459   {
460     enqueue_cache_item (ci, TAIL);
461   }
462   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
463       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
464       && (ci->values_num <= 0))
465   {
466     char **temp;
467
468     temp = (char **) realloc (cfd->keys,
469         sizeof (char *) * (cfd->keys_num + 1));
470     if (temp == NULL)
471     {
472       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
473       return (FALSE);
474     }
475     cfd->keys = temp;
476     /* Make really sure this points to the _same_ place */
477     assert ((char *) key == ci->file);
478     cfd->keys[cfd->keys_num] = (char *) key;
479     cfd->keys_num++;
480   }
481
482   return (FALSE);
483 } /* }}} gboolean tree_callback_flush */
484
485 static int flush_old_values (int max_age)
486 {
487   callback_flush_data_t cfd;
488   size_t k;
489
490   memset (&cfd, 0, sizeof (cfd));
491   /* Pass the current time as user data so that we don't need to call
492    * `time' for each node. */
493   cfd.now = time (NULL);
494   cfd.keys = NULL;
495   cfd.keys_num = 0;
496
497   if (max_age > 0)
498     cfd.abs_timeout = cfd.now - max_age;
499   else
500     cfd.abs_timeout = cfd.now + 1;
501
502   /* `tree_callback_flush' will return the keys of all values that haven't
503    * been touched in the last `config_flush_interval' seconds in `cfd'.
504    * The char*'s in this array point to the same memory as ci->file, so we
505    * don't need to free them separately. */
506   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
507
508   for (k = 0; k < cfd.keys_num; k++)
509   {
510     cache_item_t *ci;
511
512     /* This must not fail. */
513     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
514     assert (ci != NULL);
515
516     /* If we end up here with values available, something's seriously
517      * messed up. */
518     assert (ci->values_num == 0);
519
520     /* Remove the node from the tree */
521     g_tree_remove (cache_tree, cfd.keys[k]);
522     cfd.keys[k] = NULL;
523
524     /* Now free and clean up `ci'. */
525     free (ci->file);
526     ci->file = NULL;
527     free (ci);
528     ci = NULL;
529   } /* for (k = 0; k < cfd.keys_num; k++) */
530
531   if (cfd.keys != NULL)
532   {
533     free (cfd.keys);
534     cfd.keys = NULL;
535   }
536
537   return (0);
538 } /* int flush_old_values */
539
540 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
541 {
542   struct timeval now;
543   struct timespec next_flush;
544
545   gettimeofday (&now, NULL);
546   next_flush.tv_sec = now.tv_sec + config_flush_interval;
547   next_flush.tv_nsec = 1000 * now.tv_usec;
548
549   pthread_mutex_lock (&cache_lock);
550   while ((do_shutdown == 0) || (cache_queue_head != NULL))
551   {
552     cache_item_t *ci;
553     char *file;
554     char **values;
555     int values_num;
556     int status;
557     int i;
558
559     /* First, check if it's time to do the cache flush. */
560     gettimeofday (&now, NULL);
561     if ((now.tv_sec > next_flush.tv_sec)
562         || ((now.tv_sec == next_flush.tv_sec)
563           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
564     {
565       /* Flush all values that haven't been written in the last
566        * `config_write_interval' seconds. */
567       flush_old_values (config_write_interval);
568
569       /* Determine the time of the next cache flush. */
570       while (next_flush.tv_sec <= now.tv_sec)
571         next_flush.tv_sec += config_flush_interval;
572
573       /* unlock the cache while we rotate so we don't block incoming
574        * updates if the fsync() blocks on disk I/O */
575       pthread_mutex_unlock(&cache_lock);
576       journal_rotate();
577       pthread_mutex_lock(&cache_lock);
578     }
579
580     /* Now, check if there's something to store away. If not, wait until
581      * something comes in or it's time to do the cache flush. */
582     if (cache_queue_head == NULL)
583     {
584       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
585       if ((status != 0) && (status != ETIMEDOUT))
586       {
587         RRDD_LOG (LOG_ERR, "queue_thread_main: "
588             "pthread_cond_timedwait returned %i.", status);
589       }
590     }
591
592     /* We're about to shut down, so lets flush the entire tree. */
593     if ((do_shutdown != 0) && (cache_queue_head == NULL))
594       flush_old_values (/* max age = */ -1);
595
596     /* Check if a value has arrived. This may be NULL if we timed out or there
597      * was an interrupt such as a signal. */
598     if (cache_queue_head == NULL)
599       continue;
600
601     ci = cache_queue_head;
602
603     /* copy the relevant parts */
604     file = strdup (ci->file);
605     if (file == NULL)
606     {
607       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
608       continue;
609     }
610
611     assert(ci->values != NULL);
612     assert(ci->values_num > 0);
613
614     values = ci->values;
615     values_num = ci->values_num;
616
617     _wipe_ci_values(ci, time(NULL));
618
619     cache_queue_head = ci->next;
620     if (cache_queue_head == NULL)
621       cache_queue_tail = NULL;
622     ci->next = NULL;
623
624     pthread_mutex_lock (&stats_lock);
625     assert (stats_queue_length > 0);
626     stats_queue_length--;
627     pthread_mutex_unlock (&stats_lock);
628
629     pthread_mutex_unlock (&cache_lock);
630
631     rrd_clear_error ();
632     status = rrd_update_r (file, NULL, values_num, (void *) values);
633     if (status != 0)
634     {
635       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
636           "rrd_update_r (%s) failed with status %i. (%s)",
637           file, status, rrd_get_error());
638     }
639
640     journal_write("wrote", file);
641     pthread_cond_broadcast(&ci->flushed);
642
643     for (i = 0; i < values_num; i++)
644       free (values[i]);
645
646     free(values);
647     free(file);
648
649     if (status == 0)
650     {
651       pthread_mutex_lock (&stats_lock);
652       stats_updates_written++;
653       stats_data_sets_written += values_num;
654       pthread_mutex_unlock (&stats_lock);
655     }
656
657     pthread_mutex_lock (&cache_lock);
658
659     /* We're about to shut down, so lets flush the entire tree. */
660     if ((do_shutdown != 0) && (cache_queue_head == NULL))
661       flush_old_values (/* max age = */ -1);
662   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
663   pthread_mutex_unlock (&cache_lock);
664
665   assert(cache_queue_head == NULL);
666   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
667   journal_done();
668
669   return (NULL);
670 } /* }}} void *queue_thread_main */
671
672 static int buffer_get_field (char **buffer_ret, /* {{{ */
673     size_t *buffer_size_ret, char **field_ret)
674 {
675   char *buffer;
676   size_t buffer_pos;
677   size_t buffer_size;
678   char *field;
679   size_t field_size;
680   int status;
681
682   buffer = *buffer_ret;
683   buffer_pos = 0;
684   buffer_size = *buffer_size_ret;
685   field = *buffer_ret;
686   field_size = 0;
687
688   if (buffer_size <= 0)
689     return (-1);
690
691   /* This is ensured by `handle_request'. */
692   assert (buffer[buffer_size - 1] == '\0');
693
694   status = -1;
695   while (buffer_pos < buffer_size)
696   {
697     /* Check for end-of-field or end-of-buffer */
698     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
699     {
700       field[field_size] = 0;
701       field_size++;
702       buffer_pos++;
703       status = 0;
704       break;
705     }
706     /* Handle escaped characters. */
707     else if (buffer[buffer_pos] == '\\')
708     {
709       if (buffer_pos >= (buffer_size - 1))
710         break;
711       buffer_pos++;
712       field[field_size] = buffer[buffer_pos];
713       field_size++;
714       buffer_pos++;
715     }
716     /* Normal operation */ 
717     else
718     {
719       field[field_size] = buffer[buffer_pos];
720       field_size++;
721       buffer_pos++;
722     }
723   } /* while (buffer_pos < buffer_size) */
724
725   if (status != 0)
726     return (status);
727
728   *buffer_ret = buffer + buffer_pos;
729   *buffer_size_ret = buffer_size - buffer_pos;
730   *field_ret = field;
731
732   return (0);
733 } /* }}} int buffer_get_field */
734
735 static int flush_file (const char *filename) /* {{{ */
736 {
737   cache_item_t *ci;
738
739   pthread_mutex_lock (&cache_lock);
740
741   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
742   if (ci == NULL)
743   {
744     pthread_mutex_unlock (&cache_lock);
745     return (ENOENT);
746   }
747
748   /* Enqueue at head */
749   enqueue_cache_item (ci, HEAD);
750   pthread_cond_signal (&cache_cond);
751
752   pthread_cond_wait(&ci->flushed, &cache_lock);
753   pthread_mutex_unlock(&cache_lock);
754
755   return (0);
756 } /* }}} int flush_file */
757
758 static int handle_request_help (int fd, /* {{{ */
759     char *buffer, size_t buffer_size)
760 {
761   int status;
762   char **help_text;
763   size_t help_text_len;
764   char *command;
765   size_t i;
766
767   char *help_help[] =
768   {
769     "4 Command overview\n",
770     "FLUSH <filename>\n",
771     "HELP [<command>]\n",
772     "UPDATE <filename> <values> [<values> ...]\n",
773     "STATS\n"
774   };
775   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
776
777   char *help_flush[] =
778   {
779     "4 Help for FLUSH\n",
780     "Usage: FLUSH <filename>\n",
781     "\n",
782     "Adds the given filename to the head of the update queue and returns\n",
783     "after is has been dequeued.\n"
784   };
785   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
786
787   char *help_update[] =
788   {
789     "9 Help for UPDATE\n",
790     "Usage: UPDATE <filename> <values> [<values> ...]\n"
791     "\n",
792     "Adds the given file to the internal cache if it is not yet known and\n",
793     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
794     "for details.\n",
795     "\n",
796     "Each <values> has the following form:\n",
797     "  <values> = <time>:<value>[:<value>[...]]\n",
798     "See the rrdupdate(1) manpage for details.\n"
799   };
800   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
801
802   char *help_stats[] =
803   {
804     "4 Help for STATS\n",
805     "Usage: STATS\n",
806     "\n",
807     "Returns some performance counters, see the rrdcached(1) manpage for\n",
808     "a description of the values.\n"
809   };
810   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
811
812   status = buffer_get_field (&buffer, &buffer_size, &command);
813   if (status != 0)
814   {
815     help_text = help_help;
816     help_text_len = help_help_len;
817   }
818   else
819   {
820     if (strcasecmp (command, "update") == 0)
821     {
822       help_text = help_update;
823       help_text_len = help_update_len;
824     }
825     else if (strcasecmp (command, "flush") == 0)
826     {
827       help_text = help_flush;
828       help_text_len = help_flush_len;
829     }
830     else if (strcasecmp (command, "stats") == 0)
831     {
832       help_text = help_stats;
833       help_text_len = help_stats_len;
834     }
835     else
836     {
837       help_text = help_help;
838       help_text_len = help_help_len;
839     }
840   }
841
842   for (i = 0; i < help_text_len; i++)
843   {
844     status = swrite (fd, help_text[i], strlen (help_text[i]));
845     if (status < 0)
846     {
847       status = errno;
848       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
849       return (status);
850     }
851   }
852
853   return (0);
854 } /* }}} int handle_request_help */
855
856 static int handle_request_stats (int fd, /* {{{ */
857     char *buffer __attribute__((unused)),
858     size_t buffer_size __attribute__((unused)))
859 {
860   int status;
861   char outbuf[CMD_MAX];
862
863   uint64_t copy_queue_length;
864   uint64_t copy_updates_received;
865   uint64_t copy_flush_received;
866   uint64_t copy_updates_written;
867   uint64_t copy_data_sets_written;
868   uint64_t copy_journal_bytes;
869   uint64_t copy_journal_rotate;
870
871   uint64_t tree_nodes_number;
872   uint64_t tree_depth;
873
874   pthread_mutex_lock (&stats_lock);
875   copy_queue_length       = stats_queue_length;
876   copy_updates_received   = stats_updates_received;
877   copy_flush_received     = stats_flush_received;
878   copy_updates_written    = stats_updates_written;
879   copy_data_sets_written  = stats_data_sets_written;
880   copy_journal_bytes      = stats_journal_bytes;
881   copy_journal_rotate     = stats_journal_rotate;
882   pthread_mutex_unlock (&stats_lock);
883
884   pthread_mutex_lock (&cache_lock);
885   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
886   tree_depth        = (uint64_t) g_tree_height (cache_tree);
887   pthread_mutex_unlock (&cache_lock);
888
889 #define RRDD_STATS_SEND \
890   outbuf[sizeof (outbuf) - 1] = 0; \
891   status = swrite (fd, outbuf, strlen (outbuf)); \
892   if (status < 0) \
893   { \
894     status = errno; \
895     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
896     return (status); \
897   }
898
899   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
900   RRDD_STATS_SEND;
901
902   snprintf (outbuf, sizeof (outbuf),
903       "QueueLength: %"PRIu64"\n", copy_queue_length);
904   RRDD_STATS_SEND;
905
906   snprintf (outbuf, sizeof (outbuf),
907       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
908   RRDD_STATS_SEND;
909
910   snprintf (outbuf, sizeof (outbuf),
911       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
912   RRDD_STATS_SEND;
913
914   snprintf (outbuf, sizeof (outbuf),
915       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
916   RRDD_STATS_SEND;
917
918   snprintf (outbuf, sizeof (outbuf),
919       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
920   RRDD_STATS_SEND;
921
922   snprintf (outbuf, sizeof (outbuf),
923       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
924   RRDD_STATS_SEND;
925
926   snprintf (outbuf, sizeof (outbuf),
927       "TreeDepth: %"PRIu64"\n", tree_depth);
928   RRDD_STATS_SEND;
929
930   snprintf (outbuf, sizeof(outbuf),
931       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
932   RRDD_STATS_SEND;
933
934   snprintf (outbuf, sizeof(outbuf),
935       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
936   RRDD_STATS_SEND;
937
938   return (0);
939 #undef RRDD_STATS_SEND
940 } /* }}} int handle_request_stats */
941
942 static int handle_request_flush (int fd, /* {{{ */
943     char *buffer, size_t buffer_size)
944 {
945   char *file;
946   int status;
947   char result[CMD_MAX];
948
949   status = buffer_get_field (&buffer, &buffer_size, &file);
950   if (status != 0)
951   {
952     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
953   }
954   else
955   {
956     pthread_mutex_lock(&stats_lock);
957     stats_flush_received++;
958     pthread_mutex_unlock(&stats_lock);
959
960     status = flush_file (file);
961     if (status == 0)
962       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
963     else if (status == ENOENT)
964     {
965       /* no file in our tree; see whether it exists at all */
966       struct stat statbuf;
967
968       memset(&statbuf, 0, sizeof(statbuf));
969       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
970         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
971       else
972         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
973     }
974     else if (status < 0)
975       strncpy (result, "-1 Internal error.\n", sizeof (result));
976     else
977       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
978   }
979   result[sizeof (result) - 1] = 0;
980
981   status = swrite (fd, result, strlen (result));
982   if (status < 0)
983   {
984     status = errno;
985     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
986     return (status);
987   }
988
989   return (0);
990 } /* }}} int handle_request_flush */
991
992 static int handle_request_update (int fd, /* {{{ */
993     char *buffer, size_t buffer_size)
994 {
995   char *file;
996   int values_num = 0;
997   int status;
998
999   time_t now;
1000
1001   cache_item_t *ci;
1002   char answer[CMD_MAX];
1003
1004 #define RRDD_UPDATE_SEND \
1005   answer[sizeof (answer) - 1] = 0; \
1006   status = swrite (fd, answer, strlen (answer)); \
1007   if (status < 0) \
1008   { \
1009     status = errno; \
1010     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1011     return (status); \
1012   }
1013
1014   now = time (NULL);
1015
1016   status = buffer_get_field (&buffer, &buffer_size, &file);
1017   if (status != 0)
1018   {
1019     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1020         sizeof (answer));
1021     RRDD_UPDATE_SEND;
1022     return (0);
1023   }
1024
1025   pthread_mutex_lock(&stats_lock);
1026   stats_updates_received++;
1027   pthread_mutex_unlock(&stats_lock);
1028
1029   pthread_mutex_lock (&cache_lock);
1030   ci = g_tree_lookup (cache_tree, file);
1031
1032   if (ci == NULL) /* {{{ */
1033   {
1034     struct stat statbuf;
1035
1036     /* don't hold the lock while we setup; stat(2) might block */
1037     pthread_mutex_unlock(&cache_lock);
1038
1039     memset (&statbuf, 0, sizeof (statbuf));
1040     status = stat (file, &statbuf);
1041     if (status != 0)
1042     {
1043       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1044
1045       status = errno;
1046       if (status == ENOENT)
1047         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1048       else
1049         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1050             status);
1051       RRDD_UPDATE_SEND;
1052       return (0);
1053     }
1054     if (!S_ISREG (statbuf.st_mode))
1055     {
1056       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1057       RRDD_UPDATE_SEND;
1058       return (0);
1059     }
1060     if (access(file, R_OK|W_OK) != 0)
1061     {
1062       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1063                 file, rrd_strerror(errno));
1064       RRDD_UPDATE_SEND;
1065       return (0);
1066     }
1067
1068     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1069     if (ci == NULL)
1070     {
1071       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1072
1073       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1074       RRDD_UPDATE_SEND;
1075       return (0);
1076     }
1077     memset (ci, 0, sizeof (cache_item_t));
1078
1079     ci->file = strdup (file);
1080     if (ci->file == NULL)
1081     {
1082       free (ci);
1083       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1084
1085       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1086       RRDD_UPDATE_SEND;
1087       return (0);
1088     }
1089
1090     _wipe_ci_values(ci, now);
1091     ci->flags = CI_FLAGS_IN_TREE;
1092
1093     pthread_mutex_lock(&cache_lock);
1094     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1095   } /* }}} */
1096   assert (ci != NULL);
1097
1098   while (buffer_size > 0)
1099   {
1100     char **temp;
1101     char *value;
1102
1103     status = buffer_get_field (&buffer, &buffer_size, &value);
1104     if (status != 0)
1105     {
1106       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1107       break;
1108     }
1109
1110     temp = (char **) realloc (ci->values,
1111         sizeof (char *) * (ci->values_num + 1));
1112     if (temp == NULL)
1113     {
1114       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1115       continue;
1116     }
1117     ci->values = temp;
1118
1119     ci->values[ci->values_num] = strdup (value);
1120     if (ci->values[ci->values_num] == NULL)
1121     {
1122       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1123       continue;
1124     }
1125     ci->values_num++;
1126
1127     values_num++;
1128   }
1129
1130   if (((now - ci->last_flush_time) >= config_write_interval)
1131       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1132       && (ci->values_num > 0))
1133   {
1134     enqueue_cache_item (ci, TAIL);
1135     pthread_cond_signal (&cache_cond);
1136   }
1137
1138   pthread_mutex_unlock (&cache_lock);
1139
1140   if (values_num < 1)
1141   {
1142     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1143   }
1144   else
1145   {
1146     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1147         (values_num == 1) ? "" : "s");
1148   }
1149   RRDD_UPDATE_SEND;
1150   return (0);
1151 #undef RRDD_UPDATE_SEND
1152 } /* }}} int handle_request_update */
1153
1154 /* we came across a "WROTE" entry during journal replay.
1155  * throw away any values that we have accumulated for this file
1156  */
1157 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1158                                  const char *buffer,
1159                                  size_t buffer_size __attribute__((unused)))
1160 {
1161   int i;
1162   cache_item_t *ci;
1163   const char *file = buffer;
1164
1165   pthread_mutex_lock(&cache_lock);
1166
1167   ci = g_tree_lookup(cache_tree, file);
1168   if (ci == NULL)
1169   {
1170     pthread_mutex_unlock(&cache_lock);
1171     return (0);
1172   }
1173
1174   if (ci->values)
1175   {
1176     for (i=0; i < ci->values_num; i++)
1177       free(ci->values[i]);
1178
1179     free(ci->values);
1180   }
1181
1182   _wipe_ci_values(ci, time(NULL));
1183
1184   pthread_mutex_unlock(&cache_lock);
1185   return (0);
1186 } /* }}} int handle_request_wrote */
1187
1188 /* if fd < 0, we are in journal replay mode */
1189 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1190 {
1191   char *buffer_ptr;
1192   char *command;
1193   int status;
1194
1195   assert (buffer[buffer_size - 1] == '\0');
1196
1197   buffer_ptr = buffer;
1198   command = NULL;
1199   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1200   if (status != 0)
1201   {
1202     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1203     return (-1);
1204   }
1205
1206   if (strcasecmp (command, "update") == 0)
1207   {
1208     /* don't re-write updates in replay mode */
1209     if (fd >= 0)
1210       journal_write(command, buffer_ptr);
1211
1212     return (handle_request_update (fd, buffer_ptr, buffer_size));
1213   }
1214   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1215   {
1216     /* this is only valid in replay mode */
1217     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1218   }
1219   else if (strcasecmp (command, "flush") == 0)
1220   {
1221     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1222   }
1223   else if (strcasecmp (command, "stats") == 0)
1224   {
1225     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1226   }
1227   else if (strcasecmp (command, "help") == 0)
1228   {
1229     return (handle_request_help (fd, buffer_ptr, buffer_size));
1230   }
1231   else
1232   {
1233     char result[CMD_MAX];
1234
1235     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1236     result[sizeof (result) - 1] = 0;
1237
1238     status = swrite (fd, result, strlen (result));
1239     if (status < 0)
1240     {
1241       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1242       return (-1);
1243     }
1244   }
1245
1246   return (0);
1247 } /* }}} int handle_request */
1248
1249 /* MUST NOT hold journal_lock before calling this */
1250 static void journal_rotate(void) /* {{{ */
1251 {
1252   FILE *old_fh = NULL;
1253
1254   if (journal_cur == NULL || journal_old == NULL)
1255     return;
1256
1257   pthread_mutex_lock(&journal_lock);
1258
1259   /* we rotate this way (rename before close) so that the we can release
1260    * the journal lock as fast as possible.  Journal writes to the new
1261    * journal can proceed immediately after the new file is opened.  The
1262    * fclose can then block without affecting new updates.
1263    */
1264   if (journal_fh != NULL)
1265   {
1266     old_fh = journal_fh;
1267     rename(journal_cur, journal_old);
1268     ++stats_journal_rotate;
1269   }
1270
1271   journal_fh = fopen(journal_cur, "a");
1272   pthread_mutex_unlock(&journal_lock);
1273
1274   if (old_fh != NULL)
1275     fclose(old_fh);
1276
1277   if (journal_fh == NULL)
1278     RRDD_LOG(LOG_CRIT,
1279              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1280              journal_cur, rrd_strerror(errno));
1281
1282 } /* }}} static void journal_rotate */
1283
1284 static void journal_done(void) /* {{{ */
1285 {
1286   if (journal_cur == NULL)
1287     return;
1288
1289   pthread_mutex_lock(&journal_lock);
1290   if (journal_fh != NULL)
1291   {
1292     fclose(journal_fh);
1293     journal_fh = NULL;
1294   }
1295
1296   RRDD_LOG(LOG_INFO, "removing journals");
1297
1298   unlink(journal_old);
1299   unlink(journal_cur);
1300   pthread_mutex_unlock(&journal_lock);
1301
1302 } /* }}} static void journal_done */
1303
1304 static int journal_write(char *cmd, char *args) /* {{{ */
1305 {
1306   int chars;
1307
1308   if (journal_fh == NULL)
1309     return 0;
1310
1311   pthread_mutex_lock(&journal_lock);
1312   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1313   pthread_mutex_unlock(&journal_lock);
1314
1315   if (chars > 0)
1316   {
1317     pthread_mutex_lock(&stats_lock);
1318     stats_journal_bytes += chars;
1319     pthread_mutex_unlock(&stats_lock);
1320   }
1321
1322   return chars;
1323 } /* }}} static int journal_write */
1324
1325 static int journal_replay (const char *file) /* {{{ */
1326 {
1327   FILE *fh;
1328   int entry_cnt = 0;
1329   int fail_cnt = 0;
1330   uint64_t line = 0;
1331   char entry[CMD_MAX];
1332
1333   if (file == NULL) return 0;
1334
1335   fh = fopen(file, "r");
1336   if (fh == NULL)
1337   {
1338     if (errno != ENOENT)
1339       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1340                file, rrd_strerror(errno));
1341     return 0;
1342   }
1343   else
1344     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1345
1346   while(!feof(fh))
1347   {
1348     size_t entry_len;
1349
1350     ++line;
1351     fgets(entry, sizeof(entry), fh);
1352     entry_len = strlen(entry);
1353
1354     /* check \n termination in case journal writing crashed mid-line */
1355     if (entry_len == 0)
1356       continue;
1357     else if (entry[entry_len - 1] != '\n')
1358     {
1359       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1360       ++fail_cnt;
1361       continue;
1362     }
1363
1364     entry[entry_len - 1] = '\0';
1365
1366     if (handle_request(-1, entry, entry_len) == 0)
1367       ++entry_cnt;
1368     else
1369       ++fail_cnt;
1370   }
1371
1372   fclose(fh);
1373
1374   if (entry_cnt > 0)
1375   {
1376     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1377              entry_cnt, fail_cnt);
1378     return 1;
1379   }
1380   else
1381     return 0;
1382
1383 } /* }}} static int journal_replay */
1384
1385 static void *connection_thread_main (void *args) /* {{{ */
1386 {
1387   pthread_t self;
1388   int i;
1389   int fd;
1390   
1391   fd = *((int *) args);
1392   free (args);
1393
1394   pthread_mutex_lock (&connection_threads_lock);
1395   {
1396     pthread_t *temp;
1397
1398     temp = (pthread_t *) realloc (connection_threads,
1399         sizeof (pthread_t) * (connection_threads_num + 1));
1400     if (temp == NULL)
1401     {
1402       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1403     }
1404     else
1405     {
1406       connection_threads = temp;
1407       connection_threads[connection_threads_num] = pthread_self ();
1408       connection_threads_num++;
1409     }
1410   }
1411   pthread_mutex_unlock (&connection_threads_lock);
1412
1413   while (do_shutdown == 0)
1414   {
1415     char buffer[CMD_MAX];
1416
1417     struct pollfd pollfd;
1418     int status;
1419
1420     pollfd.fd = fd;
1421     pollfd.events = POLLIN | POLLPRI;
1422     pollfd.revents = 0;
1423
1424     status = poll (&pollfd, 1, /* timeout = */ 500);
1425     if (status == 0) /* timeout */
1426       continue;
1427     else if (status < 0) /* error */
1428     {
1429       status = errno;
1430       if (status == EINTR)
1431         continue;
1432       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1433       continue;
1434     }
1435
1436     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1437     {
1438       close (fd);
1439       break;
1440     }
1441     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1442     {
1443       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1444           "poll(2) returned something unexpected: %#04hx",
1445           pollfd.revents);
1446       close (fd);
1447       break;
1448     }
1449
1450     status = (int) sread (fd, buffer, sizeof (buffer));
1451     if (status <= 0)
1452     {
1453       close (fd);
1454
1455       if (status < 0)
1456         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1457
1458       break;
1459     }
1460
1461     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1462     if (status != 0)
1463       break;
1464   }
1465
1466   close(fd);
1467
1468   self = pthread_self ();
1469   /* Remove this thread from the connection threads list */
1470   pthread_mutex_lock (&connection_threads_lock);
1471   /* Find out own index in the array */
1472   for (i = 0; i < connection_threads_num; i++)
1473     if (pthread_equal (connection_threads[i], self) != 0)
1474       break;
1475   assert (i < connection_threads_num);
1476
1477   /* Move the trailing threads forward. */
1478   if (i < (connection_threads_num - 1))
1479   {
1480     memmove (connection_threads + i,
1481         connection_threads + i + 1,
1482         sizeof (pthread_t) * (connection_threads_num - i - 1));
1483   }
1484
1485   connection_threads_num--;
1486   pthread_mutex_unlock (&connection_threads_lock);
1487
1488   return (NULL);
1489 } /* }}} void *connection_thread_main */
1490
1491 static int open_listen_socket_unix (const char *path) /* {{{ */
1492 {
1493   int fd;
1494   struct sockaddr_un sa;
1495   listen_socket_t *temp;
1496   int status;
1497
1498   temp = (listen_socket_t *) realloc (listen_fds,
1499       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1500   if (temp == NULL)
1501   {
1502     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1503     return (-1);
1504   }
1505   listen_fds = temp;
1506   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1507
1508   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1509   if (fd < 0)
1510   {
1511     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1512     return (-1);
1513   }
1514
1515   memset (&sa, 0, sizeof (sa));
1516   sa.sun_family = AF_UNIX;
1517   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1518
1519   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1520   if (status != 0)
1521   {
1522     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1523     close (fd);
1524     unlink (path);
1525     return (-1);
1526   }
1527
1528   status = listen (fd, /* backlog = */ 10);
1529   if (status != 0)
1530   {
1531     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1532     close (fd);
1533     unlink (path);
1534     return (-1);
1535   }
1536   
1537   listen_fds[listen_fds_num].fd = fd;
1538   snprintf (listen_fds[listen_fds_num].path,
1539       sizeof (listen_fds[listen_fds_num].path) - 1,
1540       "unix:%s", path);
1541   listen_fds_num++;
1542
1543   return (0);
1544 } /* }}} int open_listen_socket_unix */
1545
1546 static int open_listen_socket (const char *addr_orig) /* {{{ */
1547 {
1548   struct addrinfo ai_hints;
1549   struct addrinfo *ai_res;
1550   struct addrinfo *ai_ptr;
1551   char addr_copy[NI_MAXHOST];
1552   char *addr;
1553   char *port;
1554   int status;
1555
1556   assert (addr_orig != NULL);
1557
1558   strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1559   addr_copy[sizeof (addr_copy) - 1] = 0;
1560   addr = addr_copy;
1561
1562   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1563     return (open_listen_socket_unix (addr + strlen ("unix:")));
1564   else if (addr[0] == '/')
1565     return (open_listen_socket_unix (addr));
1566
1567   memset (&ai_hints, 0, sizeof (ai_hints));
1568   ai_hints.ai_flags = 0;
1569 #ifdef AI_ADDRCONFIG
1570   ai_hints.ai_flags |= AI_ADDRCONFIG;
1571 #endif
1572   ai_hints.ai_family = AF_UNSPEC;
1573   ai_hints.ai_socktype = SOCK_STREAM;
1574
1575   port = NULL;
1576  if (*addr == '[') /* IPv6+port format */
1577   {
1578     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1579     addr++;
1580
1581     port = strchr (addr, ']');
1582     if (port == NULL)
1583     {
1584       RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1585           addr_orig);
1586       return (-1);
1587     }
1588     *port = 0;
1589     port++;
1590
1591     if (*port == ':')
1592       port++;
1593     else if (*port == 0)
1594       port = NULL;
1595     else
1596     {
1597       RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1598           port);
1599       return (-1);
1600     }
1601   } /* if (*addr = ']') */
1602   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1603   {
1604     port = rindex(addr, ':');
1605     if (port != NULL)
1606     {
1607       *port = 0;
1608       port++;
1609     }
1610   }
1611   ai_res = NULL;
1612   status = getaddrinfo (addr,
1613                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1614                         &ai_hints, &ai_res);
1615   if (status != 0)
1616   {
1617     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1618         "%s", addr, gai_strerror (status));
1619     return (-1);
1620   }
1621
1622   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1623   {
1624     int fd;
1625     listen_socket_t *temp;
1626     int one = 1;
1627
1628     temp = (listen_socket_t *) realloc (listen_fds,
1629         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1630     if (temp == NULL)
1631     {
1632       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1633       continue;
1634     }
1635     listen_fds = temp;
1636     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1637
1638     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1639     if (fd < 0)
1640     {
1641       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1642       continue;
1643     }
1644
1645     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1646
1647     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1648     if (status != 0)
1649     {
1650       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1651       close (fd);
1652       continue;
1653     }
1654
1655     status = listen (fd, /* backlog = */ 10);
1656     if (status != 0)
1657     {
1658       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1659       close (fd);
1660       return (-1);
1661     }
1662
1663     listen_fds[listen_fds_num].fd = fd;
1664     strncpy (listen_fds[listen_fds_num].path, addr,
1665         sizeof (listen_fds[listen_fds_num].path) - 1);
1666     listen_fds_num++;
1667   } /* for (ai_ptr) */
1668
1669   return (0);
1670 } /* }}} int open_listen_socket */
1671
1672 static int close_listen_sockets (void) /* {{{ */
1673 {
1674   size_t i;
1675
1676   for (i = 0; i < listen_fds_num; i++)
1677   {
1678     close (listen_fds[i].fd);
1679     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1680       unlink (listen_fds[i].path + strlen ("unix:"));
1681   }
1682
1683   free (listen_fds);
1684   listen_fds = NULL;
1685   listen_fds_num = 0;
1686
1687   return (0);
1688 } /* }}} int close_listen_sockets */
1689
1690 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1691 {
1692   struct pollfd *pollfds;
1693   int pollfds_num;
1694   int status;
1695   int i;
1696
1697   for (i = 0; i < config_listen_address_list_len; i++)
1698     open_listen_socket (config_listen_address_list[i]);
1699
1700   if (config_listen_address_list_len < 1)
1701     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1702
1703   if (listen_fds_num < 1)
1704   {
1705     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1706         "could be opened. Sorry.");
1707     return (NULL);
1708   }
1709
1710   pollfds_num = listen_fds_num;
1711   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1712   if (pollfds == NULL)
1713   {
1714     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1715     return (NULL);
1716   }
1717   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1718
1719   RRDD_LOG(LOG_INFO, "listening for connections");
1720
1721   while (do_shutdown == 0)
1722   {
1723     assert (pollfds_num == ((int) listen_fds_num));
1724     for (i = 0; i < pollfds_num; i++)
1725     {
1726       pollfds[i].fd = listen_fds[i].fd;
1727       pollfds[i].events = POLLIN | POLLPRI;
1728       pollfds[i].revents = 0;
1729     }
1730
1731     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1732     if (status == 0)
1733     {
1734       continue; /* timeout */
1735     }
1736     else if (status < 0)
1737     {
1738       status = errno;
1739       if (status != EINTR)
1740       {
1741         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1742       }
1743       continue;
1744     }
1745
1746     for (i = 0; i < pollfds_num; i++)
1747     {
1748       int *client_sd;
1749       struct sockaddr_storage client_sa;
1750       socklen_t client_sa_size;
1751       pthread_t tid;
1752       pthread_attr_t attr;
1753
1754       if (pollfds[i].revents == 0)
1755         continue;
1756
1757       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1758       {
1759         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1760             "poll(2) returned something unexpected for listen FD #%i.",
1761             pollfds[i].fd);
1762         continue;
1763       }
1764
1765       client_sd = (int *) malloc (sizeof (int));
1766       if (client_sd == NULL)
1767       {
1768         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1769         continue;
1770       }
1771
1772       client_sa_size = sizeof (client_sa);
1773       *client_sd = accept (pollfds[i].fd,
1774           (struct sockaddr *) &client_sa, &client_sa_size);
1775       if (*client_sd < 0)
1776       {
1777         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1778         continue;
1779       }
1780
1781       pthread_attr_init (&attr);
1782       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1783
1784       status = pthread_create (&tid, &attr, connection_thread_main,
1785           /* args = */ (void *) client_sd);
1786       if (status != 0)
1787       {
1788         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1789         close (*client_sd);
1790         free (client_sd);
1791         continue;
1792       }
1793     } /* for (pollfds_num) */
1794   } /* while (do_shutdown == 0) */
1795
1796   RRDD_LOG(LOG_INFO, "starting shutdown");
1797
1798   close_listen_sockets ();
1799
1800   pthread_mutex_lock (&connection_threads_lock);
1801   while (connection_threads_num > 0)
1802   {
1803     pthread_t wait_for;
1804
1805     wait_for = connection_threads[0];
1806
1807     pthread_mutex_unlock (&connection_threads_lock);
1808     pthread_join (wait_for, /* retval = */ NULL);
1809     pthread_mutex_lock (&connection_threads_lock);
1810   }
1811   pthread_mutex_unlock (&connection_threads_lock);
1812
1813   return (NULL);
1814 } /* }}} void *listen_thread_main */
1815
1816 static int daemonize (void) /* {{{ */
1817 {
1818   int status;
1819
1820   /* These structures are static, because `sigaction' behaves weird if the are
1821    * overwritten.. */
1822   static struct sigaction sa_int;
1823   static struct sigaction sa_term;
1824   static struct sigaction sa_pipe;
1825
1826   if (!stay_foreground)
1827   {
1828     pid_t child;
1829     char *base_dir;
1830
1831     child = fork ();
1832     if (child < 0)
1833     {
1834       fprintf (stderr, "daemonize: fork(2) failed.\n");
1835       return (-1);
1836     }
1837     else if (child > 0)
1838     {
1839       return (1);
1840     }
1841
1842     /* Change into the /tmp directory. */
1843     base_dir = (config_base_dir != NULL)
1844       ? config_base_dir
1845       : "/tmp";
1846     status = chdir (base_dir);
1847     if (status != 0)
1848     {
1849       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1850       return (-1);
1851     }
1852
1853     /* Become session leader */
1854     setsid ();
1855
1856     /* Open the first three file descriptors to /dev/null */
1857     close (2);
1858     close (1);
1859     close (0);
1860
1861     open ("/dev/null", O_RDWR);
1862     dup (0);
1863     dup (0);
1864   } /* if (!stay_foreground) */
1865
1866   /* Install signal handlers */
1867   memset (&sa_int, 0, sizeof (sa_int));
1868   sa_int.sa_handler = sig_int_handler;
1869   sigaction (SIGINT, &sa_int, NULL);
1870
1871   memset (&sa_term, 0, sizeof (sa_term));
1872   sa_term.sa_handler = sig_term_handler;
1873   sigaction (SIGTERM, &sa_term, NULL);
1874
1875   memset (&sa_pipe, 0, sizeof (sa_pipe));
1876   sa_pipe.sa_handler = SIG_IGN;
1877   sigaction (SIGPIPE, &sa_pipe, NULL);
1878
1879   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1880   RRDD_LOG(LOG_INFO, "starting up");
1881
1882   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1883   if (cache_tree == NULL)
1884   {
1885     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1886     return (-1);
1887   }
1888
1889   status = write_pidfile ();
1890   return status;
1891 } /* }}} int daemonize */
1892
1893 static int cleanup (void) /* {{{ */
1894 {
1895   do_shutdown++;
1896
1897   pthread_cond_signal (&cache_cond);
1898   pthread_join (queue_thread, /* return = */ NULL);
1899
1900   remove_pidfile ();
1901
1902   RRDD_LOG(LOG_INFO, "goodbye");
1903   closelog ();
1904
1905   return (0);
1906 } /* }}} int cleanup */
1907
1908 static int read_options (int argc, char **argv) /* {{{ */
1909 {
1910   int option;
1911   int status = 0;
1912
1913   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1914   {
1915     switch (option)
1916     {
1917       case 'g':
1918         stay_foreground=1;
1919         break;
1920
1921       case 'l':
1922       {
1923         char **temp;
1924
1925         temp = (char **) realloc (config_listen_address_list,
1926             sizeof (char *) * (config_listen_address_list_len + 1));
1927         if (temp == NULL)
1928         {
1929           fprintf (stderr, "read_options: realloc failed.\n");
1930           return (2);
1931         }
1932         config_listen_address_list = temp;
1933
1934         temp[config_listen_address_list_len] = strdup (optarg);
1935         if (temp[config_listen_address_list_len] == NULL)
1936         {
1937           fprintf (stderr, "read_options: strdup failed.\n");
1938           return (2);
1939         }
1940         config_listen_address_list_len++;
1941       }
1942       break;
1943
1944       case 'f':
1945       {
1946         int temp;
1947
1948         temp = atoi (optarg);
1949         if (temp > 0)
1950           config_flush_interval = temp;
1951         else
1952         {
1953           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1954           status = 3;
1955         }
1956       }
1957       break;
1958
1959       case 'w':
1960       {
1961         int temp;
1962
1963         temp = atoi (optarg);
1964         if (temp > 0)
1965           config_write_interval = temp;
1966         else
1967         {
1968           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1969           status = 2;
1970         }
1971       }
1972       break;
1973
1974       case 'z':
1975       {
1976         int temp;
1977
1978         temp = atoi(optarg);
1979         if (temp > 0)
1980           config_write_jitter = temp;
1981         else
1982         {
1983           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1984           status = 2;
1985         }
1986
1987         break;
1988       }
1989
1990       case 'b':
1991       {
1992         size_t len;
1993
1994         if (config_base_dir != NULL)
1995           free (config_base_dir);
1996         config_base_dir = strdup (optarg);
1997         if (config_base_dir == NULL)
1998         {
1999           fprintf (stderr, "read_options: strdup failed.\n");
2000           return (3);
2001         }
2002
2003         len = strlen (config_base_dir);
2004         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2005         {
2006           config_base_dir[len - 1] = 0;
2007           len--;
2008         }
2009
2010         if (len < 1)
2011         {
2012           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2013           return (4);
2014         }
2015       }
2016       break;
2017
2018       case 'p':
2019       {
2020         if (config_pid_file != NULL)
2021           free (config_pid_file);
2022         config_pid_file = strdup (optarg);
2023         if (config_pid_file == NULL)
2024         {
2025           fprintf (stderr, "read_options: strdup failed.\n");
2026           return (3);
2027         }
2028       }
2029       break;
2030
2031       case 'j':
2032       {
2033         struct stat statbuf;
2034         const char *dir = optarg;
2035
2036         status = stat(dir, &statbuf);
2037         if (status != 0)
2038         {
2039           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2040           return 6;
2041         }
2042
2043         if (!S_ISDIR(statbuf.st_mode)
2044             || access(dir, R_OK|W_OK|X_OK) != 0)
2045         {
2046           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2047                   errno ? rrd_strerror(errno) : "");
2048           return 6;
2049         }
2050
2051         journal_cur = malloc(PATH_MAX + 1);
2052         journal_old = malloc(PATH_MAX + 1);
2053         if (journal_cur == NULL || journal_old == NULL)
2054         {
2055           fprintf(stderr, "malloc failure for journal files\n");
2056           return 6;
2057         }
2058         else 
2059         {
2060           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2061           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2062         }
2063       }
2064       break;
2065
2066       case 'h':
2067       case '?':
2068         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2069             "\n"
2070             "Usage: rrdcached [options]\n"
2071             "\n"
2072             "Valid options are:\n"
2073             "  -l <address>  Socket address to listen to.\n"
2074             "  -w <seconds>  Interval in which to write data.\n"
2075             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2076             "  -f <seconds>  Interval in which to flush dead data.\n"
2077             "  -p <file>     Location of the PID-file.\n"
2078             "  -b <dir>      Base directory to change to.\n"
2079             "  -g            Do not fork and run in the foreground.\n"
2080             "  -j <dir>      Directory in which to create the journal files.\n"
2081             "\n"
2082             "For more information and a detailed description of all options "
2083             "please refer\n"
2084             "to the rrdcached(1) manual page.\n",
2085             VERSION);
2086         status = -1;
2087         break;
2088     } /* switch (option) */
2089   } /* while (getopt) */
2090
2091   /* advise the user when values are not sane */
2092   if (config_flush_interval < 2 * config_write_interval)
2093     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2094             " 2x write interval (-w) !\n");
2095   if (config_write_jitter > config_write_interval)
2096     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2097             " write interval (-w) !\n");
2098
2099   return (status);
2100 } /* }}} int read_options */
2101
2102 int main (int argc, char **argv)
2103 {
2104   int status;
2105
2106   status = read_options (argc, argv);
2107   if (status != 0)
2108   {
2109     if (status < 0)
2110       status = 0;
2111     return (status);
2112   }
2113
2114   status = daemonize ();
2115   if (status == 1)
2116   {
2117     struct sigaction sigchld;
2118
2119     memset (&sigchld, 0, sizeof (sigchld));
2120     sigchld.sa_handler = SIG_IGN;
2121     sigaction (SIGCHLD, &sigchld, NULL);
2122
2123     return (0);
2124   }
2125   else if (status != 0)
2126   {
2127     fprintf (stderr, "daemonize failed, exiting.\n");
2128     return (1);
2129   }
2130
2131   if (journal_cur != NULL)
2132   {
2133     int had_journal = 0;
2134
2135     pthread_mutex_lock(&journal_lock);
2136
2137     RRDD_LOG(LOG_INFO, "checking for journal files");
2138
2139     had_journal += journal_replay(journal_old);
2140     had_journal += journal_replay(journal_cur);
2141
2142     if (had_journal)
2143       flush_old_values(-1);
2144
2145     pthread_mutex_unlock(&journal_lock);
2146     journal_rotate();
2147
2148     RRDD_LOG(LOG_INFO, "journal processing complete");
2149   }
2150
2151   /* start the queue thread */
2152   memset (&queue_thread, 0, sizeof (queue_thread));
2153   status = pthread_create (&queue_thread,
2154                            NULL, /* attr */
2155                            queue_thread_main,
2156                            NULL); /* args */
2157   if (status != 0)
2158   {
2159     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2160     cleanup();
2161     return (1);
2162   }
2163
2164   listen_thread_main (NULL);
2165   cleanup ();
2166
2167   return (0);
2168 } /* int main */
2169
2170 /*
2171  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2172  */