79b908d2c3ec97408f6a2810ea40e93cdf3863e6
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  *   kevin brintnall <kbrint@rufus.net>
21  **/
22
23 #if 0
24 /*
25  * First tell the compiler to stick to the C99 and POSIX standards as close as
26  * possible.
27  */
28 #ifndef __STRICT_ANSI__ /* {{{ */
29 # define __STRICT_ANSI__
30 #endif
31
32 #ifndef _ISOC99_SOURCE
33 # define _ISOC99_SOURCE
34 #endif
35
36 #ifdef _POSIX_C_SOURCE
37 # undef _POSIX_C_SOURCE
38 #endif
39 #define _POSIX_C_SOURCE 200112L
40
41 /* Single UNIX needed for strdup. */
42 #ifdef _XOPEN_SOURCE
43 # undef _XOPEN_SOURCE
44 #endif
45 #define _XOPEN_SOURCE 500
46
47 #ifndef _REENTRANT
48 # define _REENTRANT
49 #endif
50
51 #ifndef _THREAD_SAFE
52 # define _THREAD_SAFE
53 #endif
54
55 #ifdef _GNU_SOURCE
56 # undef _GNU_SOURCE
57 #endif
58 /* }}} */
59 #endif /* 0 */
60
61 /*
62  * Now for some includes..
63  */
64 #include "rrd.h" /* {{{ */
65 #include "rrd_client.h"
66
67 #include <stdlib.h>
68 #include <stdint.h>
69 #include <stdio.h>
70 #include <unistd.h>
71 #include <string.h>
72 #include <strings.h>
73 #include <stdint.h>
74 #include <inttypes.h>
75
76 #include <sys/types.h>
77 #include <sys/stat.h>
78 #include <fcntl.h>
79 #include <signal.h>
80 #include <sys/socket.h>
81 #include <sys/un.h>
82 #include <netdb.h>
83 #include <poll.h>
84 #include <syslog.h>
85 #include <pthread.h>
86 #include <errno.h>
87 #include <assert.h>
88 #include <sys/time.h>
89 #include <time.h>
90
91 #include <glib-2.0/glib.h>
92 /* }}} */
93
94 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
95
96 #ifndef __GNUC__
97 # define __attribute__(x) /**/
98 #endif
99
100 /*
101  * Types
102  */
103 struct listen_socket_s
104 {
105   int fd;
106   char path[PATH_MAX + 1];
107 };
108 typedef struct listen_socket_s listen_socket_t;
109
110 struct cache_item_s;
111 typedef struct cache_item_s cache_item_t;
112 struct cache_item_s
113 {
114   char *file;
115   char **values;
116   int values_num;
117   time_t last_flush_time;
118 #define CI_FLAGS_IN_TREE  (1<<0)
119 #define CI_FLAGS_IN_QUEUE (1<<1)
120   int flags;
121
122   cache_item_t *next;
123 };
124
125 struct callback_flush_data_s
126 {
127   time_t now;
128   time_t abs_timeout;
129   char **keys;
130   size_t keys_num;
131 };
132 typedef struct callback_flush_data_s callback_flush_data_t;
133
134 enum queue_side_e
135 {
136   HEAD,
137   TAIL
138 };
139 typedef enum queue_side_e queue_side_t;
140
141 /* max length of socket command or response */
142 #define CMD_MAX 4096
143
144 /*
145  * Variables
146  */
147 static int stay_foreground = 0;
148
149 static listen_socket_t *listen_fds = NULL;
150 static size_t listen_fds_num = 0;
151
152 static int do_shutdown = 0;
153
154 static pthread_t queue_thread;
155
156 static pthread_t *connection_threads = NULL;
157 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
158 static int connection_threads_num = 0;
159
160 /* Cache stuff */
161 static GTree          *cache_tree = NULL;
162 static cache_item_t   *cache_queue_head = NULL;
163 static cache_item_t   *cache_queue_tail = NULL;
164 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
165 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
166
167 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
168
169 static int config_write_interval = 300;
170 static int config_write_jitter   = 0;
171 static int config_flush_interval = 3600;
172 static char *config_pid_file = NULL;
173 static char *config_base_dir = NULL;
174
175 static char **config_listen_address_list = NULL;
176 static int config_listen_address_list_len = 0;
177
178 static uint64_t stats_queue_length = 0;
179 static uint64_t stats_updates_received = 0;
180 static uint64_t stats_flush_received = 0;
181 static uint64_t stats_updates_written = 0;
182 static uint64_t stats_data_sets_written = 0;
183 static uint64_t stats_journal_bytes = 0;
184 static uint64_t stats_journal_rotate = 0;
185 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
186
187 /* Journaled updates */
188 static char *journal_cur = NULL;
189 static char *journal_old = NULL;
190 static FILE *journal_fh = NULL;
191 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
192 static int journal_write(char *cmd, char *args);
193 static void journal_done(void);
194 static void journal_rotate(void);
195
196 /* 
197  * Functions
198  */
199 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
200 {
201   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
202   do_shutdown++;
203   pthread_cond_broadcast(&cache_cond);
204 } /* }}} void sig_int_handler */
205
206 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
207 {
208   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
209   do_shutdown++;
210   pthread_cond_broadcast(&cache_cond);
211 } /* }}} void sig_term_handler */
212
213 static int write_pidfile (void) /* {{{ */
214 {
215   pid_t pid;
216   char *file;
217   int fd;
218   FILE *fh;
219
220   pid = getpid ();
221   
222   file = (config_pid_file != NULL)
223     ? config_pid_file
224     : LOCALSTATEDIR "/run/rrdcached.pid";
225
226   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
227   if (fd < 0)
228   {
229     RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
230              file, rrd_strerror(errno));
231     return (-1);
232   }
233
234   fh = fdopen (fd, "w");
235   if (fh == NULL)
236   {
237     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
238     close(fd);
239     return (-1);
240   }
241
242   fprintf (fh, "%i\n", (int) pid);
243   fclose (fh);
244
245   return (0);
246 } /* }}} int write_pidfile */
247
248 static int remove_pidfile (void) /* {{{ */
249 {
250   char *file;
251   int status;
252
253   file = (config_pid_file != NULL)
254     ? config_pid_file
255     : LOCALSTATEDIR "/run/rrdcached.pid";
256
257   status = unlink (file);
258   if (status == 0)
259     return (0);
260   return (errno);
261 } /* }}} int remove_pidfile */
262
263 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
264 {
265   char    *buffer;
266   size_t   buffer_used;
267   size_t   buffer_free;
268   ssize_t  status;
269
270   buffer       = (char *) buffer_void;
271   buffer_used  = 0;
272   buffer_free  = buffer_size;
273
274   while (buffer_free > 0)
275   {
276     status = read (fd, buffer + buffer_used, buffer_free);
277     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
278       continue;
279
280     if (status < 0)
281       return (-1);
282
283     if (status == 0)
284       return (0);
285
286     assert ((0 > status) || (buffer_free >= (size_t) status));
287
288     buffer_free = buffer_free - status;
289     buffer_used = buffer_used + status;
290
291     if (buffer[buffer_used - 1] == '\n')
292       break;
293   }
294
295   assert (buffer_used > 0);
296
297   if (buffer[buffer_used - 1] != '\n')
298   {
299     errno = ENOBUFS;
300     return (-1);
301   }
302
303   buffer[buffer_used - 1] = 0;
304
305   /* Fix network line endings. */
306   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
307   {
308     buffer_used--;
309     buffer[buffer_used - 1] = 0;
310   }
311
312   return (buffer_used);
313 } /* }}} ssize_t sread */
314
315 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
316 {
317   const char *ptr;
318   size_t      nleft;
319   ssize_t     status;
320
321   /* special case for journal replay */
322   if (fd < 0) return 0;
323
324   ptr   = (const char *) buf;
325   nleft = count;
326
327   while (nleft > 0)
328   {
329     status = write (fd, (const void *) ptr, nleft);
330
331     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
332       continue;
333
334     if (status < 0)
335       return (status);
336
337     nleft -= status;
338     ptr   += status;
339   }
340
341   return (0);
342 } /* }}} ssize_t swrite */
343
344 static void _wipe_ci_values(cache_item_t *ci, time_t when)
345 {
346   ci->values = NULL;
347   ci->values_num = 0;
348
349   ci->last_flush_time = when;
350   if (config_write_jitter > 0)
351     ci->last_flush_time += (random() % config_write_jitter);
352
353   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
354 }
355
356 /*
357  * enqueue_cache_item:
358  * `cache_lock' must be acquired before calling this function!
359  */
360 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
361     queue_side_t side)
362 {
363   int did_insert = 0;
364
365   if (ci == NULL)
366     return (-1);
367
368   if (ci->values_num == 0)
369     return (0);
370
371   if (side == HEAD)
372   {
373     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
374     {
375       assert (ci->next == NULL);
376       ci->next = cache_queue_head;
377       cache_queue_head = ci;
378
379       if (cache_queue_tail == NULL)
380         cache_queue_tail = cache_queue_head;
381
382       did_insert = 1;
383     }
384     else if (cache_queue_head == ci)
385     {
386       /* do nothing */
387     }
388     else /* enqueued, but not first entry */
389     {
390       cache_item_t *prev;
391
392       /* find previous entry */
393       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
394         if (prev->next == ci)
395           break;
396       assert (prev != NULL);
397
398       /* move to the front */
399       prev->next = ci->next;
400       ci->next = cache_queue_head;
401       cache_queue_head = ci;
402
403       /* check if we need to adapt the tail */
404       if (cache_queue_tail == ci)
405         cache_queue_tail = prev;
406     }
407   }
408   else /* (side == TAIL) */
409   {
410     /* We don't move values back in the list.. */
411     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
412       return (0);
413
414     assert (ci->next == NULL);
415
416     if (cache_queue_tail == NULL)
417       cache_queue_head = ci;
418     else
419       cache_queue_tail->next = ci;
420     cache_queue_tail = ci;
421
422     did_insert = 1;
423   }
424
425   ci->flags |= CI_FLAGS_IN_QUEUE;
426
427   if (did_insert)
428   {
429     pthread_mutex_lock (&stats_lock);
430     stats_queue_length++;
431     pthread_mutex_unlock (&stats_lock);
432   }
433
434   return (0);
435 } /* }}} int enqueue_cache_item */
436
437 /*
438  * tree_callback_flush:
439  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
440  * while this is in progress.
441  */
442 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
443     gpointer data)
444 {
445   cache_item_t *ci;
446   callback_flush_data_t *cfd;
447
448   ci = (cache_item_t *) value;
449   cfd = (callback_flush_data_t *) data;
450
451   if ((ci->last_flush_time <= cfd->abs_timeout)
452       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
453       && (ci->values_num > 0))
454   {
455     enqueue_cache_item (ci, TAIL);
456   }
457   else if ((do_shutdown != 0)
458       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
459       && (ci->values_num > 0))
460   {
461     enqueue_cache_item (ci, TAIL);
462   }
463   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
464       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
465       && (ci->values_num <= 0))
466   {
467     char **temp;
468
469     temp = (char **) realloc (cfd->keys,
470         sizeof (char *) * (cfd->keys_num + 1));
471     if (temp == NULL)
472     {
473       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
474       return (FALSE);
475     }
476     cfd->keys = temp;
477     /* Make really sure this points to the _same_ place */
478     assert ((char *) key == ci->file);
479     cfd->keys[cfd->keys_num] = (char *) key;
480     cfd->keys_num++;
481   }
482
483   return (FALSE);
484 } /* }}} gboolean tree_callback_flush */
485
486 static int flush_old_values (int max_age)
487 {
488   callback_flush_data_t cfd;
489   size_t k;
490
491   memset (&cfd, 0, sizeof (cfd));
492   /* Pass the current time as user data so that we don't need to call
493    * `time' for each node. */
494   cfd.now = time (NULL);
495   cfd.keys = NULL;
496   cfd.keys_num = 0;
497
498   if (max_age > 0)
499     cfd.abs_timeout = cfd.now - max_age;
500   else
501     cfd.abs_timeout = cfd.now + 1;
502
503   /* `tree_callback_flush' will return the keys of all values that haven't
504    * been touched in the last `config_flush_interval' seconds in `cfd'.
505    * The char*'s in this array point to the same memory as ci->file, so we
506    * don't need to free them separately. */
507   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
508
509   for (k = 0; k < cfd.keys_num; k++)
510   {
511     cache_item_t *ci;
512
513     /* This must not fail. */
514     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
515     assert (ci != NULL);
516
517     /* If we end up here with values available, something's seriously
518      * messed up. */
519     assert (ci->values_num == 0);
520
521     /* Remove the node from the tree */
522     g_tree_remove (cache_tree, cfd.keys[k]);
523     cfd.keys[k] = NULL;
524
525     /* Now free and clean up `ci'. */
526     free (ci->file);
527     ci->file = NULL;
528     free (ci);
529     ci = NULL;
530   } /* for (k = 0; k < cfd.keys_num; k++) */
531
532   if (cfd.keys != NULL)
533   {
534     free (cfd.keys);
535     cfd.keys = NULL;
536   }
537
538   return (0);
539 } /* int flush_old_values */
540
541 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
542 {
543   struct timeval now;
544   struct timespec next_flush;
545
546   gettimeofday (&now, NULL);
547   next_flush.tv_sec = now.tv_sec + config_flush_interval;
548   next_flush.tv_nsec = 1000 * now.tv_usec;
549
550   pthread_mutex_lock (&cache_lock);
551   while ((do_shutdown == 0) || (cache_queue_head != NULL))
552   {
553     cache_item_t *ci;
554     char *file;
555     char **values;
556     int values_num;
557     int status;
558     int i;
559
560     /* First, check if it's time to do the cache flush. */
561     gettimeofday (&now, NULL);
562     if ((now.tv_sec > next_flush.tv_sec)
563         || ((now.tv_sec == next_flush.tv_sec)
564           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
565     {
566       /* Flush all values that haven't been written in the last
567        * `config_write_interval' seconds. */
568       flush_old_values (config_write_interval);
569
570       /* Determine the time of the next cache flush. */
571       while (next_flush.tv_sec <= now.tv_sec)
572         next_flush.tv_sec += config_flush_interval;
573
574       /* unlock the cache while we rotate so we don't block incoming
575        * updates if the fsync() blocks on disk I/O */
576       pthread_mutex_unlock(&cache_lock);
577       journal_rotate();
578       pthread_mutex_lock(&cache_lock);
579     }
580
581     /* Now, check if there's something to store away. If not, wait until
582      * something comes in or it's time to do the cache flush. */
583     if (cache_queue_head == NULL)
584     {
585       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
586       if ((status != 0) && (status != ETIMEDOUT))
587       {
588         RRDD_LOG (LOG_ERR, "queue_thread_main: "
589             "pthread_cond_timedwait returned %i.", status);
590       }
591     }
592
593     /* We're about to shut down, so lets flush the entire tree. */
594     if ((do_shutdown != 0) && (cache_queue_head == NULL))
595       flush_old_values (/* max age = */ -1);
596
597     /* Check if a value has arrived. This may be NULL if we timed out or there
598      * was an interrupt such as a signal. */
599     if (cache_queue_head == NULL)
600       continue;
601
602     ci = cache_queue_head;
603
604     /* copy the relevant parts */
605     file = strdup (ci->file);
606     if (file == NULL)
607     {
608       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
609       continue;
610     }
611
612     assert(ci->values != NULL);
613     assert(ci->values_num > 0);
614
615     values = ci->values;
616     values_num = ci->values_num;
617
618     _wipe_ci_values(ci, time(NULL));
619
620     cache_queue_head = ci->next;
621     if (cache_queue_head == NULL)
622       cache_queue_tail = NULL;
623     ci->next = NULL;
624
625     pthread_mutex_lock (&stats_lock);
626     assert (stats_queue_length > 0);
627     stats_queue_length--;
628     pthread_mutex_unlock (&stats_lock);
629
630     pthread_mutex_unlock (&cache_lock);
631
632     rrd_clear_error ();
633     status = rrd_update_r (file, NULL, values_num, (void *) values);
634     if (status != 0)
635     {
636       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
637           "rrd_update_r (%s) failed with status %i. (%s)",
638           file, status, rrd_get_error());
639     }
640
641     journal_write("wrote", file);
642
643     for (i = 0; i < values_num; i++)
644       free (values[i]);
645
646     free(values);
647     free(file);
648
649     if (status == 0)
650     {
651       pthread_mutex_lock (&stats_lock);
652       stats_updates_written++;
653       stats_data_sets_written += values_num;
654       pthread_mutex_unlock (&stats_lock);
655     }
656
657     pthread_mutex_lock (&cache_lock);
658     pthread_cond_broadcast (&flush_cond);
659
660     /* We're about to shut down, so lets flush the entire tree. */
661     if ((do_shutdown != 0) && (cache_queue_head == NULL))
662       flush_old_values (/* max age = */ -1);
663   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
664   pthread_mutex_unlock (&cache_lock);
665
666   assert(cache_queue_head == NULL);
667   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
668   journal_done();
669
670   return (NULL);
671 } /* }}} void *queue_thread_main */
672
673 static int buffer_get_field (char **buffer_ret, /* {{{ */
674     size_t *buffer_size_ret, char **field_ret)
675 {
676   char *buffer;
677   size_t buffer_pos;
678   size_t buffer_size;
679   char *field;
680   size_t field_size;
681   int status;
682
683   buffer = *buffer_ret;
684   buffer_pos = 0;
685   buffer_size = *buffer_size_ret;
686   field = *buffer_ret;
687   field_size = 0;
688
689   if (buffer_size <= 0)
690     return (-1);
691
692   /* This is ensured by `handle_request'. */
693   assert (buffer[buffer_size - 1] == '\0');
694
695   status = -1;
696   while (buffer_pos < buffer_size)
697   {
698     /* Check for end-of-field or end-of-buffer */
699     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
700     {
701       field[field_size] = 0;
702       field_size++;
703       buffer_pos++;
704       status = 0;
705       break;
706     }
707     /* Handle escaped characters. */
708     else if (buffer[buffer_pos] == '\\')
709     {
710       if (buffer_pos >= (buffer_size - 1))
711         break;
712       buffer_pos++;
713       field[field_size] = buffer[buffer_pos];
714       field_size++;
715       buffer_pos++;
716     }
717     /* Normal operation */ 
718     else
719     {
720       field[field_size] = buffer[buffer_pos];
721       field_size++;
722       buffer_pos++;
723     }
724   } /* while (buffer_pos < buffer_size) */
725
726   if (status != 0)
727     return (status);
728
729   *buffer_ret = buffer + buffer_pos;
730   *buffer_size_ret = buffer_size - buffer_pos;
731   *field_ret = field;
732
733   return (0);
734 } /* }}} int buffer_get_field */
735
736 static int flush_file (const char *filename) /* {{{ */
737 {
738   cache_item_t *ci;
739
740   pthread_mutex_lock (&cache_lock);
741
742   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
743   if (ci == NULL)
744   {
745     pthread_mutex_unlock (&cache_lock);
746     return (ENOENT);
747   }
748
749   /* Enqueue at head */
750   enqueue_cache_item (ci, HEAD);
751   pthread_cond_signal (&cache_cond);
752
753   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
754   {
755     ci = NULL;
756
757     pthread_cond_wait (&flush_cond, &cache_lock);
758
759     ci = g_tree_lookup (cache_tree, filename);
760     if (ci == NULL)
761     {
762       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
763           "while waiting for flush.");
764       pthread_mutex_unlock (&cache_lock);
765       return (-1);
766     }
767   }
768
769   pthread_mutex_unlock (&cache_lock);
770   return (0);
771 } /* }}} int flush_file */
772
773 static int handle_request_help (int fd, /* {{{ */
774     char *buffer, size_t buffer_size)
775 {
776   int status;
777   char **help_text;
778   size_t help_text_len;
779   char *command;
780   size_t i;
781
782   char *help_help[] =
783   {
784     "4 Command overview\n",
785     "FLUSH <filename>\n",
786     "HELP [<command>]\n",
787     "UPDATE <filename> <values> [<values> ...]\n",
788     "STATS\n"
789   };
790   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
791
792   char *help_flush[] =
793   {
794     "4 Help for FLUSH\n",
795     "Usage: FLUSH <filename>\n",
796     "\n",
797     "Adds the given filename to the head of the update queue and returns\n",
798     "after is has been dequeued.\n"
799   };
800   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
801
802   char *help_update[] =
803   {
804     "9 Help for UPDATE\n",
805     "Usage: UPDATE <filename> <values> [<values> ...]\n"
806     "\n",
807     "Adds the given file to the internal cache if it is not yet known and\n",
808     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
809     "for details.\n",
810     "\n",
811     "Each <values> has the following form:\n",
812     "  <values> = <time>:<value>[:<value>[...]]\n",
813     "See the rrdupdate(1) manpage for details.\n"
814   };
815   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
816
817   char *help_stats[] =
818   {
819     "4 Help for STATS\n",
820     "Usage: STATS\n",
821     "\n",
822     "Returns some performance counters, see the rrdcached(1) manpage for\n",
823     "a description of the values.\n"
824   };
825   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
826
827   status = buffer_get_field (&buffer, &buffer_size, &command);
828   if (status != 0)
829   {
830     help_text = help_help;
831     help_text_len = help_help_len;
832   }
833   else
834   {
835     if (strcasecmp (command, "update") == 0)
836     {
837       help_text = help_update;
838       help_text_len = help_update_len;
839     }
840     else if (strcasecmp (command, "flush") == 0)
841     {
842       help_text = help_flush;
843       help_text_len = help_flush_len;
844     }
845     else if (strcasecmp (command, "stats") == 0)
846     {
847       help_text = help_stats;
848       help_text_len = help_stats_len;
849     }
850     else
851     {
852       help_text = help_help;
853       help_text_len = help_help_len;
854     }
855   }
856
857   for (i = 0; i < help_text_len; i++)
858   {
859     status = swrite (fd, help_text[i], strlen (help_text[i]));
860     if (status < 0)
861     {
862       status = errno;
863       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
864       return (status);
865     }
866   }
867
868   return (0);
869 } /* }}} int handle_request_help */
870
871 static int handle_request_stats (int fd, /* {{{ */
872     char *buffer __attribute__((unused)),
873     size_t buffer_size __attribute__((unused)))
874 {
875   int status;
876   char outbuf[CMD_MAX];
877
878   uint64_t copy_queue_length;
879   uint64_t copy_updates_received;
880   uint64_t copy_flush_received;
881   uint64_t copy_updates_written;
882   uint64_t copy_data_sets_written;
883   uint64_t copy_journal_bytes;
884   uint64_t copy_journal_rotate;
885
886   uint64_t tree_nodes_number;
887   uint64_t tree_depth;
888
889   pthread_mutex_lock (&stats_lock);
890   copy_queue_length       = stats_queue_length;
891   copy_updates_received   = stats_updates_received;
892   copy_flush_received     = stats_flush_received;
893   copy_updates_written    = stats_updates_written;
894   copy_data_sets_written  = stats_data_sets_written;
895   copy_journal_bytes      = stats_journal_bytes;
896   copy_journal_rotate     = stats_journal_rotate;
897   pthread_mutex_unlock (&stats_lock);
898
899   pthread_mutex_lock (&cache_lock);
900   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
901   tree_depth        = (uint64_t) g_tree_height (cache_tree);
902   pthread_mutex_unlock (&cache_lock);
903
904 #define RRDD_STATS_SEND \
905   outbuf[sizeof (outbuf) - 1] = 0; \
906   status = swrite (fd, outbuf, strlen (outbuf)); \
907   if (status < 0) \
908   { \
909     status = errno; \
910     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
911     return (status); \
912   }
913
914   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
915   RRDD_STATS_SEND;
916
917   snprintf (outbuf, sizeof (outbuf),
918       "QueueLength: %"PRIu64"\n", copy_queue_length);
919   RRDD_STATS_SEND;
920
921   snprintf (outbuf, sizeof (outbuf),
922       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
923   RRDD_STATS_SEND;
924
925   snprintf (outbuf, sizeof (outbuf),
926       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
927   RRDD_STATS_SEND;
928
929   snprintf (outbuf, sizeof (outbuf),
930       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
931   RRDD_STATS_SEND;
932
933   snprintf (outbuf, sizeof (outbuf),
934       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
935   RRDD_STATS_SEND;
936
937   snprintf (outbuf, sizeof (outbuf),
938       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
939   RRDD_STATS_SEND;
940
941   snprintf (outbuf, sizeof (outbuf),
942       "TreeDepth: %"PRIu64"\n", tree_depth);
943   RRDD_STATS_SEND;
944
945   snprintf (outbuf, sizeof(outbuf),
946       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
947   RRDD_STATS_SEND;
948
949   snprintf (outbuf, sizeof(outbuf),
950       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
951   RRDD_STATS_SEND;
952
953   return (0);
954 #undef RRDD_STATS_SEND
955 } /* }}} int handle_request_stats */
956
957 static int handle_request_flush (int fd, /* {{{ */
958     char *buffer, size_t buffer_size)
959 {
960   char *file;
961   int status;
962   char result[CMD_MAX];
963
964   status = buffer_get_field (&buffer, &buffer_size, &file);
965   if (status != 0)
966   {
967     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
968   }
969   else
970   {
971     pthread_mutex_lock(&stats_lock);
972     stats_flush_received++;
973     pthread_mutex_unlock(&stats_lock);
974
975     status = flush_file (file);
976     if (status == 0)
977       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
978     else if (status == ENOENT)
979     {
980       /* no file in our tree; see whether it exists at all */
981       struct stat statbuf;
982
983       memset(&statbuf, 0, sizeof(statbuf));
984       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
985         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
986       else
987         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
988     }
989     else if (status < 0)
990       strncpy (result, "-1 Internal error.\n", sizeof (result));
991     else
992       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
993   }
994   result[sizeof (result) - 1] = 0;
995
996   status = swrite (fd, result, strlen (result));
997   if (status < 0)
998   {
999     status = errno;
1000     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1001     return (status);
1002   }
1003
1004   return (0);
1005 } /* }}} int handle_request_flush */
1006
1007 static int handle_request_update (int fd, /* {{{ */
1008     char *buffer, size_t buffer_size)
1009 {
1010   char *file;
1011   int values_num = 0;
1012   int status;
1013
1014   time_t now;
1015
1016   cache_item_t *ci;
1017   char answer[CMD_MAX];
1018
1019 #define RRDD_UPDATE_SEND \
1020   answer[sizeof (answer) - 1] = 0; \
1021   status = swrite (fd, answer, strlen (answer)); \
1022   if (status < 0) \
1023   { \
1024     status = errno; \
1025     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1026     return (status); \
1027   }
1028
1029   now = time (NULL);
1030
1031   status = buffer_get_field (&buffer, &buffer_size, &file);
1032   if (status != 0)
1033   {
1034     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1035         sizeof (answer));
1036     RRDD_UPDATE_SEND;
1037     return (0);
1038   }
1039
1040   pthread_mutex_lock(&stats_lock);
1041   stats_updates_received++;
1042   pthread_mutex_unlock(&stats_lock);
1043
1044   pthread_mutex_lock (&cache_lock);
1045
1046   ci = g_tree_lookup (cache_tree, file);
1047   if (ci == NULL) /* {{{ */
1048   {
1049     struct stat statbuf;
1050
1051     memset (&statbuf, 0, sizeof (statbuf));
1052     status = stat (file, &statbuf);
1053     if (status != 0)
1054     {
1055       pthread_mutex_unlock (&cache_lock);
1056       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1057
1058       status = errno;
1059       if (status == ENOENT)
1060         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1061       else
1062         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1063             status);
1064       RRDD_UPDATE_SEND;
1065       return (0);
1066     }
1067     if (!S_ISREG (statbuf.st_mode))
1068     {
1069       pthread_mutex_unlock (&cache_lock);
1070
1071       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1072       RRDD_UPDATE_SEND;
1073       return (0);
1074     }
1075     if (access(file, R_OK|W_OK) != 0)
1076     {
1077       pthread_mutex_unlock (&cache_lock);
1078
1079       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1080                 file, rrd_strerror(errno));
1081       RRDD_UPDATE_SEND;
1082       return (0);
1083     }
1084
1085     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1086     if (ci == NULL)
1087     {
1088       pthread_mutex_unlock (&cache_lock);
1089       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1090
1091       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1092       RRDD_UPDATE_SEND;
1093       return (0);
1094     }
1095     memset (ci, 0, sizeof (cache_item_t));
1096
1097     ci->file = strdup (file);
1098     if (ci->file == NULL)
1099     {
1100       pthread_mutex_unlock (&cache_lock);
1101       free (ci);
1102       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1103
1104       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1105       RRDD_UPDATE_SEND;
1106       return (0);
1107     }
1108
1109     _wipe_ci_values(ci, now);
1110     ci->flags = CI_FLAGS_IN_TREE;
1111
1112     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1113   } /* }}} */
1114   assert (ci != NULL);
1115
1116   while (buffer_size > 0)
1117   {
1118     char **temp;
1119     char *value;
1120
1121     status = buffer_get_field (&buffer, &buffer_size, &value);
1122     if (status != 0)
1123     {
1124       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1125       break;
1126     }
1127
1128     temp = (char **) realloc (ci->values,
1129         sizeof (char *) * (ci->values_num + 1));
1130     if (temp == NULL)
1131     {
1132       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1133       continue;
1134     }
1135     ci->values = temp;
1136
1137     ci->values[ci->values_num] = strdup (value);
1138     if (ci->values[ci->values_num] == NULL)
1139     {
1140       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1141       continue;
1142     }
1143     ci->values_num++;
1144
1145     values_num++;
1146   }
1147
1148   if (((now - ci->last_flush_time) >= config_write_interval)
1149       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1150       && (ci->values_num > 0))
1151   {
1152     enqueue_cache_item (ci, TAIL);
1153     pthread_cond_signal (&cache_cond);
1154   }
1155
1156   pthread_mutex_unlock (&cache_lock);
1157
1158   if (values_num < 1)
1159   {
1160     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1161   }
1162   else
1163   {
1164     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1165         (values_num == 1) ? "" : "s");
1166   }
1167   RRDD_UPDATE_SEND;
1168   return (0);
1169 #undef RRDD_UPDATE_SEND
1170 } /* }}} int handle_request_update */
1171
1172 /* we came across a "WROTE" entry during journal replay.
1173  * throw away any values that we have accumulated for this file
1174  */
1175 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1176                                  const char *buffer,
1177                                  size_t buffer_size __attribute__((unused)))
1178 {
1179   int i;
1180   cache_item_t *ci;
1181   const char *file = buffer;
1182
1183   pthread_mutex_lock(&cache_lock);
1184
1185   ci = g_tree_lookup(cache_tree, file);
1186   if (ci == NULL)
1187   {
1188     pthread_mutex_unlock(&cache_lock);
1189     return (0);
1190   }
1191
1192   if (ci->values)
1193   {
1194     for (i=0; i < ci->values_num; i++)
1195       free(ci->values[i]);
1196
1197     free(ci->values);
1198   }
1199
1200   _wipe_ci_values(ci, time(NULL));
1201
1202   pthread_mutex_unlock(&cache_lock);
1203   return (0);
1204 } /* }}} int handle_request_wrote */
1205
1206 /* if fd < 0, we are in journal replay mode */
1207 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1208 {
1209   char *buffer_ptr;
1210   char *command;
1211   int status;
1212
1213   assert (buffer[buffer_size - 1] == '\0');
1214
1215   buffer_ptr = buffer;
1216   command = NULL;
1217   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1218   if (status != 0)
1219   {
1220     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1221     return (-1);
1222   }
1223
1224   if (strcasecmp (command, "update") == 0)
1225   {
1226     /* don't re-write updates in replay mode */
1227     if (fd >= 0)
1228       journal_write(command, buffer_ptr);
1229
1230     return (handle_request_update (fd, buffer_ptr, buffer_size));
1231   }
1232   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1233   {
1234     /* this is only valid in replay mode */
1235     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1236   }
1237   else if (strcasecmp (command, "flush") == 0)
1238   {
1239     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1240   }
1241   else if (strcasecmp (command, "stats") == 0)
1242   {
1243     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1244   }
1245   else if (strcasecmp (command, "help") == 0)
1246   {
1247     return (handle_request_help (fd, buffer_ptr, buffer_size));
1248   }
1249   else
1250   {
1251     char result[CMD_MAX];
1252
1253     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1254     result[sizeof (result) - 1] = 0;
1255
1256     status = swrite (fd, result, strlen (result));
1257     if (status < 0)
1258     {
1259       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1260       return (-1);
1261     }
1262   }
1263
1264   return (0);
1265 } /* }}} int handle_request */
1266
1267 /* MUST NOT hold journal_lock before calling this */
1268 static void journal_rotate(void) /* {{{ */
1269 {
1270   FILE *old_fh = NULL;
1271
1272   if (journal_cur == NULL || journal_old == NULL)
1273     return;
1274
1275   pthread_mutex_lock(&journal_lock);
1276
1277   /* we rotate this way (rename before close) so that the we can release
1278    * the journal lock as fast as possible.  Journal writes to the new
1279    * journal can proceed immediately after the new file is opened.  The
1280    * fclose can then block without affecting new updates.
1281    */
1282   if (journal_fh != NULL)
1283   {
1284     old_fh = journal_fh;
1285     rename(journal_cur, journal_old);
1286     ++stats_journal_rotate;
1287   }
1288
1289   journal_fh = fopen(journal_cur, "a");
1290   pthread_mutex_unlock(&journal_lock);
1291
1292   if (old_fh != NULL)
1293     fclose(old_fh);
1294
1295   if (journal_fh == NULL)
1296     RRDD_LOG(LOG_CRIT,
1297              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1298              journal_cur, rrd_strerror(errno));
1299
1300 } /* }}} static void journal_rotate */
1301
1302 static void journal_done(void) /* {{{ */
1303 {
1304   if (journal_cur == NULL)
1305     return;
1306
1307   pthread_mutex_lock(&journal_lock);
1308   if (journal_fh != NULL)
1309   {
1310     fclose(journal_fh);
1311     journal_fh = NULL;
1312   }
1313
1314   RRDD_LOG(LOG_INFO, "removing journals");
1315
1316   unlink(journal_old);
1317   unlink(journal_cur);
1318   pthread_mutex_unlock(&journal_lock);
1319
1320 } /* }}} static void journal_done */
1321
1322 static int journal_write(char *cmd, char *args) /* {{{ */
1323 {
1324   int chars;
1325
1326   if (journal_fh == NULL)
1327     return 0;
1328
1329   pthread_mutex_lock(&journal_lock);
1330   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1331   pthread_mutex_unlock(&journal_lock);
1332
1333   if (chars > 0)
1334   {
1335     pthread_mutex_lock(&stats_lock);
1336     stats_journal_bytes += chars;
1337     pthread_mutex_unlock(&stats_lock);
1338   }
1339
1340   return chars;
1341 } /* }}} static int journal_write */
1342
1343 static int journal_replay (const char *file) /* {{{ */
1344 {
1345   FILE *fh;
1346   int entry_cnt = 0;
1347   int fail_cnt = 0;
1348   uint64_t line = 0;
1349   char entry[CMD_MAX];
1350
1351   if (file == NULL) return 0;
1352
1353   fh = fopen(file, "r");
1354   if (fh == NULL)
1355   {
1356     if (errno != ENOENT)
1357       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1358                file, rrd_strerror(errno));
1359     return 0;
1360   }
1361   else
1362     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1363
1364   while(!feof(fh))
1365   {
1366     size_t entry_len;
1367
1368     ++line;
1369     fgets(entry, sizeof(entry), fh);
1370     entry_len = strlen(entry);
1371
1372     /* check \n termination in case journal writing crashed mid-line */
1373     if (entry_len == 0)
1374       continue;
1375     else if (entry[entry_len - 1] != '\n')
1376     {
1377       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1378       ++fail_cnt;
1379       continue;
1380     }
1381
1382     entry[entry_len - 1] = '\0';
1383
1384     if (handle_request(-1, entry, entry_len) == 0)
1385       ++entry_cnt;
1386     else
1387       ++fail_cnt;
1388   }
1389
1390   fclose(fh);
1391
1392   if (entry_cnt > 0)
1393   {
1394     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1395              entry_cnt, fail_cnt);
1396     return 1;
1397   }
1398   else
1399     return 0;
1400
1401 } /* }}} static int journal_replay */
1402
1403 static void *connection_thread_main (void *args) /* {{{ */
1404 {
1405   pthread_t self;
1406   int i;
1407   int fd;
1408   
1409   fd = *((int *) args);
1410   free (args);
1411
1412   pthread_mutex_lock (&connection_threads_lock);
1413   {
1414     pthread_t *temp;
1415
1416     temp = (pthread_t *) realloc (connection_threads,
1417         sizeof (pthread_t) * (connection_threads_num + 1));
1418     if (temp == NULL)
1419     {
1420       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1421     }
1422     else
1423     {
1424       connection_threads = temp;
1425       connection_threads[connection_threads_num] = pthread_self ();
1426       connection_threads_num++;
1427     }
1428   }
1429   pthread_mutex_unlock (&connection_threads_lock);
1430
1431   while (do_shutdown == 0)
1432   {
1433     char buffer[CMD_MAX];
1434
1435     struct pollfd pollfd;
1436     int status;
1437
1438     pollfd.fd = fd;
1439     pollfd.events = POLLIN | POLLPRI;
1440     pollfd.revents = 0;
1441
1442     status = poll (&pollfd, 1, /* timeout = */ 500);
1443     if (status == 0) /* timeout */
1444       continue;
1445     else if (status < 0) /* error */
1446     {
1447       status = errno;
1448       if (status == EINTR)
1449         continue;
1450       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1451       continue;
1452     }
1453
1454     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1455     {
1456       close (fd);
1457       break;
1458     }
1459     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1460     {
1461       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1462           "poll(2) returned something unexpected: %#04hx",
1463           pollfd.revents);
1464       close (fd);
1465       break;
1466     }
1467
1468     status = (int) sread (fd, buffer, sizeof (buffer));
1469     if (status <= 0)
1470     {
1471       close (fd);
1472
1473       if (status < 0)
1474         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1475
1476       break;
1477     }
1478
1479     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1480     if (status != 0)
1481     {
1482       close (fd);
1483       break;
1484     }
1485   }
1486
1487   self = pthread_self ();
1488   /* Remove this thread from the connection threads list */
1489   pthread_mutex_lock (&connection_threads_lock);
1490   /* Find out own index in the array */
1491   for (i = 0; i < connection_threads_num; i++)
1492     if (pthread_equal (connection_threads[i], self) != 0)
1493       break;
1494   assert (i < connection_threads_num);
1495
1496   /* Move the trailing threads forward. */
1497   if (i < (connection_threads_num - 1))
1498   {
1499     memmove (connection_threads + i,
1500         connection_threads + i + 1,
1501         sizeof (pthread_t) * (connection_threads_num - i - 1));
1502   }
1503
1504   connection_threads_num--;
1505   pthread_mutex_unlock (&connection_threads_lock);
1506
1507   return (NULL);
1508 } /* }}} void *connection_thread_main */
1509
1510 static int open_listen_socket_unix (const char *path) /* {{{ */
1511 {
1512   int fd;
1513   struct sockaddr_un sa;
1514   listen_socket_t *temp;
1515   int status;
1516
1517   temp = (listen_socket_t *) realloc (listen_fds,
1518       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1519   if (temp == NULL)
1520   {
1521     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1522     return (-1);
1523   }
1524   listen_fds = temp;
1525   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1526
1527   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1528   if (fd < 0)
1529   {
1530     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1531     return (-1);
1532   }
1533
1534   memset (&sa, 0, sizeof (sa));
1535   sa.sun_family = AF_UNIX;
1536   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1537
1538   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1539   if (status != 0)
1540   {
1541     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1542     close (fd);
1543     unlink (path);
1544     return (-1);
1545   }
1546
1547   status = listen (fd, /* backlog = */ 10);
1548   if (status != 0)
1549   {
1550     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1551     close (fd);
1552     unlink (path);
1553     return (-1);
1554   }
1555   
1556   listen_fds[listen_fds_num].fd = fd;
1557   snprintf (listen_fds[listen_fds_num].path,
1558       sizeof (listen_fds[listen_fds_num].path) - 1,
1559       "unix:%s", path);
1560   listen_fds_num++;
1561
1562   return (0);
1563 } /* }}} int open_listen_socket_unix */
1564
1565 static int open_listen_socket (const char *addr) /* {{{ */
1566 {
1567   struct addrinfo ai_hints;
1568   struct addrinfo *ai_res;
1569   struct addrinfo *ai_ptr;
1570   int status;
1571
1572   assert (addr != NULL);
1573
1574   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1575     return (open_listen_socket_unix (addr + strlen ("unix:")));
1576   else if (addr[0] == '/')
1577     return (open_listen_socket_unix (addr));
1578
1579   memset (&ai_hints, 0, sizeof (ai_hints));
1580   ai_hints.ai_flags = 0;
1581 #ifdef AI_ADDRCONFIG
1582   ai_hints.ai_flags |= AI_ADDRCONFIG;
1583 #endif
1584   ai_hints.ai_family = AF_UNSPEC;
1585   ai_hints.ai_socktype = SOCK_STREAM;
1586
1587   ai_res = NULL;
1588   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
1589   if (status != 0)
1590   {
1591     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1592         "%s", addr, gai_strerror (status));
1593     return (-1);
1594   }
1595
1596   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1597   {
1598     int fd;
1599     listen_socket_t *temp;
1600
1601     temp = (listen_socket_t *) realloc (listen_fds,
1602         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1603     if (temp == NULL)
1604     {
1605       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1606       continue;
1607     }
1608     listen_fds = temp;
1609     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1610
1611     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1612     if (fd < 0)
1613     {
1614       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1615       continue;
1616     }
1617
1618     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1619     if (status != 0)
1620     {
1621       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1622       close (fd);
1623       continue;
1624     }
1625
1626     status = listen (fd, /* backlog = */ 10);
1627     if (status != 0)
1628     {
1629       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1630       close (fd);
1631       return (-1);
1632     }
1633
1634     listen_fds[listen_fds_num].fd = fd;
1635     strncpy (listen_fds[listen_fds_num].path, addr,
1636         sizeof (listen_fds[listen_fds_num].path) - 1);
1637     listen_fds_num++;
1638   } /* for (ai_ptr) */
1639
1640   return (0);
1641 } /* }}} int open_listen_socket */
1642
1643 static int close_listen_sockets (void) /* {{{ */
1644 {
1645   size_t i;
1646
1647   for (i = 0; i < listen_fds_num; i++)
1648   {
1649     close (listen_fds[i].fd);
1650     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1651       unlink (listen_fds[i].path + strlen ("unix:"));
1652   }
1653
1654   free (listen_fds);
1655   listen_fds = NULL;
1656   listen_fds_num = 0;
1657
1658   return (0);
1659 } /* }}} int close_listen_sockets */
1660
1661 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1662 {
1663   struct pollfd *pollfds;
1664   int pollfds_num;
1665   int status;
1666   int i;
1667
1668   for (i = 0; i < config_listen_address_list_len; i++)
1669     open_listen_socket (config_listen_address_list[i]);
1670
1671   if (config_listen_address_list_len < 1)
1672     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1673
1674   if (listen_fds_num < 1)
1675   {
1676     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1677         "could be opened. Sorry.");
1678     return (NULL);
1679   }
1680
1681   pollfds_num = listen_fds_num;
1682   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1683   if (pollfds == NULL)
1684   {
1685     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1686     return (NULL);
1687   }
1688   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1689
1690   RRDD_LOG(LOG_INFO, "listening for connections");
1691
1692   while (do_shutdown == 0)
1693   {
1694     assert (pollfds_num == ((int) listen_fds_num));
1695     for (i = 0; i < pollfds_num; i++)
1696     {
1697       pollfds[i].fd = listen_fds[i].fd;
1698       pollfds[i].events = POLLIN | POLLPRI;
1699       pollfds[i].revents = 0;
1700     }
1701
1702     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1703     if (status < 1)
1704     {
1705       status = errno;
1706       if (status != EINTR)
1707       {
1708         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1709       }
1710       continue;
1711     }
1712
1713     for (i = 0; i < pollfds_num; i++)
1714     {
1715       int *client_sd;
1716       struct sockaddr_storage client_sa;
1717       socklen_t client_sa_size;
1718       pthread_t tid;
1719       pthread_attr_t attr;
1720
1721       if (pollfds[i].revents == 0)
1722         continue;
1723
1724       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1725       {
1726         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1727             "poll(2) returned something unexpected for listen FD #%i.",
1728             pollfds[i].fd);
1729         continue;
1730       }
1731
1732       client_sd = (int *) malloc (sizeof (int));
1733       if (client_sd == NULL)
1734       {
1735         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1736         continue;
1737       }
1738
1739       client_sa_size = sizeof (client_sa);
1740       *client_sd = accept (pollfds[i].fd,
1741           (struct sockaddr *) &client_sa, &client_sa_size);
1742       if (*client_sd < 0)
1743       {
1744         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1745         continue;
1746       }
1747
1748       pthread_attr_init (&attr);
1749       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1750
1751       status = pthread_create (&tid, &attr, connection_thread_main,
1752           /* args = */ (void *) client_sd);
1753       if (status != 0)
1754       {
1755         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1756         close (*client_sd);
1757         free (client_sd);
1758         continue;
1759       }
1760     } /* for (pollfds_num) */
1761   } /* while (do_shutdown == 0) */
1762
1763   RRDD_LOG(LOG_INFO, "starting shutdown");
1764
1765   close_listen_sockets ();
1766
1767   pthread_mutex_lock (&connection_threads_lock);
1768   while (connection_threads_num > 0)
1769   {
1770     pthread_t wait_for;
1771
1772     wait_for = connection_threads[0];
1773
1774     pthread_mutex_unlock (&connection_threads_lock);
1775     pthread_join (wait_for, /* retval = */ NULL);
1776     pthread_mutex_lock (&connection_threads_lock);
1777   }
1778   pthread_mutex_unlock (&connection_threads_lock);
1779
1780   return (NULL);
1781 } /* }}} void *listen_thread_main */
1782
1783 static int daemonize (void) /* {{{ */
1784 {
1785   int status;
1786
1787   /* These structures are static, because `sigaction' behaves weird if the are
1788    * overwritten.. */
1789   static struct sigaction sa_int;
1790   static struct sigaction sa_term;
1791   static struct sigaction sa_pipe;
1792
1793   if (!stay_foreground)
1794   {
1795     pid_t child;
1796     char *base_dir;
1797
1798     child = fork ();
1799     if (child < 0)
1800     {
1801       fprintf (stderr, "daemonize: fork(2) failed.\n");
1802       return (-1);
1803     }
1804     else if (child > 0)
1805     {
1806       return (1);
1807     }
1808
1809     /* Change into the /tmp directory. */
1810     base_dir = (config_base_dir != NULL)
1811       ? config_base_dir
1812       : "/tmp";
1813     status = chdir (base_dir);
1814     if (status != 0)
1815     {
1816       fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1817       return (-1);
1818     }
1819
1820     /* Become session leader */
1821     setsid ();
1822
1823     /* Open the first three file descriptors to /dev/null */
1824     close (2);
1825     close (1);
1826     close (0);
1827
1828     open ("/dev/null", O_RDWR);
1829     dup (0);
1830     dup (0);
1831   } /* if (!stay_foreground) */
1832
1833   /* Install signal handlers */
1834   memset (&sa_int, 0, sizeof (sa_int));
1835   sa_int.sa_handler = sig_int_handler;
1836   sigaction (SIGINT, &sa_int, NULL);
1837
1838   memset (&sa_term, 0, sizeof (sa_term));
1839   sa_term.sa_handler = sig_term_handler;
1840   sigaction (SIGTERM, &sa_term, NULL);
1841
1842   memset (&sa_pipe, 0, sizeof (sa_pipe));
1843   sa_pipe.sa_handler = SIG_IGN;
1844   sigaction (SIGPIPE, &sa_pipe, NULL);
1845
1846   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1847   RRDD_LOG(LOG_INFO, "starting up");
1848
1849   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1850   if (cache_tree == NULL)
1851   {
1852     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1853     return (-1);
1854   }
1855
1856   status = write_pidfile ();
1857   return status;
1858 } /* }}} int daemonize */
1859
1860 static int cleanup (void) /* {{{ */
1861 {
1862   do_shutdown++;
1863
1864   pthread_cond_signal (&cache_cond);
1865   pthread_join (queue_thread, /* return = */ NULL);
1866
1867   remove_pidfile ();
1868
1869   RRDD_LOG(LOG_INFO, "goodbye");
1870   closelog ();
1871
1872   return (0);
1873 } /* }}} int cleanup */
1874
1875 static int read_options (int argc, char **argv) /* {{{ */
1876 {
1877   int option;
1878   int status = 0;
1879
1880   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1881   {
1882     switch (option)
1883     {
1884       case 'g':
1885         stay_foreground=1;
1886         break;
1887
1888       case 'l':
1889       {
1890         char **temp;
1891
1892         temp = (char **) realloc (config_listen_address_list,
1893             sizeof (char *) * (config_listen_address_list_len + 1));
1894         if (temp == NULL)
1895         {
1896           fprintf (stderr, "read_options: realloc failed.\n");
1897           return (2);
1898         }
1899         config_listen_address_list = temp;
1900
1901         temp[config_listen_address_list_len] = strdup (optarg);
1902         if (temp[config_listen_address_list_len] == NULL)
1903         {
1904           fprintf (stderr, "read_options: strdup failed.\n");
1905           return (2);
1906         }
1907         config_listen_address_list_len++;
1908       }
1909       break;
1910
1911       case 'f':
1912       {
1913         int temp;
1914
1915         temp = atoi (optarg);
1916         if (temp > 0)
1917           config_flush_interval = temp;
1918         else
1919         {
1920           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1921           status = 3;
1922         }
1923       }
1924       break;
1925
1926       case 'w':
1927       {
1928         int temp;
1929
1930         temp = atoi (optarg);
1931         if (temp > 0)
1932           config_write_interval = temp;
1933         else
1934         {
1935           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1936           status = 2;
1937         }
1938       }
1939       break;
1940
1941       case 'z':
1942       {
1943         int temp;
1944
1945         temp = atoi(optarg);
1946         if (temp > 0)
1947           config_write_jitter = temp;
1948         else
1949         {
1950           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1951           status = 2;
1952         }
1953
1954         break;
1955       }
1956
1957       case 'b':
1958       {
1959         size_t len;
1960
1961         if (config_base_dir != NULL)
1962           free (config_base_dir);
1963         config_base_dir = strdup (optarg);
1964         if (config_base_dir == NULL)
1965         {
1966           fprintf (stderr, "read_options: strdup failed.\n");
1967           return (3);
1968         }
1969
1970         len = strlen (config_base_dir);
1971         while ((len > 0) && (config_base_dir[len - 1] == '/'))
1972         {
1973           config_base_dir[len - 1] = 0;
1974           len--;
1975         }
1976
1977         if (len < 1)
1978         {
1979           fprintf (stderr, "Invalid base directory: %s\n", optarg);
1980           return (4);
1981         }
1982       }
1983       break;
1984
1985       case 'p':
1986       {
1987         if (config_pid_file != NULL)
1988           free (config_pid_file);
1989         config_pid_file = strdup (optarg);
1990         if (config_pid_file == NULL)
1991         {
1992           fprintf (stderr, "read_options: strdup failed.\n");
1993           return (3);
1994         }
1995       }
1996       break;
1997
1998       case 'j':
1999       {
2000         struct stat statbuf;
2001         const char *dir = optarg;
2002
2003         status = stat(dir, &statbuf);
2004         if (status != 0)
2005         {
2006           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2007           return 6;
2008         }
2009
2010         if (!S_ISDIR(statbuf.st_mode)
2011             || access(dir, R_OK|W_OK|X_OK) != 0)
2012         {
2013           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2014                   errno ? rrd_strerror(errno) : "");
2015           return 6;
2016         }
2017
2018         journal_cur = malloc(PATH_MAX + 1);
2019         journal_old = malloc(PATH_MAX + 1);
2020         if (journal_cur == NULL || journal_old == NULL)
2021         {
2022           fprintf(stderr, "malloc failure for journal files\n");
2023           return 6;
2024         }
2025         else 
2026         {
2027           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2028           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2029         }
2030       }
2031       break;
2032
2033       case 'h':
2034       case '?':
2035         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
2036             "\n"
2037             "Usage: rrdcached [options]\n"
2038             "\n"
2039             "Valid options are:\n"
2040             "  -l <address>  Socket address to listen to.\n"
2041             "  -w <seconds>  Interval in which to write data.\n"
2042             "  -z <delay>    Delay writes up to <delay> seconds to spread load" \
2043             "  -f <seconds>  Interval in which to flush dead data.\n"
2044             "  -p <file>     Location of the PID-file.\n"
2045             "  -b <dir>      Base directory to change to.\n"
2046             "\n"
2047             "For more information and a detailed description of all options "
2048             "please refer\n"
2049             "to the rrdcached(1) manual page.\n",
2050             VERSION);
2051         status = -1;
2052         break;
2053     } /* switch (option) */
2054   } /* while (getopt) */
2055
2056   /* advise the user when values are not sane */
2057   if (config_flush_interval < 2 * config_write_interval)
2058     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2059             " 2x write interval (-w) !\n");
2060   if (config_write_jitter > config_write_interval)
2061     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2062             " write interval (-w) !\n");
2063
2064   return (status);
2065 } /* }}} int read_options */
2066
2067 int main (int argc, char **argv)
2068 {
2069   int status;
2070
2071   status = read_options (argc, argv);
2072   if (status != 0)
2073   {
2074     if (status < 0)
2075       status = 0;
2076     return (status);
2077   }
2078
2079   status = daemonize ();
2080   if (status == 1)
2081   {
2082     struct sigaction sigchld;
2083
2084     memset (&sigchld, 0, sizeof (sigchld));
2085     sigchld.sa_handler = SIG_IGN;
2086     sigaction (SIGCHLD, &sigchld, NULL);
2087
2088     return (0);
2089   }
2090   else if (status != 0)
2091   {
2092     fprintf (stderr, "daemonize failed, exiting.\n");
2093     return (1);
2094   }
2095
2096   if (journal_cur != NULL)
2097   {
2098     int had_journal = 0;
2099
2100     pthread_mutex_lock(&journal_lock);
2101
2102     RRDD_LOG(LOG_INFO, "checking for journal files");
2103
2104     had_journal += journal_replay(journal_old);
2105     had_journal += journal_replay(journal_cur);
2106
2107     if (had_journal)
2108       flush_old_values(-1);
2109
2110     pthread_mutex_unlock(&journal_lock);
2111     journal_rotate();
2112
2113     RRDD_LOG(LOG_INFO, "journal processing complete");
2114   }
2115
2116   /* start the queue thread */
2117   memset (&queue_thread, 0, sizeof (queue_thread));
2118   status = pthread_create (&queue_thread,
2119                            NULL, /* attr */
2120                            queue_thread_main,
2121                            NULL); /* args */
2122   if (status != 0)
2123   {
2124     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2125     cleanup();
2126     return (1);
2127   }
2128
2129   listen_thread_main (NULL);
2130   cleanup ();
2131
2132   return (0);
2133 } /* int main */
2134
2135 /*
2136  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2137  */