src/rrd_daemon.c: Updated the enqueueing function to provide insertion at the head.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 /*
95  * Types
96  */
97 struct listen_socket_s
98 {
99   int fd;
100   char path[PATH_MAX + 1];
101 };
102 typedef struct listen_socket_s listen_socket_t;
103
104 struct cache_item_s;
105 typedef struct cache_item_s cache_item_t;
106 struct cache_item_s
107 {
108   char *file;
109   char **values;
110   int values_num;
111   time_t last_flush_time;
112 #define CI_FLAGS_IN_TREE  0x01
113 #define CI_FLAGS_IN_QUEUE 0x02
114   int flags;
115
116   cache_item_t *next;
117 };
118
119 enum queue_side_e
120 {
121   HEAD,
122   TAIL
123 };
124 typedef enum queue_side_e queue_side_t;
125
126 /*
127  * Variables
128  */
129 static listen_socket_t *listen_fds = NULL;
130 static size_t listen_fds_num = 0;
131
132 static int do_shutdown = 0;
133
134 static pthread_t queue_thread;
135
136 static pthread_t *connetion_threads = NULL;
137 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
138 static int connetion_threads_num = 0;
139
140 /* Cache stuff */
141 static GTree          *cache_tree = NULL;
142 static cache_item_t   *cache_queue_head = NULL;
143 static cache_item_t   *cache_queue_tail = NULL;
144 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
145 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
146
147 static int config_write_interval = 300;
148 static int config_flush_interval = 3600;
149
150 static char **config_listen_address_list = NULL;
151 static int config_listen_address_list_len = 0;
152
153 /* 
154  * Functions
155  */
156 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
157 {
158   do_shutdown++;
159 } /* }}} void sig_int_handler */
160
161 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
162 {
163   do_shutdown++;
164 } /* }}} void sig_term_handler */
165
166 /*
167  * enqueue_cache_item:
168  * `cache_lock' must be acquired before calling this function!
169  */
170 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
171     queue_side_t side)
172 {
173   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
174       ci->file);
175
176   if (ci == NULL)
177     return (-1);
178
179   if (ci->values_num == 0)
180     return (0);
181
182   if (side == HEAD)
183   {
184     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
185     {
186       assert (ci->next == NULL);
187       ci->next = cache_queue_head;
188       cache_queue_head = ci;
189
190       if (cache_queue_tail == NULL)
191         cache_queue_tail = cache_queue_head;
192     }
193     else if (cache_queue_head == ci)
194     {
195       /* do nothing */
196     }
197     else /* enqueued, but not first entry */
198     {
199       cache_item_t *prev;
200
201       /* find previous entry */
202       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
203         if (prev->next == ci)
204           break;
205       assert (prev != NULL);
206
207       /* move to the front */
208       prev->next = ci->next;
209       ci->next = cache_queue_head;
210       cache_queue_head = ci;
211
212       /* check if we need to adapt the tail */
213       if (cache_queue_tail == ci)
214         cache_queue_tail = prev;
215     }
216   }
217   else /* (side == TAIL) */
218   {
219     /* We don't move values back in the list.. */
220     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
221       return (0);
222
223     assert (ci->next == NULL);
224
225     if (cache_queue_tail == NULL)
226       cache_queue_head = ci;
227     else
228       cache_queue_tail->next = ci;
229     cache_queue_tail = ci;
230   }
231
232   ci->flags |= CI_FLAGS_IN_QUEUE;
233
234   return (0);
235 } /* }}} int enqueue_cache_item */
236
237 /*
238  * tree_callback_flush:
239  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
240  * while this is in progress.
241  */
242 static gboolean tree_callback_flush (gpointer key /* {{{ */
243     __attribute__((unused)), gpointer value, gpointer data)
244 {
245   cache_item_t *ci;
246   time_t now;
247
248   key = NULL; /* make compiler happy */
249
250   ci = (cache_item_t *) value;
251   now = *((time_t *) data);
252
253   if (((now - ci->last_flush_time) >= config_write_interval)
254       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
255       && (ci->values_num > 0))
256     enqueue_cache_item (ci, TAIL);
257
258   return (TRUE);
259 } /* }}} gboolean tree_callback_flush */
260
261 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
262 {
263   struct timeval now;
264   struct timespec next_flush;
265
266   gettimeofday (&now, NULL);
267   next_flush.tv_sec = now.tv_sec + config_flush_interval;
268   next_flush.tv_nsec = 1000 * now.tv_usec;
269
270   pthread_mutex_lock (&cache_lock);
271   while ((do_shutdown == 0) || (cache_queue_head != NULL))
272   {
273     cache_item_t *ci;
274     char *file;
275     char **values;
276     int values_num;
277     int status;
278     int i;
279
280     /* First, check if it's time to do the cache flush. */
281     gettimeofday (&now, NULL);
282     if ((now.tv_sec > next_flush.tv_sec)
283         || ((now.tv_sec == next_flush.tv_sec)
284           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
285     {
286       time_t time_now;
287
288       /* Pass the current time as user data so that we don't need to call
289        * `time' for each node. */
290       time_now = time (NULL);
291
292       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
293
294       /* Determine the time of the next cache flush. */
295       while (next_flush.tv_sec < now.tv_sec)
296         next_flush.tv_sec += config_flush_interval;
297     }
298
299     /* Now, check if there's something to store away. If not, wait until
300      * something comes in or it's time to do the cache flush. */
301     if (cache_queue_head == NULL)
302     {
303       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
304       if ((status != 0) && (status != ETIMEDOUT))
305       {
306         RRDD_LOG (LOG_ERR, "queue_thread_main: "
307             "pthread_cond_timedwait returned %i.", status);
308       }
309     }
310
311     /* Check if a value has arrived. This may be NULL if we timed out or there
312      * was an interrupt such as a signal. */
313     if (cache_queue_head == NULL)
314       continue;
315
316     ci = cache_queue_head;
317
318     /* copy the relevant parts */
319     file = strdup (ci->file);
320     if (file == NULL)
321     {
322       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
323       continue;
324     }
325
326     values = ci->values;
327     values_num = ci->values_num;
328
329     ci->values = NULL;
330     ci->values_num = 0;
331
332     ci->last_flush_time = time (NULL);
333     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
334
335     cache_queue_head = ci->next;
336     if (cache_queue_head == NULL)
337       cache_queue_tail = NULL;
338     ci->next = NULL;
339
340     pthread_mutex_unlock (&cache_lock);
341
342     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
343         file, values_num, (void *) values);
344
345     status = rrd_update_r (file, NULL, values_num, (void *) values);
346     if (status != 0)
347     {
348       RRDD_LOG (LOG_ERR, "queue_thread_main: "
349           "rrd_update_r failed with status %i.",
350           status);
351     }
352
353     free (file);
354     for (i = 0; i < values_num; i++)
355       free (values[i]);
356
357     pthread_mutex_lock (&cache_lock);
358   } /* while (do_shutdown == 0) */
359   pthread_mutex_unlock (&cache_lock);
360
361   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
362
363   return (NULL);
364 } /* }}} void *queue_thread_main */
365
366 static int buffer_get_field (char **buffer_ret, /* {{{ */
367     size_t *buffer_size_ret, char **field_ret)
368 {
369   char *buffer;
370   size_t buffer_pos;
371   size_t buffer_size;
372   char *field;
373   size_t field_size;
374   int status;
375
376   buffer = *buffer_ret;
377   buffer_pos = 0;
378   buffer_size = *buffer_size_ret;
379   field = *buffer_ret;
380   field_size = 0;
381
382   /* This is ensured by `handle_request'. */
383   assert (buffer[buffer_size - 1] == ' ');
384
385   status = -1;
386   while (buffer_pos < buffer_size)
387   {
388     /* Check for end-of-field or end-of-buffer */
389     if (buffer[buffer_pos] == ' ')
390     {
391       field[field_size] = 0;
392       field_size++;
393       buffer_pos++;
394       status = 0;
395       break;
396     }
397     /* Handle escaped characters. */
398     else if (buffer[buffer_pos] == '\\')
399     {
400       if (buffer_pos >= (buffer_size - 1))
401         break;
402       buffer_pos++;
403       field[field_size] = buffer[buffer_pos];
404       field_size++;
405       buffer_pos++;
406     }
407     /* Normal operation */ 
408     else
409     {
410       field[field_size] = buffer[buffer_pos];
411       field_size++;
412       buffer_pos++;
413     }
414   } /* while (buffer_pos < buffer_size) */
415
416   if (status != 0)
417     return (status);
418
419   *buffer_ret = buffer + buffer_pos;
420   *buffer_size_ret = buffer_size - buffer_pos;
421   *field_ret = field;
422
423   return (0);
424 } /* }}} int buffer_get_field */
425
426 static int handle_request_update (int fd, /* {{{ */
427     char *buffer, size_t buffer_size)
428 {
429   char *file;
430   int values_num = 0;
431   int status;
432
433   time_t now;
434
435   cache_item_t *ci;
436   char answer[4096];
437
438   now = time (NULL);
439
440   status = buffer_get_field (&buffer, &buffer_size, &file);
441   if (status != 0)
442   {
443     RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name.");
444     return (-1);
445   }
446
447   pthread_mutex_lock (&cache_lock);
448
449   ci = g_tree_lookup (cache_tree, file);
450   if (ci == NULL) /* {{{ */
451   {
452     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
453     if (ci == NULL)
454     {
455       pthread_mutex_unlock (&cache_lock);
456       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
457       return (-1);
458     }
459     memset (ci, 0, sizeof (cache_item_t));
460
461     ci->file = strdup (file);
462     if (ci->file == NULL)
463     {
464       pthread_mutex_unlock (&cache_lock);
465       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
466       free (ci);
467       return (-1);
468     }
469
470     ci->values = NULL;
471     ci->values_num = 0;
472     ci->last_flush_time = now;
473     ci->flags = CI_FLAGS_IN_TREE;
474
475     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
476
477     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
478         ci->file);
479   } /* }}} */
480   assert (ci != NULL);
481
482   while (buffer_size > 0)
483   {
484     char **temp;
485     char *value;
486
487     status = buffer_get_field (&buffer, &buffer_size, &value);
488     if (status != 0)
489     {
490       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
491       break;
492     }
493
494     temp = (char **) realloc (ci->values,
495         sizeof (char *) * (ci->values_num + 1));
496     if (temp == NULL)
497     {
498       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
499       continue;
500     }
501     ci->values = temp;
502
503     ci->values[ci->values_num] = strdup (value);
504     if (ci->values[ci->values_num] == NULL)
505     {
506       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
507       continue;
508     }
509     ci->values_num++;
510
511     values_num++;
512   }
513
514   if (((now - ci->last_flush_time) >= config_write_interval)
515       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
516       && (ci->values_num > 0))
517   {
518     enqueue_cache_item (ci, TAIL);
519     pthread_cond_signal (&cache_cond);
520   }
521
522   pthread_mutex_unlock (&cache_lock);
523
524   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
525   answer[sizeof (answer) - 1] = 0;
526
527   status = write (fd, answer, strlen (answer));
528   if (status < 0)
529   {
530     status = errno;
531     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
532     return (status);
533   }
534
535   return (0);
536 } /* }}} int handle_request_update */
537
538 static int handle_request (int fd) /* {{{ */
539 {
540   char buffer[4096];
541   size_t buffer_size;
542   char *buffer_ptr;
543   char *command;
544   int status;
545
546   status = read (fd, buffer, sizeof (buffer));
547   if (status < 1)
548   {
549     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
550     return (-1);
551   }
552   buffer_size = status;
553   assert (((size_t) buffer_size) <= sizeof (buffer));
554
555   if (buffer[buffer_size - 1] != '\n')
556   {
557     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
558     return (-1);
559   }
560   /* Place the normal field separator at the end to simplify
561    * `buffer_get_field's work. */
562   buffer[buffer_size - 1] = ' ';
563
564   buffer_ptr = buffer;
565   command = NULL;
566   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
567   if (status != 0)
568   {
569     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
570     return (-1);
571   }
572
573   if (strcmp (command, "update") == 0)
574   {
575     return (handle_request_update (fd, buffer_ptr, buffer_size));
576   }
577   else
578   {
579     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
580     return (-1);
581   }
582 } /* }}} int handle_request */
583
584 static void *connection_thread_main (void *args /* {{{ */
585     __attribute__((unused)))
586 {
587   pthread_t self;
588   int i;
589   int fd;
590   
591   fd = *((int *) args);
592
593   pthread_mutex_lock (&connetion_threads_lock);
594   {
595     pthread_t *temp;
596
597     temp = (pthread_t *) realloc (connetion_threads,
598         sizeof (pthread_t) * (connetion_threads_num + 1));
599     if (temp == NULL)
600     {
601       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
602     }
603     else
604     {
605       connetion_threads = temp;
606       connetion_threads[connetion_threads_num] = pthread_self ();
607       connetion_threads_num++;
608     }
609   }
610   pthread_mutex_unlock (&connetion_threads_lock);
611
612   while (do_shutdown == 0)
613   {
614     struct pollfd pollfd;
615     int status;
616
617     pollfd.fd = fd;
618     pollfd.events = POLLIN | POLLPRI;
619     pollfd.revents = 0;
620
621     status = poll (&pollfd, 1, /* timeout = */ 500);
622     if (status == 0) /* timeout */
623       continue;
624     else if (status < 0) /* error */
625     {
626       status = errno;
627       if (status == EINTR)
628         continue;
629       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
630       continue;
631     }
632
633     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
634     {
635       close (fd);
636       break;
637     }
638     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
639     {
640       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
641           "poll(2) returned something unexpected: %#04hx",
642           pollfd.revents);
643       close (fd);
644       break;
645     }
646
647     status = handle_request (fd);
648     if (status != 0)
649     {
650       close (fd);
651       break;
652     }
653   }
654
655   self = pthread_self ();
656   /* Remove this thread from the connection threads list */
657   pthread_mutex_lock (&connetion_threads_lock);
658   /* Find out own index in the array */
659   for (i = 0; i < connetion_threads_num; i++)
660     if (pthread_equal (connetion_threads[i], self) != 0)
661       break;
662   assert (i < connetion_threads_num);
663
664   /* Move the trailing threads forward. */
665   if (i < (connetion_threads_num - 1))
666   {
667     memmove (connetion_threads + i,
668         connetion_threads + i + 1,
669         sizeof (pthread_t) * (connetion_threads_num - i - 1));
670   }
671
672   connetion_threads_num--;
673   pthread_mutex_unlock (&connetion_threads_lock);
674
675   free (args);
676   return (NULL);
677 } /* }}} void *connection_thread_main */
678
679 static int open_listen_socket_unix (const char *path) /* {{{ */
680 {
681   int fd;
682   struct sockaddr_un sa;
683   listen_socket_t *temp;
684   int status;
685
686   temp = (listen_socket_t *) realloc (listen_fds,
687       sizeof (listen_fds[0]) * (listen_fds_num + 1));
688   if (temp == NULL)
689   {
690     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
691     return (-1);
692   }
693   listen_fds = temp;
694   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
695
696   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
697   if (fd < 0)
698   {
699     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
700     return (-1);
701   }
702
703   memset (&sa, 0, sizeof (sa));
704   sa.sun_family = AF_UNIX;
705   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
706
707   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
708   if (status != 0)
709   {
710     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
711     close (fd);
712     unlink (path);
713     return (-1);
714   }
715
716   status = listen (fd, /* backlog = */ 10);
717   if (status != 0)
718   {
719     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
720     close (fd);
721     unlink (path);
722     return (-1);
723   }
724   
725   listen_fds[listen_fds_num].fd = fd;
726   snprintf (listen_fds[listen_fds_num].path,
727       sizeof (listen_fds[listen_fds_num].path) - 1,
728       "unix:%s", path);
729   listen_fds_num++;
730
731   return (0);
732 } /* }}} int open_listen_socket_unix */
733
734 static int open_listen_socket (const char *addr) /* {{{ */
735 {
736   struct addrinfo ai_hints;
737   struct addrinfo *ai_res;
738   struct addrinfo *ai_ptr;
739   int status;
740
741   assert (addr != NULL);
742
743   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
744     return (open_listen_socket_unix (addr + strlen ("unix:")));
745   else if (addr[0] == '/')
746     return (open_listen_socket_unix (addr));
747
748   memset (&ai_hints, 0, sizeof (ai_hints));
749   ai_hints.ai_flags = 0;
750 #ifdef AI_ADDRCONFIG
751   ai_hints.ai_flags |= AI_ADDRCONFIG;
752 #endif
753   ai_hints.ai_family = AF_UNSPEC;
754   ai_hints.ai_socktype = SOCK_STREAM;
755
756   ai_res = NULL;
757   status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
758   if (status != 0)
759   {
760     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
761         "%s", addr, gai_strerror (status));
762     return (-1);
763   }
764
765   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
766   {
767     int fd;
768     listen_socket_t *temp;
769
770     temp = (listen_socket_t *) realloc (listen_fds,
771         sizeof (listen_fds[0]) * (listen_fds_num + 1));
772     if (temp == NULL)
773     {
774       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
775       continue;
776     }
777     listen_fds = temp;
778     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
779
780     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
781     if (fd < 0)
782     {
783       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
784       continue;
785     }
786
787     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
788     if (status != 0)
789     {
790       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
791       close (fd);
792       continue;
793     }
794
795     status = listen (fd, /* backlog = */ 10);
796     if (status != 0)
797     {
798       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
799       close (fd);
800       return (-1);
801     }
802
803     listen_fds[listen_fds_num].fd = fd;
804     strncpy (listen_fds[listen_fds_num].path, addr,
805         sizeof (listen_fds[listen_fds_num].path) - 1);
806     listen_fds_num++;
807   } /* for (ai_ptr) */
808
809   return (0);
810 } /* }}} int open_listen_socket */
811
812 static int close_listen_sockets (void) /* {{{ */
813 {
814   size_t i;
815
816   for (i = 0; i < listen_fds_num; i++)
817   {
818     close (listen_fds[i].fd);
819     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
820       unlink (listen_fds[i].path + strlen ("unix:"));
821   }
822
823   free (listen_fds);
824   listen_fds = NULL;
825   listen_fds_num = 0;
826
827   return (0);
828 } /* }}} int close_listen_sockets */
829
830 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
831 {
832   struct pollfd *pollfds;
833   int pollfds_num;
834   int status;
835   int i;
836
837   for (i = 0; i < config_listen_address_list_len; i++)
838   {
839     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
840         "= %s", i, config_listen_address_list[i]);
841     open_listen_socket (config_listen_address_list[i]);
842   }
843
844   if (config_listen_address_list_len < 1)
845     open_listen_socket (RRDD_SOCK_PATH);
846
847   if (listen_fds_num < 1)
848   {
849     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
850         "could be opened. Sorry.");
851     return (NULL);
852   }
853
854   pollfds_num = listen_fds_num;
855   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
856   if (pollfds == NULL)
857   {
858     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
859     return (NULL);
860   }
861   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
862
863   while (do_shutdown == 0)
864   {
865     assert (pollfds_num == ((int) listen_fds_num));
866     for (i = 0; i < pollfds_num; i++)
867     {
868       pollfds[i].fd = listen_fds[i].fd;
869       pollfds[i].events = POLLIN | POLLPRI;
870       pollfds[i].revents = 0;
871     }
872
873     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
874     if (status < 1)
875     {
876       status = errno;
877       if (status != EINTR)
878       {
879         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
880       }
881       continue;
882     }
883
884     for (i = 0; i < pollfds_num; i++)
885     {
886       int *client_sd;
887       struct sockaddr_storage client_sa;
888       socklen_t client_sa_size;
889       pthread_t tid;
890
891       if (pollfds[i].revents == 0)
892         continue;
893
894       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
895       {
896         RRDD_LOG (LOG_ERR, "listen_thread_main: "
897             "poll(2) returned something unexpected for listen FD #%i.",
898             pollfds[i].fd);
899         continue;
900       }
901
902       client_sd = (int *) malloc (sizeof (int));
903       if (client_sd == NULL)
904       {
905         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
906         continue;
907       }
908
909       client_sa_size = sizeof (client_sa);
910       *client_sd = accept (pollfds[i].fd,
911           (struct sockaddr *) &client_sa, &client_sa_size);
912       if (*client_sd < 0)
913       {
914         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
915         continue;
916       }
917
918       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
919           /* args = */ (void *) client_sd);
920       if (status != 0)
921       {
922         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
923         close (*client_sd);
924         free (client_sd);
925         continue;
926       }
927     } /* for (pollfds_num) */
928   } /* while (do_shutdown == 0) */
929
930   close_listen_sockets ();
931
932   pthread_mutex_lock (&connetion_threads_lock);
933   while (connetion_threads_num > 0)
934   {
935     pthread_t wait_for;
936
937     wait_for = connetion_threads[0];
938
939     pthread_mutex_unlock (&connetion_threads_lock);
940     pthread_join (wait_for, /* retval = */ NULL);
941     pthread_mutex_lock (&connetion_threads_lock);
942   }
943   pthread_mutex_unlock (&connetion_threads_lock);
944
945   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
946
947   return (NULL);
948 } /* }}} void *listen_thread_main */
949
950 static int daemonize (void) /* {{{ */
951 {
952   pid_t child;
953   int status;
954
955   child = fork ();
956   if (child < 0)
957   {
958     fprintf (stderr, "daemonize: fork(2) failed.\n");
959     return (-1);
960   }
961   else if (child > 0)
962   {
963     return (1);
964   }
965
966   /* Change into the /tmp directory. */
967   chdir ("/tmp");
968
969   /* Become session leader */
970   setsid ();
971
972   /* Open the first three file descriptors to /dev/null */
973   close (2);
974   close (1);
975   close (0);
976
977   open ("/dev/null", O_RDWR);
978   dup (0);
979   dup (0);
980
981   {
982     struct sigaction sa;
983
984     memset (&sa, 0, sizeof (sa));
985     sa.sa_handler = sig_int_handler;
986     sigaction (SIGINT, &sa, NULL);
987
988     memset (&sa, 0, sizeof (sa));
989     sa.sa_handler = sig_term_handler;
990     sigaction (SIGINT, &sa, NULL);
991
992     memset (&sa, 0, sizeof (sa));
993     sa.sa_handler = SIG_IGN;
994     sigaction (SIGPIPE, &sa, NULL);
995   }
996
997   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
998
999   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1000   if (cache_tree == NULL)
1001   {
1002     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1003     return (-1);
1004   }
1005
1006   memset (&queue_thread, 0, sizeof (queue_thread));
1007   status = pthread_create (&queue_thread, /* attr = */ NULL,
1008       queue_thread_main, /* args = */ NULL);
1009   if (status != 0)
1010   {
1011     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
1012     return (-1);
1013   }
1014
1015   return (0);
1016 } /* }}} int daemonize */
1017
1018 static int cleanup (void) /* {{{ */
1019 {
1020   RRDD_LOG (LOG_DEBUG, "cleanup ()");
1021
1022   do_shutdown++;
1023
1024   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
1025   pthread_cond_signal (&cache_cond);
1026   pthread_join (queue_thread, /* return = */ NULL);
1027   RRDD_LOG (LOG_DEBUG, "cleanup: done");
1028
1029   closelog ();
1030
1031   return (0);
1032 } /* }}} int cleanup */
1033
1034 static int read_options (int argc, char **argv) /* {{{ */
1035 {
1036   int option;
1037   int status = 0;
1038
1039   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
1040   {
1041     switch (option)
1042     {
1043       case 'l':
1044       {
1045         char **temp;
1046
1047         temp = (char **) realloc (config_listen_address_list,
1048             sizeof (char *) * (config_listen_address_list_len + 1));
1049         if (temp == NULL)
1050         {
1051           fprintf (stderr, "read_options: realloc failed.\n");
1052           return (2);
1053         }
1054         config_listen_address_list = temp;
1055
1056         temp[config_listen_address_list_len] = strdup (optarg);
1057         if (temp[config_listen_address_list_len] == NULL)
1058         {
1059           fprintf (stderr, "read_options: strdup failed.\n");
1060           return (2);
1061         }
1062         config_listen_address_list_len++;
1063       }
1064       break;
1065
1066       case 'f':
1067       {
1068         int temp;
1069
1070         temp = atoi (optarg);
1071         if (temp > 0)
1072           config_flush_interval = temp;
1073         else
1074         {
1075           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1076           status = 3;
1077         }
1078       }
1079       break;
1080
1081       case 'w':
1082       {
1083         int temp;
1084
1085         temp = atoi (optarg);
1086         if (temp > 0)
1087           config_write_interval = temp;
1088         else
1089         {
1090           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1091           status = 2;
1092         }
1093       }
1094       break;
1095
1096       case 'h':
1097       case '?':
1098         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
1099             "\n"
1100             "Usage: rrdcached [options]\n"
1101             "\n"
1102             "Valid options are:\n"
1103             "  -l <address>  Socket address to listen to.\n"
1104             "  -w <seconds>  Interval in which to write data.\n"
1105             "  -f <seconds>  Interval in which to flush dead data.\n"
1106             "\n"
1107             "For more information and a detailed description of all options "
1108             "please refer\n"
1109             "to the rrdcached(1) manual page.\n",
1110             VERSION);
1111         status = -1;
1112         break;
1113     } /* switch (option) */
1114   } /* while (getopt) */
1115
1116   return (status);
1117 } /* }}} int read_options */
1118
1119 int main (int argc, char **argv)
1120 {
1121   int status;
1122
1123   status = read_options (argc, argv);
1124   if (status != 0)
1125   {
1126     if (status < 0)
1127       status = 0;
1128     return (status);
1129   }
1130
1131   status = daemonize ();
1132   if (status == 1)
1133   {
1134     struct sigaction sigchld;
1135
1136     memset (&sigchld, 0, sizeof (sigchld));
1137     sigchld.sa_handler = SIG_IGN;
1138     sigaction (SIGCHLD, &sigchld, NULL);
1139
1140     return (0);
1141   }
1142   else if (status != 0)
1143   {
1144     fprintf (stderr, "daemonize failed, exiting.\n");
1145     return (1);
1146   }
1147
1148   listen_thread_main (NULL);
1149
1150   cleanup ();
1151
1152   return (0);
1153 } /* int main */
1154
1155 /*
1156  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1157  */