src/rrd_daemon.c: Fix vim `folding'.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  *   kevin brintnall <kbrint@rufus.net>
21  **/
22
23 #ifndef _REENTRANT
24 # define _REENTRANT
25 #endif
26
27 #ifndef _THREAD_SAFE
28 # define _THREAD_SAFE
29 #endif
30
31 /* }}} */
32
33 /*
34  * Now for some includes..
35  */
36 #include "rrd.h" /* {{{ */
37 #include "rrd_client.h"
38
39 #include <stdlib.h>
40 #include <stdint.h>
41 #include <stdio.h>
42 #include <unistd.h>
43 #include <string.h>
44 #include <strings.h>
45 #include <stdint.h>
46 #include <inttypes.h>
47
48 #include <sys/types.h>
49 #include <sys/stat.h>
50 #include <fcntl.h>
51 #include <signal.h>
52 #include <sys/socket.h>
53 #include <sys/un.h>
54 #include <netdb.h>
55 #include <poll.h>
56 #include <syslog.h>
57 #include <pthread.h>
58 #include <errno.h>
59 #include <assert.h>
60 #include <sys/time.h>
61 #include <time.h>
62
63 #include <glib-2.0/glib.h>
64 /* }}} */
65
66 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
67
68 #ifndef __GNUC__
69 # define __attribute__(x) /**/
70 #endif
71
72 /*
73  * Types
74  */
75 struct listen_socket_s
76 {
77   int fd;
78   char path[PATH_MAX + 1];
79 };
80 typedef struct listen_socket_s listen_socket_t;
81
82 struct cache_item_s;
83 typedef struct cache_item_s cache_item_t;
84 struct cache_item_s
85 {
86   char *file;
87   char **values;
88   int values_num;
89   time_t last_flush_time;
90 #define CI_FLAGS_IN_TREE  (1<<0)
91 #define CI_FLAGS_IN_QUEUE (1<<1)
92   int flags;
93
94   cache_item_t *next;
95 };
96
97 struct callback_flush_data_s
98 {
99   time_t now;
100   time_t abs_timeout;
101   char **keys;
102   size_t keys_num;
103 };
104 typedef struct callback_flush_data_s callback_flush_data_t;
105
106 enum queue_side_e
107 {
108   HEAD,
109   TAIL
110 };
111 typedef enum queue_side_e queue_side_t;
112
113 /* max length of socket command or response */
114 #define CMD_MAX 4096
115
116 /*
117  * Variables
118  */
119 static int stay_foreground = 0;
120
121 static listen_socket_t *listen_fds = NULL;
122 static size_t listen_fds_num = 0;
123
124 static int do_shutdown = 0;
125
126 static pthread_t queue_thread;
127
128 static pthread_t *connection_threads = NULL;
129 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
130 static int connection_threads_num = 0;
131
132 /* Cache stuff */
133 static GTree          *cache_tree = NULL;
134 static cache_item_t   *cache_queue_head = NULL;
135 static cache_item_t   *cache_queue_tail = NULL;
136 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
137 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
138
139 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
140
141 static int config_write_interval = 300;
142 static int config_write_jitter   = 0;
143 static int config_flush_interval = 3600;
144 static char *config_pid_file = NULL;
145 static char *config_base_dir = NULL;
146
147 static char **config_listen_address_list = NULL;
148 static int config_listen_address_list_len = 0;
149
150 static uint64_t stats_queue_length = 0;
151 static uint64_t stats_updates_received = 0;
152 static uint64_t stats_flush_received = 0;
153 static uint64_t stats_updates_written = 0;
154 static uint64_t stats_data_sets_written = 0;
155 static uint64_t stats_journal_bytes = 0;
156 static uint64_t stats_journal_rotate = 0;
157 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
158
159 /* Journaled updates */
160 static char *journal_cur = NULL;
161 static char *journal_old = NULL;
162 static FILE *journal_fh = NULL;
163 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
164 static int journal_write(char *cmd, char *args);
165 static void journal_done(void);
166 static void journal_rotate(void);
167
168 /* 
169  * Functions
170  */
171 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
172 {
173   RRDD_LOG(LOG_NOTICE, "caught SIGINT");
174   do_shutdown++;
175   pthread_cond_broadcast(&cache_cond);
176 } /* }}} void sig_int_handler */
177
178 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
179 {
180   RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
181   do_shutdown++;
182   pthread_cond_broadcast(&cache_cond);
183 } /* }}} void sig_term_handler */
184
185 static int write_pidfile (void) /* {{{ */
186 {
187   pid_t pid;
188   char *file;
189   int fd;
190   FILE *fh;
191
192   pid = getpid ();
193   
194   file = (config_pid_file != NULL)
195     ? config_pid_file
196     : LOCALSTATEDIR "/run/rrdcached.pid";
197
198   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
199   if (fd < 0)
200   {
201     RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
202              file, rrd_strerror(errno));
203     return (-1);
204   }
205
206   fh = fdopen (fd, "w");
207   if (fh == NULL)
208   {
209     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
210     close(fd);
211     return (-1);
212   }
213
214   fprintf (fh, "%i\n", (int) pid);
215   fclose (fh);
216
217   return (0);
218 } /* }}} int write_pidfile */
219
220 static int remove_pidfile (void) /* {{{ */
221 {
222   char *file;
223   int status;
224
225   file = (config_pid_file != NULL)
226     ? config_pid_file
227     : LOCALSTATEDIR "/run/rrdcached.pid";
228
229   status = unlink (file);
230   if (status == 0)
231     return (0);
232   return (errno);
233 } /* }}} int remove_pidfile */
234
235 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
236 {
237   char    *buffer;
238   size_t   buffer_used;
239   size_t   buffer_free;
240   ssize_t  status;
241
242   buffer       = (char *) buffer_void;
243   buffer_used  = 0;
244   buffer_free  = buffer_size;
245
246   while (buffer_free > 0)
247   {
248     status = read (fd, buffer + buffer_used, buffer_free);
249     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
250       continue;
251
252     if (status < 0)
253       return (-1);
254
255     if (status == 0)
256       return (0);
257
258     assert ((0 > status) || (buffer_free >= (size_t) status));
259
260     buffer_free = buffer_free - status;
261     buffer_used = buffer_used + status;
262
263     if (buffer[buffer_used - 1] == '\n')
264       break;
265   }
266
267   assert (buffer_used > 0);
268
269   if (buffer[buffer_used - 1] != '\n')
270   {
271     errno = ENOBUFS;
272     return (-1);
273   }
274
275   buffer[buffer_used - 1] = 0;
276
277   /* Fix network line endings. */
278   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
279   {
280     buffer_used--;
281     buffer[buffer_used - 1] = 0;
282   }
283
284   return (buffer_used);
285 } /* }}} ssize_t sread */
286
287 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
288 {
289   const char *ptr;
290   size_t      nleft;
291   ssize_t     status;
292
293   /* special case for journal replay */
294   if (fd < 0) return 0;
295
296   ptr   = (const char *) buf;
297   nleft = count;
298
299   while (nleft > 0)
300   {
301     status = write (fd, (const void *) ptr, nleft);
302
303     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
304       continue;
305
306     if (status < 0)
307       return (status);
308
309     nleft -= status;
310     ptr   += status;
311   }
312
313   return (0);
314 } /* }}} ssize_t swrite */
315
316 static void _wipe_ci_values(cache_item_t *ci, time_t when)
317 {
318   ci->values = NULL;
319   ci->values_num = 0;
320
321   ci->last_flush_time = when;
322   if (config_write_jitter > 0)
323     ci->last_flush_time += (random() % config_write_jitter);
324
325   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
326 }
327
328 /*
329  * enqueue_cache_item:
330  * `cache_lock' must be acquired before calling this function!
331  */
332 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
333     queue_side_t side)
334 {
335   int did_insert = 0;
336
337   if (ci == NULL)
338     return (-1);
339
340   if (ci->values_num == 0)
341     return (0);
342
343   if (side == HEAD)
344   {
345     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
346     {
347       assert (ci->next == NULL);
348       ci->next = cache_queue_head;
349       cache_queue_head = ci;
350
351       if (cache_queue_tail == NULL)
352         cache_queue_tail = cache_queue_head;
353
354       did_insert = 1;
355     }
356     else if (cache_queue_head == ci)
357     {
358       /* do nothing */
359     }
360     else /* enqueued, but not first entry */
361     {
362       cache_item_t *prev;
363
364       /* find previous entry */
365       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
366         if (prev->next == ci)
367           break;
368       assert (prev != NULL);
369
370       /* move to the front */
371       prev->next = ci->next;
372       ci->next = cache_queue_head;
373       cache_queue_head = ci;
374
375       /* check if we need to adapt the tail */
376       if (cache_queue_tail == ci)
377         cache_queue_tail = prev;
378     }
379   }
380   else /* (side == TAIL) */
381   {
382     /* We don't move values back in the list.. */
383     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
384       return (0);
385
386     assert (ci->next == NULL);
387
388     if (cache_queue_tail == NULL)
389       cache_queue_head = ci;
390     else
391       cache_queue_tail->next = ci;
392     cache_queue_tail = ci;
393
394     did_insert = 1;
395   }
396
397   ci->flags |= CI_FLAGS_IN_QUEUE;
398
399   if (did_insert)
400   {
401     pthread_mutex_lock (&stats_lock);
402     stats_queue_length++;
403     pthread_mutex_unlock (&stats_lock);
404   }
405
406   return (0);
407 } /* }}} int enqueue_cache_item */
408
409 /*
410  * tree_callback_flush:
411  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
412  * while this is in progress.
413  */
414 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
415     gpointer data)
416 {
417   cache_item_t *ci;
418   callback_flush_data_t *cfd;
419
420   ci = (cache_item_t *) value;
421   cfd = (callback_flush_data_t *) data;
422
423   if ((ci->last_flush_time <= cfd->abs_timeout)
424       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
425       && (ci->values_num > 0))
426   {
427     enqueue_cache_item (ci, TAIL);
428   }
429   else if ((do_shutdown != 0)
430       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
431       && (ci->values_num > 0))
432   {
433     enqueue_cache_item (ci, TAIL);
434   }
435   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
436       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
437       && (ci->values_num <= 0))
438   {
439     char **temp;
440
441     temp = (char **) realloc (cfd->keys,
442         sizeof (char *) * (cfd->keys_num + 1));
443     if (temp == NULL)
444     {
445       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
446       return (FALSE);
447     }
448     cfd->keys = temp;
449     /* Make really sure this points to the _same_ place */
450     assert ((char *) key == ci->file);
451     cfd->keys[cfd->keys_num] = (char *) key;
452     cfd->keys_num++;
453   }
454
455   return (FALSE);
456 } /* }}} gboolean tree_callback_flush */
457
458 static int flush_old_values (int max_age)
459 {
460   callback_flush_data_t cfd;
461   size_t k;
462
463   memset (&cfd, 0, sizeof (cfd));
464   /* Pass the current time as user data so that we don't need to call
465    * `time' for each node. */
466   cfd.now = time (NULL);
467   cfd.keys = NULL;
468   cfd.keys_num = 0;
469
470   if (max_age > 0)
471     cfd.abs_timeout = cfd.now - max_age;
472   else
473     cfd.abs_timeout = cfd.now + 1;
474
475   /* `tree_callback_flush' will return the keys of all values that haven't
476    * been touched in the last `config_flush_interval' seconds in `cfd'.
477    * The char*'s in this array point to the same memory as ci->file, so we
478    * don't need to free them separately. */
479   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
480
481   for (k = 0; k < cfd.keys_num; k++)
482   {
483     cache_item_t *ci;
484
485     /* This must not fail. */
486     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
487     assert (ci != NULL);
488
489     /* If we end up here with values available, something's seriously
490      * messed up. */
491     assert (ci->values_num == 0);
492
493     /* Remove the node from the tree */
494     g_tree_remove (cache_tree, cfd.keys[k]);
495     cfd.keys[k] = NULL;
496
497     /* Now free and clean up `ci'. */
498     free (ci->file);
499     ci->file = NULL;
500     free (ci);
501     ci = NULL;
502   } /* for (k = 0; k < cfd.keys_num; k++) */
503
504   if (cfd.keys != NULL)
505   {
506     free (cfd.keys);
507     cfd.keys = NULL;
508   }
509
510   return (0);
511 } /* int flush_old_values */
512
513 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
514 {
515   struct timeval now;
516   struct timespec next_flush;
517
518   gettimeofday (&now, NULL);
519   next_flush.tv_sec = now.tv_sec + config_flush_interval;
520   next_flush.tv_nsec = 1000 * now.tv_usec;
521
522   pthread_mutex_lock (&cache_lock);
523   while ((do_shutdown == 0) || (cache_queue_head != NULL))
524   {
525     cache_item_t *ci;
526     char *file;
527     char **values;
528     int values_num;
529     int status;
530     int i;
531
532     /* First, check if it's time to do the cache flush. */
533     gettimeofday (&now, NULL);
534     if ((now.tv_sec > next_flush.tv_sec)
535         || ((now.tv_sec == next_flush.tv_sec)
536           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
537     {
538       /* Flush all values that haven't been written in the last
539        * `config_write_interval' seconds. */
540       flush_old_values (config_write_interval);
541
542       /* Determine the time of the next cache flush. */
543       while (next_flush.tv_sec <= now.tv_sec)
544         next_flush.tv_sec += config_flush_interval;
545
546       /* unlock the cache while we rotate so we don't block incoming
547        * updates if the fsync() blocks on disk I/O */
548       pthread_mutex_unlock(&cache_lock);
549       journal_rotate();
550       pthread_mutex_lock(&cache_lock);
551     }
552
553     /* Now, check if there's something to store away. If not, wait until
554      * something comes in or it's time to do the cache flush. */
555     if (cache_queue_head == NULL)
556     {
557       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
558       if ((status != 0) && (status != ETIMEDOUT))
559       {
560         RRDD_LOG (LOG_ERR, "queue_thread_main: "
561             "pthread_cond_timedwait returned %i.", status);
562       }
563     }
564
565     /* We're about to shut down, so lets flush the entire tree. */
566     if ((do_shutdown != 0) && (cache_queue_head == NULL))
567       flush_old_values (/* max age = */ -1);
568
569     /* Check if a value has arrived. This may be NULL if we timed out or there
570      * was an interrupt such as a signal. */
571     if (cache_queue_head == NULL)
572       continue;
573
574     ci = cache_queue_head;
575
576     /* copy the relevant parts */
577     file = strdup (ci->file);
578     if (file == NULL)
579     {
580       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
581       continue;
582     }
583
584     assert(ci->values != NULL);
585     assert(ci->values_num > 0);
586
587     values = ci->values;
588     values_num = ci->values_num;
589
590     _wipe_ci_values(ci, time(NULL));
591
592     cache_queue_head = ci->next;
593     if (cache_queue_head == NULL)
594       cache_queue_tail = NULL;
595     ci->next = NULL;
596
597     pthread_mutex_lock (&stats_lock);
598     assert (stats_queue_length > 0);
599     stats_queue_length--;
600     pthread_mutex_unlock (&stats_lock);
601
602     pthread_mutex_unlock (&cache_lock);
603
604     rrd_clear_error ();
605     status = rrd_update_r (file, NULL, values_num, (void *) values);
606     if (status != 0)
607     {
608       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
609           "rrd_update_r (%s) failed with status %i. (%s)",
610           file, status, rrd_get_error());
611     }
612
613     journal_write("wrote", file);
614
615     for (i = 0; i < values_num; i++)
616       free (values[i]);
617
618     free(values);
619     free(file);
620
621     if (status == 0)
622     {
623       pthread_mutex_lock (&stats_lock);
624       stats_updates_written++;
625       stats_data_sets_written += values_num;
626       pthread_mutex_unlock (&stats_lock);
627     }
628
629     pthread_mutex_lock (&cache_lock);
630     pthread_cond_broadcast (&flush_cond);
631
632     /* We're about to shut down, so lets flush the entire tree. */
633     if ((do_shutdown != 0) && (cache_queue_head == NULL))
634       flush_old_values (/* max age = */ -1);
635   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
636   pthread_mutex_unlock (&cache_lock);
637
638   assert(cache_queue_head == NULL);
639   RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
640   journal_done();
641
642   return (NULL);
643 } /* }}} void *queue_thread_main */
644
645 static int buffer_get_field (char **buffer_ret, /* {{{ */
646     size_t *buffer_size_ret, char **field_ret)
647 {
648   char *buffer;
649   size_t buffer_pos;
650   size_t buffer_size;
651   char *field;
652   size_t field_size;
653   int status;
654
655   buffer = *buffer_ret;
656   buffer_pos = 0;
657   buffer_size = *buffer_size_ret;
658   field = *buffer_ret;
659   field_size = 0;
660
661   if (buffer_size <= 0)
662     return (-1);
663
664   /* This is ensured by `handle_request'. */
665   assert (buffer[buffer_size - 1] == '\0');
666
667   status = -1;
668   while (buffer_pos < buffer_size)
669   {
670     /* Check for end-of-field or end-of-buffer */
671     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
672     {
673       field[field_size] = 0;
674       field_size++;
675       buffer_pos++;
676       status = 0;
677       break;
678     }
679     /* Handle escaped characters. */
680     else if (buffer[buffer_pos] == '\\')
681     {
682       if (buffer_pos >= (buffer_size - 1))
683         break;
684       buffer_pos++;
685       field[field_size] = buffer[buffer_pos];
686       field_size++;
687       buffer_pos++;
688     }
689     /* Normal operation */ 
690     else
691     {
692       field[field_size] = buffer[buffer_pos];
693       field_size++;
694       buffer_pos++;
695     }
696   } /* while (buffer_pos < buffer_size) */
697
698   if (status != 0)
699     return (status);
700
701   *buffer_ret = buffer + buffer_pos;
702   *buffer_size_ret = buffer_size - buffer_pos;
703   *field_ret = field;
704
705   return (0);
706 } /* }}} int buffer_get_field */
707
708 static int flush_file (const char *filename) /* {{{ */
709 {
710   cache_item_t *ci;
711
712   pthread_mutex_lock (&cache_lock);
713
714   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
715   if (ci == NULL)
716   {
717     pthread_mutex_unlock (&cache_lock);
718     return (ENOENT);
719   }
720
721   /* Enqueue at head */
722   enqueue_cache_item (ci, HEAD);
723   pthread_cond_signal (&cache_cond);
724
725   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
726   {
727     ci = NULL;
728
729     pthread_cond_wait (&flush_cond, &cache_lock);
730
731     ci = g_tree_lookup (cache_tree, filename);
732     if (ci == NULL)
733     {
734       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
735           "while waiting for flush.");
736       pthread_mutex_unlock (&cache_lock);
737       return (-1);
738     }
739   }
740
741   pthread_mutex_unlock (&cache_lock);
742   return (0);
743 } /* }}} int flush_file */
744
745 static int handle_request_help (int fd, /* {{{ */
746     char *buffer, size_t buffer_size)
747 {
748   int status;
749   char **help_text;
750   size_t help_text_len;
751   char *command;
752   size_t i;
753
754   char *help_help[] =
755   {
756     "4 Command overview\n",
757     "FLUSH <filename>\n",
758     "HELP [<command>]\n",
759     "UPDATE <filename> <values> [<values> ...]\n",
760     "STATS\n"
761   };
762   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
763
764   char *help_flush[] =
765   {
766     "4 Help for FLUSH\n",
767     "Usage: FLUSH <filename>\n",
768     "\n",
769     "Adds the given filename to the head of the update queue and returns\n",
770     "after is has been dequeued.\n"
771   };
772   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
773
774   char *help_update[] =
775   {
776     "9 Help for UPDATE\n",
777     "Usage: UPDATE <filename> <values> [<values> ...]\n"
778     "\n",
779     "Adds the given file to the internal cache if it is not yet known and\n",
780     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
781     "for details.\n",
782     "\n",
783     "Each <values> has the following form:\n",
784     "  <values> = <time>:<value>[:<value>[...]]\n",
785     "See the rrdupdate(1) manpage for details.\n"
786   };
787   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
788
789   char *help_stats[] =
790   {
791     "4 Help for STATS\n",
792     "Usage: STATS\n",
793     "\n",
794     "Returns some performance counters, see the rrdcached(1) manpage for\n",
795     "a description of the values.\n"
796   };
797   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
798
799   status = buffer_get_field (&buffer, &buffer_size, &command);
800   if (status != 0)
801   {
802     help_text = help_help;
803     help_text_len = help_help_len;
804   }
805   else
806   {
807     if (strcasecmp (command, "update") == 0)
808     {
809       help_text = help_update;
810       help_text_len = help_update_len;
811     }
812     else if (strcasecmp (command, "flush") == 0)
813     {
814       help_text = help_flush;
815       help_text_len = help_flush_len;
816     }
817     else if (strcasecmp (command, "stats") == 0)
818     {
819       help_text = help_stats;
820       help_text_len = help_stats_len;
821     }
822     else
823     {
824       help_text = help_help;
825       help_text_len = help_help_len;
826     }
827   }
828
829   for (i = 0; i < help_text_len; i++)
830   {
831     status = swrite (fd, help_text[i], strlen (help_text[i]));
832     if (status < 0)
833     {
834       status = errno;
835       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
836       return (status);
837     }
838   }
839
840   return (0);
841 } /* }}} int handle_request_help */
842
843 static int handle_request_stats (int fd, /* {{{ */
844     char *buffer __attribute__((unused)),
845     size_t buffer_size __attribute__((unused)))
846 {
847   int status;
848   char outbuf[CMD_MAX];
849
850   uint64_t copy_queue_length;
851   uint64_t copy_updates_received;
852   uint64_t copy_flush_received;
853   uint64_t copy_updates_written;
854   uint64_t copy_data_sets_written;
855   uint64_t copy_journal_bytes;
856   uint64_t copy_journal_rotate;
857
858   uint64_t tree_nodes_number;
859   uint64_t tree_depth;
860
861   pthread_mutex_lock (&stats_lock);
862   copy_queue_length       = stats_queue_length;
863   copy_updates_received   = stats_updates_received;
864   copy_flush_received     = stats_flush_received;
865   copy_updates_written    = stats_updates_written;
866   copy_data_sets_written  = stats_data_sets_written;
867   copy_journal_bytes      = stats_journal_bytes;
868   copy_journal_rotate     = stats_journal_rotate;
869   pthread_mutex_unlock (&stats_lock);
870
871   pthread_mutex_lock (&cache_lock);
872   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
873   tree_depth        = (uint64_t) g_tree_height (cache_tree);
874   pthread_mutex_unlock (&cache_lock);
875
876 #define RRDD_STATS_SEND \
877   outbuf[sizeof (outbuf) - 1] = 0; \
878   status = swrite (fd, outbuf, strlen (outbuf)); \
879   if (status < 0) \
880   { \
881     status = errno; \
882     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
883     return (status); \
884   }
885
886   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
887   RRDD_STATS_SEND;
888
889   snprintf (outbuf, sizeof (outbuf),
890       "QueueLength: %"PRIu64"\n", copy_queue_length);
891   RRDD_STATS_SEND;
892
893   snprintf (outbuf, sizeof (outbuf),
894       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
895   RRDD_STATS_SEND;
896
897   snprintf (outbuf, sizeof (outbuf),
898       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
899   RRDD_STATS_SEND;
900
901   snprintf (outbuf, sizeof (outbuf),
902       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
903   RRDD_STATS_SEND;
904
905   snprintf (outbuf, sizeof (outbuf),
906       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
907   RRDD_STATS_SEND;
908
909   snprintf (outbuf, sizeof (outbuf),
910       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
911   RRDD_STATS_SEND;
912
913   snprintf (outbuf, sizeof (outbuf),
914       "TreeDepth: %"PRIu64"\n", tree_depth);
915   RRDD_STATS_SEND;
916
917   snprintf (outbuf, sizeof(outbuf),
918       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
919   RRDD_STATS_SEND;
920
921   snprintf (outbuf, sizeof(outbuf),
922       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
923   RRDD_STATS_SEND;
924
925   return (0);
926 #undef RRDD_STATS_SEND
927 } /* }}} int handle_request_stats */
928
929 static int handle_request_flush (int fd, /* {{{ */
930     char *buffer, size_t buffer_size)
931 {
932   char *file;
933   int status;
934   char result[CMD_MAX];
935
936   status = buffer_get_field (&buffer, &buffer_size, &file);
937   if (status != 0)
938   {
939     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
940   }
941   else
942   {
943     pthread_mutex_lock(&stats_lock);
944     stats_flush_received++;
945     pthread_mutex_unlock(&stats_lock);
946
947     status = flush_file (file);
948     if (status == 0)
949       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
950     else if (status == ENOENT)
951     {
952       /* no file in our tree; see whether it exists at all */
953       struct stat statbuf;
954
955       memset(&statbuf, 0, sizeof(statbuf));
956       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
957         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
958       else
959         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
960     }
961     else if (status < 0)
962       strncpy (result, "-1 Internal error.\n", sizeof (result));
963     else
964       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
965   }
966   result[sizeof (result) - 1] = 0;
967
968   status = swrite (fd, result, strlen (result));
969   if (status < 0)
970   {
971     status = errno;
972     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
973     return (status);
974   }
975
976   return (0);
977 } /* }}} int handle_request_flush */
978
979 static int handle_request_update (int fd, /* {{{ */
980     char *buffer, size_t buffer_size)
981 {
982   char *file;
983   int values_num = 0;
984   int status;
985
986   time_t now;
987
988   cache_item_t *ci;
989   char answer[CMD_MAX];
990
991 #define RRDD_UPDATE_SEND \
992   answer[sizeof (answer) - 1] = 0; \
993   status = swrite (fd, answer, strlen (answer)); \
994   if (status < 0) \
995   { \
996     status = errno; \
997     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
998     return (status); \
999   }
1000
1001   now = time (NULL);
1002
1003   status = buffer_get_field (&buffer, &buffer_size, &file);
1004   if (status != 0)
1005   {
1006     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1007         sizeof (answer));
1008     RRDD_UPDATE_SEND;
1009     return (0);
1010   }
1011
1012   pthread_mutex_lock(&stats_lock);
1013   stats_updates_received++;
1014   pthread_mutex_unlock(&stats_lock);
1015
1016   pthread_mutex_lock (&cache_lock);
1017
1018   ci = g_tree_lookup (cache_tree, file);
1019   if (ci == NULL) /* {{{ */
1020   {
1021     struct stat statbuf;
1022
1023     memset (&statbuf, 0, sizeof (statbuf));
1024     status = stat (file, &statbuf);
1025     if (status != 0)
1026     {
1027       pthread_mutex_unlock (&cache_lock);
1028       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1029
1030       status = errno;
1031       if (status == ENOENT)
1032         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1033       else
1034         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1035             status);
1036       RRDD_UPDATE_SEND;
1037       return (0);
1038     }
1039     if (!S_ISREG (statbuf.st_mode))
1040     {
1041       pthread_mutex_unlock (&cache_lock);
1042
1043       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1044       RRDD_UPDATE_SEND;
1045       return (0);
1046     }
1047     if (access(file, R_OK|W_OK) != 0)
1048     {
1049       pthread_mutex_unlock (&cache_lock);
1050
1051       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1052                 file, rrd_strerror(errno));
1053       RRDD_UPDATE_SEND;
1054       return (0);
1055     }
1056
1057     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1058     if (ci == NULL)
1059     {
1060       pthread_mutex_unlock (&cache_lock);
1061       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1062
1063       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1064       RRDD_UPDATE_SEND;
1065       return (0);
1066     }
1067     memset (ci, 0, sizeof (cache_item_t));
1068
1069     ci->file = strdup (file);
1070     if (ci->file == NULL)
1071     {
1072       pthread_mutex_unlock (&cache_lock);
1073       free (ci);
1074       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1075
1076       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1077       RRDD_UPDATE_SEND;
1078       return (0);
1079     }
1080
1081     _wipe_ci_values(ci, now);
1082     ci->flags = CI_FLAGS_IN_TREE;
1083
1084     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1085   } /* }}} */
1086   assert (ci != NULL);
1087
1088   while (buffer_size > 0)
1089   {
1090     char **temp;
1091     char *value;
1092
1093     status = buffer_get_field (&buffer, &buffer_size, &value);
1094     if (status != 0)
1095     {
1096       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1097       break;
1098     }
1099
1100     temp = (char **) realloc (ci->values,
1101         sizeof (char *) * (ci->values_num + 1));
1102     if (temp == NULL)
1103     {
1104       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1105       continue;
1106     }
1107     ci->values = temp;
1108
1109     ci->values[ci->values_num] = strdup (value);
1110     if (ci->values[ci->values_num] == NULL)
1111     {
1112       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1113       continue;
1114     }
1115     ci->values_num++;
1116
1117     values_num++;
1118   }
1119
1120   if (((now - ci->last_flush_time) >= config_write_interval)
1121       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1122       && (ci->values_num > 0))
1123   {
1124     enqueue_cache_item (ci, TAIL);
1125     pthread_cond_signal (&cache_cond);
1126   }
1127
1128   pthread_mutex_unlock (&cache_lock);
1129
1130   if (values_num < 1)
1131   {
1132     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1133   }
1134   else
1135   {
1136     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1137         (values_num == 1) ? "" : "s");
1138   }
1139   RRDD_UPDATE_SEND;
1140   return (0);
1141 #undef RRDD_UPDATE_SEND
1142 } /* }}} int handle_request_update */
1143
1144 /* we came across a "WROTE" entry during journal replay.
1145  * throw away any values that we have accumulated for this file
1146  */
1147 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1148                                  const char *buffer,
1149                                  size_t buffer_size __attribute__((unused)))
1150 {
1151   int i;
1152   cache_item_t *ci;
1153   const char *file = buffer;
1154
1155   pthread_mutex_lock(&cache_lock);
1156
1157   ci = g_tree_lookup(cache_tree, file);
1158   if (ci == NULL)
1159   {
1160     pthread_mutex_unlock(&cache_lock);
1161     return (0);
1162   }
1163
1164   if (ci->values)
1165   {
1166     for (i=0; i < ci->values_num; i++)
1167       free(ci->values[i]);
1168
1169     free(ci->values);
1170   }
1171
1172   _wipe_ci_values(ci, time(NULL));
1173
1174   pthread_mutex_unlock(&cache_lock);
1175   return (0);
1176 } /* }}} int handle_request_wrote */
1177
1178 /* if fd < 0, we are in journal replay mode */
1179 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1180 {
1181   char *buffer_ptr;
1182   char *command;
1183   int status;
1184
1185   assert (buffer[buffer_size - 1] == '\0');
1186
1187   buffer_ptr = buffer;
1188   command = NULL;
1189   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1190   if (status != 0)
1191   {
1192     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1193     return (-1);
1194   }
1195
1196   if (strcasecmp (command, "update") == 0)
1197   {
1198     /* don't re-write updates in replay mode */
1199     if (fd >= 0)
1200       journal_write(command, buffer_ptr);
1201
1202     return (handle_request_update (fd, buffer_ptr, buffer_size));
1203   }
1204   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1205   {
1206     /* this is only valid in replay mode */
1207     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1208   }
1209   else if (strcasecmp (command, "flush") == 0)
1210   {
1211     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1212   }
1213   else if (strcasecmp (command, "stats") == 0)
1214   {
1215     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1216   }
1217   else if (strcasecmp (command, "help") == 0)
1218   {
1219     return (handle_request_help (fd, buffer_ptr, buffer_size));
1220   }
1221   else
1222   {
1223     char result[CMD_MAX];
1224
1225     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1226     result[sizeof (result) - 1] = 0;
1227
1228     status = swrite (fd, result, strlen (result));
1229     if (status < 0)
1230     {
1231       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1232       return (-1);
1233     }
1234   }
1235
1236   return (0);
1237 } /* }}} int handle_request */
1238
1239 /* MUST NOT hold journal_lock before calling this */
1240 static void journal_rotate(void) /* {{{ */
1241 {
1242   FILE *old_fh = NULL;
1243
1244   if (journal_cur == NULL || journal_old == NULL)
1245     return;
1246
1247   pthread_mutex_lock(&journal_lock);
1248
1249   /* we rotate this way (rename before close) so that the we can release
1250    * the journal lock as fast as possible.  Journal writes to the new
1251    * journal can proceed immediately after the new file is opened.  The
1252    * fclose can then block without affecting new updates.
1253    */
1254   if (journal_fh != NULL)
1255   {
1256     old_fh = journal_fh;
1257     rename(journal_cur, journal_old);
1258     ++stats_journal_rotate;
1259   }
1260
1261   journal_fh = fopen(journal_cur, "a");
1262   pthread_mutex_unlock(&journal_lock);
1263
1264   if (old_fh != NULL)
1265     fclose(old_fh);
1266
1267   if (journal_fh == NULL)
1268     RRDD_LOG(LOG_CRIT,
1269              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1270              journal_cur, rrd_strerror(errno));
1271
1272 } /* }}} static void journal_rotate */
1273
1274 static void journal_done(void) /* {{{ */
1275 {
1276   if (journal_cur == NULL)
1277     return;
1278
1279   pthread_mutex_lock(&journal_lock);
1280   if (journal_fh != NULL)
1281   {
1282     fclose(journal_fh);
1283     journal_fh = NULL;
1284   }
1285
1286   RRDD_LOG(LOG_INFO, "removing journals");
1287
1288   unlink(journal_old);
1289   unlink(journal_cur);
1290   pthread_mutex_unlock(&journal_lock);
1291
1292 } /* }}} static void journal_done */
1293
1294 static int journal_write(char *cmd, char *args) /* {{{ */
1295 {
1296   int chars;
1297
1298   if (journal_fh == NULL)
1299     return 0;
1300
1301   pthread_mutex_lock(&journal_lock);
1302   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1303   pthread_mutex_unlock(&journal_lock);
1304
1305   if (chars > 0)
1306   {
1307     pthread_mutex_lock(&stats_lock);
1308     stats_journal_bytes += chars;
1309     pthread_mutex_unlock(&stats_lock);
1310   }
1311
1312   return chars;
1313 } /* }}} static int journal_write */
1314
1315 static int journal_replay (const char *file) /* {{{ */
1316 {
1317   FILE *fh;
1318   int entry_cnt = 0;
1319   int fail_cnt = 0;
1320   uint64_t line = 0;
1321   char entry[CMD_MAX];
1322
1323   if (file == NULL) return 0;
1324
1325   fh = fopen(file, "r");
1326   if (fh == NULL)
1327   {
1328     if (errno != ENOENT)
1329       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1330                file, rrd_strerror(errno));
1331     return 0;
1332   }
1333   else
1334     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1335
1336   while(!feof(fh))
1337   {
1338     size_t entry_len;
1339
1340     ++line;
1341     fgets(entry, sizeof(entry), fh);
1342     entry_len = strlen(entry);
1343
1344     /* check \n termination in case journal writing crashed mid-line */
1345     if (entry_len == 0)
1346       continue;
1347     else if (entry[entry_len - 1] != '\n')
1348     {
1349       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1350       ++fail_cnt;
1351       continue;
1352     }
1353
1354     entry[entry_len - 1] = '\0';
1355
1356     if (handle_request(-1, entry, entry_len) == 0)
1357       ++entry_cnt;
1358     else
1359       ++fail_cnt;
1360   }
1361
1362   fclose(fh);
1363
1364   if (entry_cnt > 0)
1365   {
1366     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1367              entry_cnt, fail_cnt);
1368     return 1;
1369   }
1370   else
1371     return 0;
1372
1373 } /* }}} static int journal_replay */
1374
1375 static void *connection_thread_main (void *args) /* {{{ */
1376 {
1377   pthread_t self;
1378   int i;
1379   int fd;
1380   
1381   fd = *((int *) args);
1382   free (args);
1383
1384   pthread_mutex_lock (&connection_threads_lock);
1385   {
1386     pthread_t *temp;
1387
1388     temp = (pthread_t *) realloc (connection_threads,
1389         sizeof (pthread_t) * (connection_threads_num + 1));
1390     if (temp == NULL)
1391     {
1392       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1393     }
1394     else
1395     {
1396       connection_threads = temp;
1397       connection_threads[connection_threads_num] = pthread_self ();
1398       connection_threads_num++;
1399     }
1400   }
1401   pthread_mutex_unlock (&connection_threads_lock);
1402
1403   while (do_shutdown == 0)
1404   {
1405     char buffer[CMD_MAX];
1406
1407     struct pollfd pollfd;
1408     int status;
1409
1410     pollfd.fd = fd;
1411     pollfd.events = POLLIN | POLLPRI;
1412     pollfd.revents = 0;
1413
1414     status = poll (&pollfd, 1, /* timeout = */ 500);
1415     if (status == 0) /* timeout */
1416       continue;
1417     else if (status < 0) /* error */
1418     {
1419       status = errno;
1420       if (status == EINTR)
1421         continue;
1422       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1423       continue;
1424     }
1425
1426     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1427     {
1428       close (fd);
1429       break;
1430     }
1431     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1432     {
1433       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1434           "poll(2) returned something unexpected: %#04hx",
1435           pollfd.revents);
1436       close (fd);
1437       break;
1438     }
1439
1440     status = (int) sread (fd, buffer, sizeof (buffer));
1441     if (status <= 0)
1442     {
1443       close (fd);
1444
1445       if (status < 0)
1446         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1447
1448       break;
1449     }
1450
1451     status = handle_request (fd, buffer, /*buffer_size=*/ status);
1452     if (status != 0)
1453     {
1454       close (fd);
1455       break;
1456     }
1457   }
1458
1459   self = pthread_self ();
1460   /* Remove this thread from the connection threads list */
1461   pthread_mutex_lock (&connection_threads_lock);
1462   /* Find out own index in the array */
1463   for (i = 0; i < connection_threads_num; i++)
1464     if (pthread_equal (connection_threads[i], self) != 0)
1465       break;
1466   assert (i < connection_threads_num);
1467
1468   /* Move the trailing threads forward. */
1469   if (i < (connection_threads_num - 1))
1470   {
1471     memmove (connection_threads + i,
1472         connection_threads + i + 1,
1473         sizeof (pthread_t) * (connection_threads_num - i - 1));
1474   }
1475
1476   connection_threads_num--;
1477   pthread_mutex_unlock (&connection_threads_lock);
1478
1479   return (NULL);
1480 } /* }}} void *connection_thread_main */
1481
1482 static int open_listen_socket_unix (const char *path) /* {{{ */
1483 {
1484   int fd;
1485   struct sockaddr_un sa;
1486   listen_socket_t *temp;
1487   int status;
1488
1489   temp = (listen_socket_t *) realloc (listen_fds,
1490       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1491   if (temp == NULL)
1492   {
1493     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1494     return (-1);
1495   }
1496   listen_fds = temp;
1497   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1498
1499   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1500   if (fd < 0)
1501   {
1502     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1503     return (-1);
1504   }
1505
1506   memset (&sa, 0, sizeof (sa));
1507   sa.sun_family = AF_UNIX;
1508   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1509
1510   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1511   if (status != 0)
1512   {
1513     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1514     close (fd);
1515     unlink (path);
1516     return (-1);
1517   }
1518
1519   status = listen (fd, /* backlog = */ 10);
1520   if (status != 0)
1521   {
1522     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1523     close (fd);
1524     unlink (path);
1525     return (-1);
1526   }
1527   
1528   listen_fds[listen_fds_num].fd = fd;
1529   snprintf (listen_fds[listen_fds_num].path,
1530       sizeof (listen_fds[listen_fds_num].path) - 1,
1531       "unix:%s", path);
1532   listen_fds_num++;
1533
1534   return (0);
1535 } /* }}} int open_listen_socket_unix */
1536
1537 static int open_listen_socket (const char *addr) /* {{{ */
1538 {
1539   struct addrinfo ai_hints;
1540   struct addrinfo *ai_res;
1541   struct addrinfo *ai_ptr;
1542   int status;
1543
1544   assert (addr != NULL);
1545
1546   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1547     return (open_listen_socket_unix (addr + strlen ("unix:")));
1548   else if (addr[0] == '/')
1549     return (open_listen_socket_unix (addr));
1550
1551   memset (&ai_hints, 0, sizeof (ai_hints));
1552   ai_hints.ai_flags = 0;
1553 #ifdef AI_ADDRCONFIG
1554   ai_hints.ai_flags |= AI_ADDRCONFIG;
1555 #endif
1556   ai_hints.ai_family = AF_UNSPEC;
1557   ai_hints.ai_socktype = SOCK_STREAM;
1558
1559   ai_res = NULL;
1560   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
1561   if (status != 0)
1562   {
1563     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1564         "%s", addr, gai_strerror (status));
1565     return (-1);
1566   }
1567
1568   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1569   {
1570     int fd;
1571     listen_socket_t *temp;
1572
1573     temp = (listen_socket_t *) realloc (listen_fds,
1574         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1575     if (temp == NULL)
1576     {
1577       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1578       continue;
1579     }
1580     listen_fds = temp;
1581     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1582
1583     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1584     if (fd < 0)
1585     {
1586       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1587       continue;
1588     }
1589
1590     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1591     if (status != 0)
1592     {
1593       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1594       close (fd);
1595       continue;
1596     }
1597
1598     status = listen (fd, /* backlog = */ 10);
1599     if (status != 0)
1600     {
1601       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1602       close (fd);
1603       return (-1);
1604     }
1605
1606     listen_fds[listen_fds_num].fd = fd;
1607     strncpy (listen_fds[listen_fds_num].path, addr,
1608         sizeof (listen_fds[listen_fds_num].path) - 1);
1609     listen_fds_num++;
1610   } /* for (ai_ptr) */
1611
1612   return (0);
1613 } /* }}} int open_listen_socket */
1614
1615 static int close_listen_sockets (void) /* {{{ */
1616 {
1617   size_t i;
1618
1619   for (i = 0; i < listen_fds_num; i++)
1620   {
1621     close (listen_fds[i].fd);
1622     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1623       unlink (listen_fds[i].path + strlen ("unix:"));
1624   }
1625
1626   free (listen_fds);
1627   listen_fds = NULL;
1628   listen_fds_num = 0;
1629
1630   return (0);
1631 } /* }}} int close_listen_sockets */
1632
1633 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1634 {
1635   struct pollfd *pollfds;
1636   int pollfds_num;
1637   int status;
1638   int i;
1639
1640   for (i = 0; i < config_listen_address_list_len; i++)
1641     open_listen_socket (config_listen_address_list[i]);
1642
1643   if (config_listen_address_list_len < 1)
1644     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1645
1646   if (listen_fds_num < 1)
1647   {
1648     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1649         "could be opened. Sorry.");
1650     return (NULL);
1651   }
1652
1653   pollfds_num = listen_fds_num;
1654   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1655   if (pollfds == NULL)
1656   {
1657     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1658     return (NULL);
1659   }
1660   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1661
1662   RRDD_LOG(LOG_INFO, "listening for connections");
1663
1664   while (do_shutdown == 0)
1665   {
1666     assert (pollfds_num == ((int) listen_fds_num));
1667     for (i = 0; i < pollfds_num; i++)
1668     {
1669       pollfds[i].fd = listen_fds[i].fd;
1670       pollfds[i].events = POLLIN | POLLPRI;
1671       pollfds[i].revents = 0;
1672     }
1673
1674     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1675     if (status < 1)
1676     {
1677       status = errno;
1678       if (status != EINTR)
1679       {
1680         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1681       }
1682       continue;
1683     }
1684
1685     for (i = 0; i < pollfds_num; i++)
1686     {
1687       int *client_sd;
1688       struct sockaddr_storage client_sa;
1689       socklen_t client_sa_size;
1690       pthread_t tid;
1691       pthread_attr_t attr;
1692
1693       if (pollfds[i].revents == 0)
1694         continue;
1695
1696       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1697       {
1698         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1699             "poll(2) returned something unexpected for listen FD #%i.",
1700             pollfds[i].fd);
1701         continue;
1702       }
1703
1704       client_sd = (int *) malloc (sizeof (int));
1705       if (client_sd == NULL)
1706       {
1707         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1708         continue;
1709       }
1710
1711       client_sa_size = sizeof (client_sa);
1712       *client_sd = accept (pollfds[i].fd,
1713           (struct sockaddr *) &client_sa, &client_sa_size);
1714       if (*client_sd < 0)
1715       {
1716         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1717         continue;
1718       }
1719
1720       pthread_attr_init (&attr);
1721       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1722
1723       status = pthread_create (&tid, &attr, connection_thread_main,
1724           /* args = */ (void *) client_sd);
1725       if (status != 0)
1726       {
1727         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1728         close (*client_sd);
1729         free (client_sd);
1730         continue;
1731       }
1732     } /* for (pollfds_num) */
1733   } /* while (do_shutdown == 0) */
1734
1735   RRDD_LOG(LOG_INFO, "starting shutdown");
1736
1737   close_listen_sockets ();
1738
1739   pthread_mutex_lock (&connection_threads_lock);
1740   while (connection_threads_num > 0)
1741   {
1742     pthread_t wait_for;
1743
1744     wait_for = connection_threads[0];
1745
1746     pthread_mutex_unlock (&connection_threads_lock);
1747     pthread_join (wait_for, /* retval = */ NULL);
1748     pthread_mutex_lock (&connection_threads_lock);
1749   }
1750   pthread_mutex_unlock (&connection_threads_lock);
1751
1752   return (NULL);
1753 } /* }}} void *listen_thread_main */
1754
1755 static int daemonize (void) /* {{{ */
1756 {
1757   pid_t child;
1758   int status;
1759   char *base_dir;
1760
1761   /* These structures are static, because `sigaction' behaves weird if the are
1762    * overwritten.. */
1763   static struct sigaction sa_int;
1764   static struct sigaction sa_term;
1765   static struct sigaction sa_pipe;
1766
1767   if (stay_foreground)
1768     goto child_startup;
1769
1770   child = fork ();
1771   if (child < 0)
1772   {
1773     fprintf (stderr, "daemonize: fork(2) failed.\n");
1774     return (-1);
1775   }
1776   else if (child > 0)
1777   {
1778     return (1);
1779   }
1780
1781   /* Change into the /tmp directory. */
1782   base_dir = (config_base_dir != NULL)
1783     ? config_base_dir
1784     : "/tmp";
1785   status = chdir (base_dir);
1786   if (status != 0)
1787   {
1788     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1789     return (-1);
1790   }
1791
1792   /* Become session leader */
1793   setsid ();
1794
1795   /* Open the first three file descriptors to /dev/null */
1796   close (2);
1797   close (1);
1798   close (0);
1799
1800   open ("/dev/null", O_RDWR);
1801   dup (0);
1802   dup (0);
1803
1804 child_startup:
1805   /* Install signal handlers */
1806   memset (&sa_int, 0, sizeof (sa_int));
1807   sa_int.sa_handler = sig_int_handler;
1808   sigaction (SIGINT, &sa_int, NULL);
1809
1810   memset (&sa_term, 0, sizeof (sa_term));
1811   sa_term.sa_handler = sig_term_handler;
1812   sigaction (SIGTERM, &sa_term, NULL);
1813
1814   memset (&sa_pipe, 0, sizeof (sa_pipe));
1815   sa_pipe.sa_handler = SIG_IGN;
1816   sigaction (SIGPIPE, &sa_pipe, NULL);
1817
1818   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1819   RRDD_LOG(LOG_INFO, "starting up");
1820
1821   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1822   if (cache_tree == NULL)
1823   {
1824     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1825     return (-1);
1826   }
1827
1828   status = write_pidfile ();
1829   return status;
1830 } /* }}} int daemonize */
1831
1832 static int cleanup (void) /* {{{ */
1833 {
1834   do_shutdown++;
1835
1836   pthread_cond_signal (&cache_cond);
1837   pthread_join (queue_thread, /* return = */ NULL);
1838
1839   remove_pidfile ();
1840
1841   RRDD_LOG(LOG_INFO, "goodbye");
1842   closelog ();
1843
1844   return (0);
1845 } /* }}} int cleanup */
1846
1847 static int read_options (int argc, char **argv) /* {{{ */
1848 {
1849   int option;
1850   int status = 0;
1851
1852   while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1853   {
1854     switch (option)
1855     {
1856       case 'g':
1857         stay_foreground=1;
1858         break;
1859
1860       case 'l':
1861       {
1862         char **temp;
1863
1864         temp = (char **) realloc (config_listen_address_list,
1865             sizeof (char *) * (config_listen_address_list_len + 1));
1866         if (temp == NULL)
1867         {
1868           fprintf (stderr, "read_options: realloc failed.\n");
1869           return (2);
1870         }
1871         config_listen_address_list = temp;
1872
1873         temp[config_listen_address_list_len] = strdup (optarg);
1874         if (temp[config_listen_address_list_len] == NULL)
1875         {
1876           fprintf (stderr, "read_options: strdup failed.\n");
1877           return (2);
1878         }
1879         config_listen_address_list_len++;
1880       }
1881       break;
1882
1883       case 'f':
1884       {
1885         int temp;
1886
1887         temp = atoi (optarg);
1888         if (temp > 0)
1889           config_flush_interval = temp;
1890         else
1891         {
1892           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1893           status = 3;
1894         }
1895       }
1896       break;
1897
1898       case 'w':
1899       {
1900         int temp;
1901
1902         temp = atoi (optarg);
1903         if (temp > 0)
1904           config_write_interval = temp;
1905         else
1906         {
1907           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1908           status = 2;
1909         }
1910       }
1911       break;
1912
1913       case 'z':
1914       {
1915         int temp;
1916
1917         temp = atoi(optarg);
1918         if (temp > 0)
1919           config_write_jitter = temp;
1920         else
1921         {
1922           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1923           status = 2;
1924         }
1925
1926         break;
1927       }
1928
1929       case 'b':
1930       {
1931         size_t len;
1932
1933         if (config_base_dir != NULL)
1934           free (config_base_dir);
1935         config_base_dir = strdup (optarg);
1936         if (config_base_dir == NULL)
1937         {
1938           fprintf (stderr, "read_options: strdup failed.\n");
1939           return (3);
1940         }
1941
1942         len = strlen (config_base_dir);
1943         while ((len > 0) && (config_base_dir[len - 1] == '/'))
1944         {
1945           config_base_dir[len - 1] = 0;
1946           len--;
1947         }
1948
1949         if (len < 1)
1950         {
1951           fprintf (stderr, "Invalid base directory: %s\n", optarg);
1952           return (4);
1953         }
1954       }
1955       break;
1956
1957       case 'p':
1958       {
1959         if (config_pid_file != NULL)
1960           free (config_pid_file);
1961         config_pid_file = strdup (optarg);
1962         if (config_pid_file == NULL)
1963         {
1964           fprintf (stderr, "read_options: strdup failed.\n");
1965           return (3);
1966         }
1967       }
1968       break;
1969
1970       case 'j':
1971       {
1972         struct stat statbuf;
1973         const char *dir = optarg;
1974
1975         status = stat(dir, &statbuf);
1976         if (status != 0)
1977         {
1978           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
1979           return 6;
1980         }
1981
1982         if (!S_ISDIR(statbuf.st_mode)
1983             || access(dir, R_OK|W_OK|X_OK) != 0)
1984         {
1985           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
1986                   errno ? rrd_strerror(errno) : "");
1987           return 6;
1988         }
1989
1990         journal_cur = malloc(PATH_MAX + 1);
1991         journal_old = malloc(PATH_MAX + 1);
1992         if (journal_cur == NULL || journal_old == NULL)
1993         {
1994           fprintf(stderr, "malloc failure for journal files\n");
1995           return 6;
1996         }
1997         else 
1998         {
1999           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2000           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2001         }
2002       }
2003       break;
2004
2005       case 'h':
2006       case '?':
2007         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
2008             "\n"
2009             "Usage: rrdcached [options]\n"
2010             "\n"
2011             "Valid options are:\n"
2012             "  -l <address>  Socket address to listen to.\n"
2013             "  -w <seconds>  Interval in which to write data.\n"
2014             "  -z <delay>    Delay writes up to <delay> seconds to spread load" \
2015             "  -f <seconds>  Interval in which to flush dead data.\n"
2016             "  -p <file>     Location of the PID-file.\n"
2017             "  -b <dir>      Base directory to change to.\n"
2018             "\n"
2019             "For more information and a detailed description of all options "
2020             "please refer\n"
2021             "to the rrdcached(1) manual page.\n",
2022             VERSION);
2023         status = -1;
2024         break;
2025     } /* switch (option) */
2026   } /* while (getopt) */
2027
2028   /* advise the user when values are not sane */
2029   if (config_flush_interval < 2 * config_write_interval)
2030     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2031             " 2x write interval (-w) !\n");
2032   if (config_write_jitter > config_write_interval)
2033     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2034             " write interval (-w) !\n");
2035
2036   return (status);
2037 } /* }}} int read_options */
2038
2039 int main (int argc, char **argv)
2040 {
2041   int status;
2042
2043   status = read_options (argc, argv);
2044   if (status != 0)
2045   {
2046     if (status < 0)
2047       status = 0;
2048     return (status);
2049   }
2050
2051   status = daemonize ();
2052   if (status == 1)
2053   {
2054     struct sigaction sigchld;
2055
2056     memset (&sigchld, 0, sizeof (sigchld));
2057     sigchld.sa_handler = SIG_IGN;
2058     sigaction (SIGCHLD, &sigchld, NULL);
2059
2060     return (0);
2061   }
2062   else if (status != 0)
2063   {
2064     fprintf (stderr, "daemonize failed, exiting.\n");
2065     return (1);
2066   }
2067
2068   if (journal_cur != NULL)
2069   {
2070     int had_journal = 0;
2071
2072     pthread_mutex_lock(&journal_lock);
2073
2074     RRDD_LOG(LOG_INFO, "checking for journal files");
2075
2076     had_journal += journal_replay(journal_old);
2077     had_journal += journal_replay(journal_cur);
2078
2079     if (had_journal)
2080       flush_old_values(-1);
2081
2082     pthread_mutex_unlock(&journal_lock);
2083     journal_rotate();
2084
2085     RRDD_LOG(LOG_INFO, "journal processing complete");
2086   }
2087
2088   /* start the queue thread */
2089   memset (&queue_thread, 0, sizeof (queue_thread));
2090   status = pthread_create (&queue_thread,
2091                            NULL, /* attr */
2092                            queue_thread_main,
2093                            NULL); /* args */
2094   if (status != 0)
2095   {
2096     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2097     cleanup();
2098     return (1);
2099   }
2100
2101   listen_thread_main (NULL);
2102   cleanup ();
2103
2104   return (0);
2105 } /* int main */
2106
2107 /*
2108  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2109  */