src/rrd_daemon.c: Count some statistics.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 /*
95  * Types
96  */
97 struct listen_socket_s
98 {
99   int fd;
100   char path[PATH_MAX + 1];
101 };
102 typedef struct listen_socket_s listen_socket_t;
103
104 struct cache_item_s;
105 typedef struct cache_item_s cache_item_t;
106 struct cache_item_s
107 {
108   char *file;
109   char **values;
110   int values_num;
111   time_t last_flush_time;
112 #define CI_FLAGS_IN_TREE  0x01
113 #define CI_FLAGS_IN_QUEUE 0x02
114   int flags;
115
116   cache_item_t *next;
117 };
118
119 struct callback_flush_data_s
120 {
121   time_t now;
122   char **keys;
123   size_t keys_num;
124 };
125 typedef struct callback_flush_data_s callback_flush_data_t;
126
127 enum queue_side_e
128 {
129   HEAD,
130   TAIL
131 };
132 typedef enum queue_side_e queue_side_t;
133
134 /*
135  * Variables
136  */
137 static listen_socket_t *listen_fds = NULL;
138 static size_t listen_fds_num = 0;
139
140 static int do_shutdown = 0;
141
142 static pthread_t queue_thread;
143
144 static pthread_t *connetion_threads = NULL;
145 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
146 static int connetion_threads_num = 0;
147
148 /* Cache stuff */
149 static GTree          *cache_tree = NULL;
150 static cache_item_t   *cache_queue_head = NULL;
151 static cache_item_t   *cache_queue_tail = NULL;
152 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
153 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
154
155 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
156
157 static int config_write_interval = 300;
158 static int config_flush_interval = 3600;
159 static char *config_pid_file = NULL;
160 static char *config_base_dir = NULL;
161
162 static char **config_listen_address_list = NULL;
163 static int config_listen_address_list_len = 0;
164
165 static uint64_t stats_queue_length = 0;
166 static uint64_t stats_updates_total = 0;
167 static uint64_t stats_values_total = 0;
168 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
169
170 /* 
171  * Functions
172  */
173 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
174 {
175   do_shutdown++;
176 } /* }}} void sig_int_handler */
177
178 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
179 {
180   do_shutdown++;
181 } /* }}} void sig_term_handler */
182
183 static int write_pidfile (void) /* {{{ */
184 {
185   pid_t pid;
186   char *file;
187   FILE *fh;
188
189   pid = getpid ();
190   
191   file = (config_pid_file != NULL)
192     ? config_pid_file
193     : LOCALSTATEDIR "/run/rrdcached.pid";
194
195   fh = fopen (file, "w");
196   if (fh == NULL)
197   {
198     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
199     return (-1);
200   }
201
202   fprintf (fh, "%i\n", (int) pid);
203   fclose (fh);
204
205   return (0);
206 } /* }}} int write_pidfile */
207
208 static int remove_pidfile (void) /* {{{ */
209 {
210   char *file;
211   int status;
212
213   file = (config_pid_file != NULL)
214     ? config_pid_file
215     : LOCALSTATEDIR "/run/rrdcached.pid";
216
217   status = unlink (file);
218   if (status == 0)
219     return (0);
220   return (errno);
221 } /* }}} int remove_pidfile */
222
223 /*
224  * enqueue_cache_item:
225  * `cache_lock' must be acquired before calling this function!
226  */
227 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
228     queue_side_t side)
229 {
230   int did_insert = 0;
231
232   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
233       ci->file);
234
235   if (ci == NULL)
236     return (-1);
237
238   if (ci->values_num == 0)
239     return (0);
240
241   if (side == HEAD)
242   {
243     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
244     {
245       assert (ci->next == NULL);
246       ci->next = cache_queue_head;
247       cache_queue_head = ci;
248
249       if (cache_queue_tail == NULL)
250         cache_queue_tail = cache_queue_head;
251
252       did_insert = 1;
253     }
254     else if (cache_queue_head == ci)
255     {
256       /* do nothing */
257     }
258     else /* enqueued, but not first entry */
259     {
260       cache_item_t *prev;
261
262       /* find previous entry */
263       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
264         if (prev->next == ci)
265           break;
266       assert (prev != NULL);
267
268       /* move to the front */
269       prev->next = ci->next;
270       ci->next = cache_queue_head;
271       cache_queue_head = ci;
272
273       /* check if we need to adapt the tail */
274       if (cache_queue_tail == ci)
275         cache_queue_tail = prev;
276     }
277   }
278   else /* (side == TAIL) */
279   {
280     /* We don't move values back in the list.. */
281     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
282       return (0);
283
284     assert (ci->next == NULL);
285
286     if (cache_queue_tail == NULL)
287       cache_queue_head = ci;
288     else
289       cache_queue_tail->next = ci;
290     cache_queue_tail = ci;
291
292     did_insert = 1;
293   }
294
295   ci->flags |= CI_FLAGS_IN_QUEUE;
296
297   if (did_insert)
298   {
299     pthread_mutex_lock (&stats_lock);
300     stats_queue_length++;
301     pthread_mutex_unlock (&stats_lock);
302   }
303
304   return (0);
305 } /* }}} int enqueue_cache_item */
306
307 /*
308  * tree_callback_flush:
309  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
310  * while this is in progress.
311  */
312 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
313     gpointer data)
314 {
315   cache_item_t *ci;
316   callback_flush_data_t *cfd;
317
318   ci = (cache_item_t *) value;
319   cfd = (callback_flush_data_t *) data;
320
321   if (((cfd->now - ci->last_flush_time) >= config_write_interval)
322       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
323       && (ci->values_num > 0))
324   {
325     enqueue_cache_item (ci, TAIL);
326   }
327   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
328       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
329       && (ci->values_num <= 0))
330   {
331     char **temp;
332
333     temp = (char **) realloc (cfd->keys,
334         sizeof (char *) * (cfd->keys_num + 1));
335     if (temp == NULL)
336     {
337       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
338       return (FALSE);
339     }
340     cfd->keys = temp;
341     /* Make really sure this points to the _same_ place */
342     assert ((char *) key == ci->file);
343     cfd->keys[cfd->keys_num] = (char *) key;
344     cfd->keys_num++;
345   }
346
347   return (FALSE);
348 } /* }}} gboolean tree_callback_flush */
349
350 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
351 {
352   struct timeval now;
353   struct timespec next_flush;
354
355   gettimeofday (&now, NULL);
356   next_flush.tv_sec = now.tv_sec + config_flush_interval;
357   next_flush.tv_nsec = 1000 * now.tv_usec;
358
359   pthread_mutex_lock (&cache_lock);
360   while ((do_shutdown == 0) || (cache_queue_head != NULL))
361   {
362     cache_item_t *ci;
363     char *file;
364     char **values;
365     int values_num;
366     int status;
367     int i;
368
369     /* First, check if it's time to do the cache flush. */
370     gettimeofday (&now, NULL);
371     if ((now.tv_sec > next_flush.tv_sec)
372         || ((now.tv_sec == next_flush.tv_sec)
373           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
374     {
375       callback_flush_data_t cfd;
376       size_t k;
377
378       memset (&cfd, 0, sizeof (cfd));
379       /* Pass the current time as user data so that we don't need to call
380        * `time' for each node. */
381       cfd.now = time (NULL);
382       cfd.keys = NULL;
383       cfd.keys_num = 0;
384
385       /* `tree_callback_flush' will return the keys of all values that haven't
386        * been touched in the last `config_flush_interval' seconds in `cfd'.
387        * The char*'s in this array point to the same memory as ci->file, so we
388        * don't need to free them separately. */
389       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
390
391       for (k = 0; k < cfd.keys_num; k++)
392       {
393         /* This must not fail. */
394         ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
395         assert (ci != NULL);
396
397         /* If we end up here with values available, something's seriously
398          * messed up. */
399         assert (ci->values_num == 0);
400
401         /* Remove the node from the tree */
402         g_tree_remove (cache_tree, cfd.keys[k]);
403         cfd.keys[k] = NULL;
404
405         /* Now free and clean up `ci'. */
406         free (ci->file);
407         ci->file = NULL;
408         free (ci);
409         ci = NULL;
410       } /* for (k = 0; k < cfd.keys_num; k++) */
411
412       if (cfd.keys != NULL)
413       {
414         free (cfd.keys);
415         cfd.keys = NULL;
416       }
417
418       /* Determine the time of the next cache flush. */
419       while (next_flush.tv_sec < now.tv_sec)
420         next_flush.tv_sec += config_flush_interval;
421     }
422
423     /* Now, check if there's something to store away. If not, wait until
424      * something comes in or it's time to do the cache flush. */
425     if (cache_queue_head == NULL)
426     {
427       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
428       if ((status != 0) && (status != ETIMEDOUT))
429       {
430         RRDD_LOG (LOG_ERR, "queue_thread_main: "
431             "pthread_cond_timedwait returned %i.", status);
432       }
433     }
434
435     /* Check if a value has arrived. This may be NULL if we timed out or there
436      * was an interrupt such as a signal. */
437     if (cache_queue_head == NULL)
438       continue;
439
440     ci = cache_queue_head;
441
442     /* copy the relevant parts */
443     file = strdup (ci->file);
444     if (file == NULL)
445     {
446       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
447       continue;
448     }
449
450     values = ci->values;
451     values_num = ci->values_num;
452
453     ci->values = NULL;
454     ci->values_num = 0;
455
456     ci->last_flush_time = time (NULL);
457     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
458
459     cache_queue_head = ci->next;
460     if (cache_queue_head == NULL)
461       cache_queue_tail = NULL;
462     ci->next = NULL;
463
464     pthread_mutex_lock (&stats_lock);
465     assert (stats_queue_length > 0);
466     stats_queue_length--;
467     pthread_mutex_unlock (&stats_lock);
468
469     pthread_mutex_unlock (&cache_lock);
470
471     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
472         file, values_num, (void *) values);
473
474     status = rrd_update_r (file, NULL, values_num, (void *) values);
475     if (status != 0)
476     {
477       RRDD_LOG (LOG_ERR, "queue_thread_main: "
478           "rrd_update_r failed with status %i.",
479           status);
480     }
481
482     free (file);
483     for (i = 0; i < values_num; i++)
484       free (values[i]);
485
486     pthread_mutex_lock (&stats_lock);
487     stats_updates_total++;
488     stats_values_total += values_num;
489     pthread_mutex_unlock (&stats_lock);
490
491     pthread_mutex_lock (&cache_lock);
492     pthread_cond_broadcast (&flush_cond);
493   } /* while (do_shutdown == 0) */
494   pthread_mutex_unlock (&cache_lock);
495
496   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
497
498   return (NULL);
499 } /* }}} void *queue_thread_main */
500
501 static int buffer_get_field (char **buffer_ret, /* {{{ */
502     size_t *buffer_size_ret, char **field_ret)
503 {
504   char *buffer;
505   size_t buffer_pos;
506   size_t buffer_size;
507   char *field;
508   size_t field_size;
509   int status;
510
511   buffer = *buffer_ret;
512   buffer_pos = 0;
513   buffer_size = *buffer_size_ret;
514   field = *buffer_ret;
515   field_size = 0;
516
517   /* This is ensured by `handle_request'. */
518   assert (buffer[buffer_size - 1] == ' ');
519
520   status = -1;
521   while (buffer_pos < buffer_size)
522   {
523     /* Check for end-of-field or end-of-buffer */
524     if (buffer[buffer_pos] == ' ')
525     {
526       field[field_size] = 0;
527       field_size++;
528       buffer_pos++;
529       status = 0;
530       break;
531     }
532     /* Handle escaped characters. */
533     else if (buffer[buffer_pos] == '\\')
534     {
535       if (buffer_pos >= (buffer_size - 1))
536         break;
537       buffer_pos++;
538       field[field_size] = buffer[buffer_pos];
539       field_size++;
540       buffer_pos++;
541     }
542     /* Normal operation */ 
543     else
544     {
545       field[field_size] = buffer[buffer_pos];
546       field_size++;
547       buffer_pos++;
548     }
549   } /* while (buffer_pos < buffer_size) */
550
551   if (status != 0)
552     return (status);
553
554   *buffer_ret = buffer + buffer_pos;
555   *buffer_size_ret = buffer_size - buffer_pos;
556   *field_ret = field;
557
558   return (0);
559 } /* }}} int buffer_get_field */
560
561 static int flush_file (const char *filename) /* {{{ */
562 {
563   cache_item_t *ci;
564
565   pthread_mutex_lock (&cache_lock);
566
567   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
568   if (ci == NULL)
569   {
570     pthread_mutex_unlock (&cache_lock);
571     return (ENOENT);
572   }
573
574   /* Enqueue at head */
575   enqueue_cache_item (ci, HEAD);
576   pthread_cond_signal (&cache_cond);
577
578   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
579   {
580     ci = NULL;
581
582     pthread_cond_wait (&flush_cond, &cache_lock);
583
584     ci = g_tree_lookup (cache_tree, filename);
585     if (ci == NULL)
586     {
587       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
588           "while waiting for flush.");
589       pthread_mutex_unlock (&cache_lock);
590       return (-1);
591     }
592   }
593
594   pthread_mutex_unlock (&cache_lock);
595   return (0);
596 } /* }}} int flush_file */
597
598 static int handle_request_flush (int fd, /* {{{ */
599     char *buffer, size_t buffer_size)
600 {
601   char *file;
602   int status;
603   char result[4096];
604
605   status = buffer_get_field (&buffer, &buffer_size, &file);
606   if (status != 0)
607   {
608     RRDD_LOG (LOG_INFO, "handle_request_flush: Cannot get file name.");
609     return (-1);
610   }
611
612   status = flush_file (file);
613   if (status == 0)
614     snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
615   else if (status == ENOENT)
616     snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
617   else if (status < 0)
618     strncpy (result, "-1 Internal error.\n", sizeof (result));
619   else
620     snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
621   result[sizeof (result) - 1] = 0;
622
623   status = write (fd, result, strlen (result));
624   if (status < 0)
625   {
626     status = errno;
627     RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error.");
628     return (status);
629   }
630
631   return (0);
632 } /* }}} int handle_request_flush */
633
634 static int handle_request_update (int fd, /* {{{ */
635     char *buffer, size_t buffer_size)
636 {
637   char *file;
638   int values_num = 0;
639   int status;
640
641   time_t now;
642
643   cache_item_t *ci;
644   char answer[4096];
645
646   now = time (NULL);
647
648   status = buffer_get_field (&buffer, &buffer_size, &file);
649   if (status != 0)
650   {
651     RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name.");
652     return (-1);
653   }
654
655   pthread_mutex_lock (&cache_lock);
656
657   ci = g_tree_lookup (cache_tree, file);
658   if (ci == NULL) /* {{{ */
659   {
660     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
661     if (ci == NULL)
662     {
663       pthread_mutex_unlock (&cache_lock);
664       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
665       return (-1);
666     }
667     memset (ci, 0, sizeof (cache_item_t));
668
669     ci->file = strdup (file);
670     if (ci->file == NULL)
671     {
672       pthread_mutex_unlock (&cache_lock);
673       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
674       free (ci);
675       return (-1);
676     }
677
678     ci->values = NULL;
679     ci->values_num = 0;
680     ci->last_flush_time = now;
681     ci->flags = CI_FLAGS_IN_TREE;
682
683     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
684
685     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
686         ci->file);
687   } /* }}} */
688   assert (ci != NULL);
689
690   while (buffer_size > 0)
691   {
692     char **temp;
693     char *value;
694
695     status = buffer_get_field (&buffer, &buffer_size, &value);
696     if (status != 0)
697     {
698       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
699       break;
700     }
701
702     temp = (char **) realloc (ci->values,
703         sizeof (char *) * (ci->values_num + 1));
704     if (temp == NULL)
705     {
706       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
707       continue;
708     }
709     ci->values = temp;
710
711     ci->values[ci->values_num] = strdup (value);
712     if (ci->values[ci->values_num] == NULL)
713     {
714       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
715       continue;
716     }
717     ci->values_num++;
718
719     values_num++;
720   }
721
722   if (((now - ci->last_flush_time) >= config_write_interval)
723       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
724       && (ci->values_num > 0))
725   {
726     enqueue_cache_item (ci, TAIL);
727     pthread_cond_signal (&cache_cond);
728   }
729
730   pthread_mutex_unlock (&cache_lock);
731
732   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
733   answer[sizeof (answer) - 1] = 0;
734
735   status = write (fd, answer, strlen (answer));
736   if (status < 0)
737   {
738     status = errno;
739     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
740     return (status);
741   }
742
743   return (0);
744 } /* }}} int handle_request_update */
745
746 static int handle_request (int fd) /* {{{ */
747 {
748   char buffer[4096];
749   size_t buffer_size;
750   char *buffer_ptr;
751   char *command;
752   int status;
753
754   status = read (fd, buffer, sizeof (buffer));
755   if (status == 0)
756   {
757     return (1);
758   }
759   else if (status < 0)
760   {
761     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
762     return (-1);
763   }
764   buffer_size = status;
765   assert (((size_t) buffer_size) <= sizeof (buffer));
766
767   if (buffer[buffer_size - 1] != '\n')
768   {
769     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
770     return (-1);
771   }
772
773   /* Accept Windows style line endings, too */
774   if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r'))
775   {
776     buffer_size--;
777     buffer[buffer_size - 1] = '\n';
778   }
779
780   /* Place the normal field separator at the end to simplify
781    * `buffer_get_field's work. */
782   buffer[buffer_size - 1] = ' ';
783
784   buffer_ptr = buffer;
785   command = NULL;
786   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
787   if (status != 0)
788   {
789     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
790     return (-1);
791   }
792
793   if (strcmp (command, "update") == 0)
794   {
795     return (handle_request_update (fd, buffer_ptr, buffer_size));
796   }
797   else if (strcmp (command, "flush") == 0)
798   {
799     return (handle_request_flush (fd, buffer_ptr, buffer_size));
800   }
801   else
802   {
803     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
804     return (-1);
805   }
806 } /* }}} int handle_request */
807
808 static void *connection_thread_main (void *args /* {{{ */
809     __attribute__((unused)))
810 {
811   pthread_t self;
812   int i;
813   int fd;
814   
815   fd = *((int *) args);
816
817   pthread_mutex_lock (&connetion_threads_lock);
818   {
819     pthread_t *temp;
820
821     temp = (pthread_t *) realloc (connetion_threads,
822         sizeof (pthread_t) * (connetion_threads_num + 1));
823     if (temp == NULL)
824     {
825       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
826     }
827     else
828     {
829       connetion_threads = temp;
830       connetion_threads[connetion_threads_num] = pthread_self ();
831       connetion_threads_num++;
832     }
833   }
834   pthread_mutex_unlock (&connetion_threads_lock);
835
836   while (do_shutdown == 0)
837   {
838     struct pollfd pollfd;
839     int status;
840
841     pollfd.fd = fd;
842     pollfd.events = POLLIN | POLLPRI;
843     pollfd.revents = 0;
844
845     status = poll (&pollfd, 1, /* timeout = */ 500);
846     if (status == 0) /* timeout */
847       continue;
848     else if (status < 0) /* error */
849     {
850       status = errno;
851       if (status == EINTR)
852         continue;
853       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
854       continue;
855     }
856
857     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
858     {
859       close (fd);
860       break;
861     }
862     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
863     {
864       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
865           "poll(2) returned something unexpected: %#04hx",
866           pollfd.revents);
867       close (fd);
868       break;
869     }
870
871     status = handle_request (fd);
872     if (status != 0)
873     {
874       close (fd);
875       break;
876     }
877   }
878
879   self = pthread_self ();
880   /* Remove this thread from the connection threads list */
881   pthread_mutex_lock (&connetion_threads_lock);
882   /* Find out own index in the array */
883   for (i = 0; i < connetion_threads_num; i++)
884     if (pthread_equal (connetion_threads[i], self) != 0)
885       break;
886   assert (i < connetion_threads_num);
887
888   /* Move the trailing threads forward. */
889   if (i < (connetion_threads_num - 1))
890   {
891     memmove (connetion_threads + i,
892         connetion_threads + i + 1,
893         sizeof (pthread_t) * (connetion_threads_num - i - 1));
894   }
895
896   connetion_threads_num--;
897   pthread_mutex_unlock (&connetion_threads_lock);
898
899   free (args);
900   return (NULL);
901 } /* }}} void *connection_thread_main */
902
903 static int open_listen_socket_unix (const char *path) /* {{{ */
904 {
905   int fd;
906   struct sockaddr_un sa;
907   listen_socket_t *temp;
908   int status;
909
910   temp = (listen_socket_t *) realloc (listen_fds,
911       sizeof (listen_fds[0]) * (listen_fds_num + 1));
912   if (temp == NULL)
913   {
914     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
915     return (-1);
916   }
917   listen_fds = temp;
918   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
919
920   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
921   if (fd < 0)
922   {
923     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
924     return (-1);
925   }
926
927   memset (&sa, 0, sizeof (sa));
928   sa.sun_family = AF_UNIX;
929   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
930
931   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
932   if (status != 0)
933   {
934     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
935     close (fd);
936     unlink (path);
937     return (-1);
938   }
939
940   status = listen (fd, /* backlog = */ 10);
941   if (status != 0)
942   {
943     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
944     close (fd);
945     unlink (path);
946     return (-1);
947   }
948   
949   listen_fds[listen_fds_num].fd = fd;
950   snprintf (listen_fds[listen_fds_num].path,
951       sizeof (listen_fds[listen_fds_num].path) - 1,
952       "unix:%s", path);
953   listen_fds_num++;
954
955   return (0);
956 } /* }}} int open_listen_socket_unix */
957
958 static int open_listen_socket (const char *addr) /* {{{ */
959 {
960   struct addrinfo ai_hints;
961   struct addrinfo *ai_res;
962   struct addrinfo *ai_ptr;
963   int status;
964
965   assert (addr != NULL);
966
967   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
968     return (open_listen_socket_unix (addr + strlen ("unix:")));
969   else if (addr[0] == '/')
970     return (open_listen_socket_unix (addr));
971
972   memset (&ai_hints, 0, sizeof (ai_hints));
973   ai_hints.ai_flags = 0;
974 #ifdef AI_ADDRCONFIG
975   ai_hints.ai_flags |= AI_ADDRCONFIG;
976 #endif
977   ai_hints.ai_family = AF_UNSPEC;
978   ai_hints.ai_socktype = SOCK_STREAM;
979
980   ai_res = NULL;
981   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
982   if (status != 0)
983   {
984     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
985         "%s", addr, gai_strerror (status));
986     return (-1);
987   }
988
989   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
990   {
991     int fd;
992     listen_socket_t *temp;
993
994     temp = (listen_socket_t *) realloc (listen_fds,
995         sizeof (listen_fds[0]) * (listen_fds_num + 1));
996     if (temp == NULL)
997     {
998       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
999       continue;
1000     }
1001     listen_fds = temp;
1002     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1003
1004     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1005     if (fd < 0)
1006     {
1007       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1008       continue;
1009     }
1010
1011     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1012     if (status != 0)
1013     {
1014       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1015       close (fd);
1016       continue;
1017     }
1018
1019     status = listen (fd, /* backlog = */ 10);
1020     if (status != 0)
1021     {
1022       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1023       close (fd);
1024       return (-1);
1025     }
1026
1027     listen_fds[listen_fds_num].fd = fd;
1028     strncpy (listen_fds[listen_fds_num].path, addr,
1029         sizeof (listen_fds[listen_fds_num].path) - 1);
1030     listen_fds_num++;
1031   } /* for (ai_ptr) */
1032
1033   return (0);
1034 } /* }}} int open_listen_socket */
1035
1036 static int close_listen_sockets (void) /* {{{ */
1037 {
1038   size_t i;
1039
1040   for (i = 0; i < listen_fds_num; i++)
1041   {
1042     close (listen_fds[i].fd);
1043     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1044       unlink (listen_fds[i].path + strlen ("unix:"));
1045   }
1046
1047   free (listen_fds);
1048   listen_fds = NULL;
1049   listen_fds_num = 0;
1050
1051   return (0);
1052 } /* }}} int close_listen_sockets */
1053
1054 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1055 {
1056   struct pollfd *pollfds;
1057   int pollfds_num;
1058   int status;
1059   int i;
1060
1061   for (i = 0; i < config_listen_address_list_len; i++)
1062   {
1063     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
1064         "= %s", i, config_listen_address_list[i]);
1065     open_listen_socket (config_listen_address_list[i]);
1066   }
1067
1068   if (config_listen_address_list_len < 1)
1069     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1070
1071   if (listen_fds_num < 1)
1072   {
1073     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1074         "could be opened. Sorry.");
1075     return (NULL);
1076   }
1077
1078   pollfds_num = listen_fds_num;
1079   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1080   if (pollfds == NULL)
1081   {
1082     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1083     return (NULL);
1084   }
1085   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1086
1087   while (do_shutdown == 0)
1088   {
1089     assert (pollfds_num == ((int) listen_fds_num));
1090     for (i = 0; i < pollfds_num; i++)
1091     {
1092       pollfds[i].fd = listen_fds[i].fd;
1093       pollfds[i].events = POLLIN | POLLPRI;
1094       pollfds[i].revents = 0;
1095     }
1096
1097     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1098     if (status < 1)
1099     {
1100       status = errno;
1101       if (status != EINTR)
1102       {
1103         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1104       }
1105       continue;
1106     }
1107
1108     for (i = 0; i < pollfds_num; i++)
1109     {
1110       int *client_sd;
1111       struct sockaddr_storage client_sa;
1112       socklen_t client_sa_size;
1113       pthread_t tid;
1114
1115       if (pollfds[i].revents == 0)
1116         continue;
1117
1118       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1119       {
1120         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1121             "poll(2) returned something unexpected for listen FD #%i.",
1122             pollfds[i].fd);
1123         continue;
1124       }
1125
1126       client_sd = (int *) malloc (sizeof (int));
1127       if (client_sd == NULL)
1128       {
1129         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1130         continue;
1131       }
1132
1133       client_sa_size = sizeof (client_sa);
1134       *client_sd = accept (pollfds[i].fd,
1135           (struct sockaddr *) &client_sa, &client_sa_size);
1136       if (*client_sd < 0)
1137       {
1138         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1139         continue;
1140       }
1141
1142       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
1143           /* args = */ (void *) client_sd);
1144       if (status != 0)
1145       {
1146         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1147         close (*client_sd);
1148         free (client_sd);
1149         continue;
1150       }
1151     } /* for (pollfds_num) */
1152   } /* while (do_shutdown == 0) */
1153
1154   close_listen_sockets ();
1155
1156   pthread_mutex_lock (&connetion_threads_lock);
1157   while (connetion_threads_num > 0)
1158   {
1159     pthread_t wait_for;
1160
1161     wait_for = connetion_threads[0];
1162
1163     pthread_mutex_unlock (&connetion_threads_lock);
1164     pthread_join (wait_for, /* retval = */ NULL);
1165     pthread_mutex_lock (&connetion_threads_lock);
1166   }
1167   pthread_mutex_unlock (&connetion_threads_lock);
1168
1169   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
1170
1171   return (NULL);
1172 } /* }}} void *listen_thread_main */
1173
1174 static int daemonize (void) /* {{{ */
1175 {
1176   pid_t child;
1177   int status;
1178   char *base_dir;
1179
1180   /* These structures are static, because `sigaction' behaves weird if the are
1181    * overwritten.. */
1182   static struct sigaction sa_int;
1183   static struct sigaction sa_term;
1184   static struct sigaction sa_pipe;
1185
1186   child = fork ();
1187   if (child < 0)
1188   {
1189     fprintf (stderr, "daemonize: fork(2) failed.\n");
1190     return (-1);
1191   }
1192   else if (child > 0)
1193   {
1194     return (1);
1195   }
1196
1197   /* Change into the /tmp directory. */
1198   base_dir = (config_base_dir != NULL)
1199     ? config_base_dir
1200     : "/tmp";
1201   status = chdir (base_dir);
1202   if (status != 0)
1203   {
1204     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1205     return (-1);
1206   }
1207
1208   /* Become session leader */
1209   setsid ();
1210
1211   /* Open the first three file descriptors to /dev/null */
1212   close (2);
1213   close (1);
1214   close (0);
1215
1216   open ("/dev/null", O_RDWR);
1217   dup (0);
1218   dup (0);
1219
1220   /* Install signal handlers */
1221   memset (&sa_int, 0, sizeof (sa_int));
1222   sa_int.sa_handler = sig_int_handler;
1223   sigaction (SIGINT, &sa_int, NULL);
1224
1225   memset (&sa_term, 0, sizeof (sa_term));
1226   sa_term.sa_handler = sig_term_handler;
1227   sigaction (SIGINT, &sa_term, NULL);
1228
1229   memset (&sa_pipe, 0, sizeof (sa_pipe));
1230   sa_pipe.sa_handler = SIG_IGN;
1231   sigaction (SIGPIPE, &sa_pipe, NULL);
1232
1233   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1234
1235   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1236   if (cache_tree == NULL)
1237   {
1238     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1239     return (-1);
1240   }
1241
1242   memset (&queue_thread, 0, sizeof (queue_thread));
1243   status = pthread_create (&queue_thread, /* attr = */ NULL,
1244       queue_thread_main, /* args = */ NULL);
1245   if (status != 0)
1246   {
1247     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
1248     return (-1);
1249   }
1250
1251   write_pidfile ();
1252
1253   return (0);
1254 } /* }}} int daemonize */
1255
1256 static int cleanup (void) /* {{{ */
1257 {
1258   RRDD_LOG (LOG_DEBUG, "cleanup ()");
1259
1260   do_shutdown++;
1261
1262   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
1263   pthread_cond_signal (&cache_cond);
1264   pthread_join (queue_thread, /* return = */ NULL);
1265   RRDD_LOG (LOG_DEBUG, "cleanup: done");
1266
1267   remove_pidfile ();
1268
1269   closelog ();
1270
1271   return (0);
1272 } /* }}} int cleanup */
1273
1274 static int read_options (int argc, char **argv) /* {{{ */
1275 {
1276   int option;
1277   int status = 0;
1278
1279   while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1)
1280   {
1281     switch (option)
1282     {
1283       case 'l':
1284       {
1285         char **temp;
1286
1287         temp = (char **) realloc (config_listen_address_list,
1288             sizeof (char *) * (config_listen_address_list_len + 1));
1289         if (temp == NULL)
1290         {
1291           fprintf (stderr, "read_options: realloc failed.\n");
1292           return (2);
1293         }
1294         config_listen_address_list = temp;
1295
1296         temp[config_listen_address_list_len] = strdup (optarg);
1297         if (temp[config_listen_address_list_len] == NULL)
1298         {
1299           fprintf (stderr, "read_options: strdup failed.\n");
1300           return (2);
1301         }
1302         config_listen_address_list_len++;
1303       }
1304       break;
1305
1306       case 'f':
1307       {
1308         int temp;
1309
1310         temp = atoi (optarg);
1311         if (temp > 0)
1312           config_flush_interval = temp;
1313         else
1314         {
1315           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1316           status = 3;
1317         }
1318       }
1319       break;
1320
1321       case 'w':
1322       {
1323         int temp;
1324
1325         temp = atoi (optarg);
1326         if (temp > 0)
1327           config_write_interval = temp;
1328         else
1329         {
1330           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1331           status = 2;
1332         }
1333       }
1334       break;
1335
1336       case 'b':
1337       {
1338         size_t len;
1339
1340         if (config_base_dir != NULL)
1341           free (config_base_dir);
1342         config_base_dir = strdup (optarg);
1343         if (config_base_dir == NULL)
1344         {
1345           fprintf (stderr, "read_options: strdup failed.\n");
1346           return (3);
1347         }
1348
1349         len = strlen (config_base_dir);
1350         while ((len > 0) && (config_base_dir[len - 1] == '/'))
1351         {
1352           config_base_dir[len - 1] = 0;
1353           len--;
1354         }
1355
1356         if (len < 1)
1357         {
1358           fprintf (stderr, "Invalid base directory: %s\n", optarg);
1359           return (4);
1360         }
1361       }
1362       break;
1363
1364       case 'p':
1365       {
1366         if (config_pid_file != NULL)
1367           free (config_pid_file);
1368         config_pid_file = strdup (optarg);
1369         if (config_pid_file == NULL)
1370         {
1371           fprintf (stderr, "read_options: strdup failed.\n");
1372           return (3);
1373         }
1374       }
1375       break;
1376
1377       case 'h':
1378       case '?':
1379         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
1380             "\n"
1381             "Usage: rrdcached [options]\n"
1382             "\n"
1383             "Valid options are:\n"
1384             "  -l <address>  Socket address to listen to.\n"
1385             "  -w <seconds>  Interval in which to write data.\n"
1386             "  -f <seconds>  Interval in which to flush dead data.\n"
1387             "  -p <file>     Location of the PID-file.\n"
1388             "  -b <dir>      Base directory to change to.\n"
1389             "\n"
1390             "For more information and a detailed description of all options "
1391             "please refer\n"
1392             "to the rrdcached(1) manual page.\n",
1393             VERSION);
1394         status = -1;
1395         break;
1396     } /* switch (option) */
1397   } /* while (getopt) */
1398
1399   return (status);
1400 } /* }}} int read_options */
1401
1402 int main (int argc, char **argv)
1403 {
1404   int status;
1405
1406   status = read_options (argc, argv);
1407   if (status != 0)
1408   {
1409     if (status < 0)
1410       status = 0;
1411     return (status);
1412   }
1413
1414   status = daemonize ();
1415   if (status == 1)
1416   {
1417     struct sigaction sigchld;
1418
1419     memset (&sigchld, 0, sizeof (sigchld));
1420     sigchld.sa_handler = SIG_IGN;
1421     sigaction (SIGCHLD, &sigchld, NULL);
1422
1423     return (0);
1424   }
1425   else if (status != 0)
1426   {
1427     fprintf (stderr, "daemonize failed, exiting.\n");
1428     return (1);
1429   }
1430
1431   listen_thread_main (NULL);
1432
1433   cleanup ();
1434
1435   return (0);
1436 } /* int main */
1437
1438 /*
1439  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1440  */