src/rrdd.c: Implement option parsing.
[rrdd.git] / src / rrdd.c
1 /**
2  * collectd - src/rrdd.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #define RRDD_DEBUG 1
23
24 #include "rrdd.h"
25
26 #if RRDD_DEBUG
27 # define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0)
28 #else
29 # define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
30 #endif
31
32 /*
33  * Types
34  */
35 struct listen_socket_s
36 {
37   int fd;
38   char path[PATH_MAX + 1];
39 };
40 typedef struct listen_socket_s listen_socket_t;
41
42 struct cache_item_s;
43 typedef struct cache_item_s cache_item_t;
44 struct cache_item_s
45 {
46   char *file;
47   char **values;
48   int values_num;
49   time_t last_flush_time;
50 #define CI_FLAGS_IN_TREE  0x01
51 #define CI_FLAGS_IN_QUEUE 0x02
52   int flags;
53
54   cache_item_t *next;
55 };
56
57 /*
58  * Variables
59  */
60 static listen_socket_t *listen_fds = NULL;
61 static size_t listen_fds_num = 0;
62
63 static int do_shutdown = 0;
64
65 static pthread_t queue_thread;
66
67 static pthread_t *connetion_threads = NULL;
68 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
69 static int connetion_threads_num = 0;
70
71 /* Cache stuff */
72 static avl_tree_t     *cache_tree = NULL;
73 static cache_item_t   *cache_queue_head = NULL;
74 static cache_item_t   *cache_queue_tail = NULL;
75 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
76 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
77
78 static int config_write_interval = 300;
79 static int config_flush_interval = 3600;
80
81 /* 
82  * Functions
83  */
84 static void sig_int_handler (int signal) /* {{{ */
85 {
86   do_shutdown++;
87 } /* }}} void sig_int_handler */
88
89 static void sig_term_handler (int signal) /* {{{ */
90 {
91   do_shutdown++;
92 } /* }}} void sig_term_handler */
93
94 static int cache_tree_compare (const void *v0, const void *v1) /* {{{ */
95 {
96   cache_item_t *c0 = (cache_item_t *) v0;
97   cache_item_t *c1 = (cache_item_t *) v1;
98
99   assert (c0->file != NULL);
100   assert (c1->file != NULL);
101
102   return (strcmp (c0->file, c1->file));
103 } /* }}} int cache_tree_compare */
104
105 static void cache_tree_free (void *v) /* {{{ */
106 {
107   cache_item_t *c = (cache_item_t *) v;
108
109   assert (c->values_num == 0);
110   assert ((c->flags & CI_FLAGS_IN_TREE) != 0);
111   assert ((c->flags & CI_FLAGS_IN_QUEUE) == 0);
112
113   free (c->file);
114   c->file = NULL;
115   free (c);
116 } /* }}} void cache_tree_free */
117
118 static void *queue_thread_main (void *args) /* {{{ */
119 {
120   pthread_mutex_lock (&cache_lock);
121   while ((do_shutdown == 0) || (cache_queue_head != NULL))
122   {
123     cache_item_t *ci;
124
125     char *file;
126     char **values;
127     int values_num;
128     int status;
129     int i;
130
131     if (cache_queue_head == NULL)
132       pthread_cond_wait (&cache_cond, &cache_lock);
133
134     if (cache_queue_head == NULL)
135       continue;
136
137     ci = cache_queue_head;
138
139     /* copy the relevant parts */
140     file = strdup (ci->file);
141     if (file == NULL)
142     {
143       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
144       continue;
145     }
146
147     values = ci->values;
148     values_num = ci->values_num;
149
150     ci->values = NULL;
151     ci->values_num = 0;
152
153     ci->last_flush_time = time (NULL);
154     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
155
156     cache_queue_head = ci->next;
157     if (cache_queue_head == NULL)
158       cache_queue_tail = NULL;
159     ci->next = NULL;
160
161     pthread_mutex_unlock (&cache_lock);
162
163     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
164         file, values_num, (void *) values);
165
166     status = rrd_update_r (file, NULL, values_num, values);
167     if (status != 0)
168     {
169       RRDD_LOG (LOG_ERR, "queue_thread_main: "
170           "rrd_update_r failed with status %i.",
171           status);
172     }
173
174     free (file);
175     for (i = 0; i < values_num; i++)
176       free (values[i]);
177
178     pthread_mutex_lock (&cache_lock);
179   } /* while (do_shutdown == 0) */
180   pthread_mutex_unlock (&cache_lock);
181
182   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
183
184   return (NULL);
185 } /* }}} void *queue_thread_main */
186
187 static int handle_request_update (int fd, /* {{{ */
188     char *buffer, int buffer_size)
189 {
190   char *file;
191   char *value;
192   char *buffer_ptr;
193   int values_num = 0;
194   int status;
195
196   time_t now;
197
198   avl_node_t *node;
199   cache_item_t ci_temp;
200   cache_item_t *ci;
201
202   char answer[4096];
203
204   now = time (NULL);
205
206   RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
207       fd, (void *) buffer, buffer_size);
208
209   buffer_ptr = buffer;
210
211   file = buffer_ptr;
212   buffer_ptr += strlen (file) + 1;
213
214   ci_temp.file = file;
215
216   pthread_mutex_lock (&cache_lock);
217
218   node = avl_search (cache_tree, (void *) &ci_temp);
219   if (node == NULL)
220   {
221     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
222     if (ci == NULL)
223     {
224       pthread_mutex_unlock (&cache_lock);
225       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
226       return (-1);
227     }
228     memset (ci, 0, sizeof (cache_item_t));
229
230     ci->file = strdup (file);
231     if (ci->file == NULL)
232     {
233       pthread_mutex_unlock (&cache_lock);
234       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
235       free (ci);
236       return (-1);
237     }
238
239     ci->values = NULL;
240     ci->values_num = 0;
241     ci->last_flush_time = now;
242     ci->flags = CI_FLAGS_IN_TREE;
243
244     if (avl_insert (cache_tree, (void *) ci) == NULL)
245     {
246       pthread_mutex_unlock (&cache_lock);
247       RRDD_LOG (LOG_ERR, "handle_request_update: avl_insert failed.");
248       free (ci->file);
249       free (ci);
250       return (-1);
251     }
252
253     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new AVL node %s.",
254         ci->file);
255   }
256   else /* if (ci != NULL) */
257   {
258     ci = (cache_item_t *) node->item;
259   }
260   assert (ci != NULL);
261
262   while (*buffer_ptr != 0)
263   {
264     char **temp;
265
266     value = buffer_ptr;
267     buffer_ptr += strlen (value) + 1;
268
269     temp = (char **) realloc (ci->values,
270         sizeof (char *) * (ci->values_num + 1));
271     if (temp == NULL)
272     {
273       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
274       continue;
275     }
276     ci->values = temp;
277
278     ci->values[ci->values_num] = strdup (value);
279     if (ci->values[ci->values_num] == NULL)
280     {
281       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
282       continue;
283     }
284     ci->values_num++;
285
286     values_num++;
287   }
288
289   if (((now - ci->last_flush_time) >= config_write_interval)
290       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
291   {
292     RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
293         ci->file);
294
295     assert (ci->next == NULL);
296
297     if (cache_queue_tail == NULL)
298       cache_queue_head = ci;
299     else
300       cache_queue_tail->next = ci;
301     cache_queue_tail = ci;
302
303     pthread_cond_signal (&cache_cond);
304   }
305
306   pthread_mutex_unlock (&cache_lock);
307
308   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
309   answer[sizeof (answer) - 1] = 0;
310
311   status = write (fd, answer, sizeof (answer));
312   if (status < 0)
313   {
314     status = errno;
315     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
316     return (status);
317   }
318
319   return (0);
320 } /* }}} int handle_request_update */
321
322 static int handle_request (int fd) /* {{{ */
323 {
324   char buffer[4096];
325   int buffer_size;
326
327   RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
328
329   buffer_size = read (fd, buffer, sizeof (buffer));
330   if (buffer_size < 1)
331   {
332     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
333     return (-1);
334   }
335   assert (((size_t) buffer_size) <= sizeof (buffer));
336
337   if ((buffer[buffer_size - 2] != 0)
338       || (buffer[buffer_size - 1] != 0))
339   {
340     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
341     return (-1);
342   }
343
344   /* fields in the buffer a separated by null bytes. */
345   if (strcmp (buffer, "update") == 0)
346   {
347     int offset = strlen ("update") + 1;
348     return (handle_request_update (fd, buffer + offset,
349           buffer_size - offset));
350   }
351   else
352   {
353     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
354     return (-1);
355   }
356 } /* }}} int handle_request */
357
358 static void *connection_thread_main (void *args) /* {{{ */
359 {
360   pthread_t self;
361   int i;
362   int fd;
363   
364   fd = *((int *) args);
365
366   RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
367       "connetion_threads[]..");
368   pthread_mutex_lock (&connetion_threads_lock);
369   {
370     pthread_t *temp;
371
372     temp = (pthread_t *) realloc (connetion_threads,
373         sizeof (pthread_t) * (connetion_threads_num + 1));
374     if (temp == NULL)
375     {
376       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
377     }
378     else
379     {
380       connetion_threads = temp;
381       connetion_threads[connetion_threads_num] = pthread_self ();
382       connetion_threads_num++;
383     }
384   }
385   pthread_mutex_unlock (&connetion_threads_lock);
386   RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
387
388   while (do_shutdown == 0)
389   {
390     struct pollfd pollfd;
391     int status;
392
393     pollfd.fd = fd;
394     pollfd.events = POLLIN | POLLPRI;
395     pollfd.revents = 0;
396
397     status = poll (&pollfd, 1, /* timeout = */ 500);
398     if (status == 0) /* timeout */
399       continue;
400     else if (status < 0) /* error */
401     {
402       status = errno;
403       if (status == EINTR)
404         continue;
405       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
406       continue;
407     }
408
409     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
410     {
411       RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
412           "poll(2) returned POLLHUP.");
413       close (fd);
414       break;
415     }
416     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
417     {
418       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
419           "poll(2) returned something unexpected: %#04hx",
420           pollfd.revents);
421       close (fd);
422       break;
423     }
424
425     status = handle_request (fd);
426     if (status != 0)
427     {
428       close (fd);
429       break;
430     }
431   }
432
433   self = pthread_self ();
434   /* Remove this thread from the connection threads list */
435   pthread_mutex_lock (&connetion_threads_lock);
436   /* Find out own index in the array */
437   for (i = 0; i < connetion_threads_num; i++)
438     if (pthread_equal (connetion_threads[i], self) != 0)
439       break;
440   assert (i < connetion_threads_num);
441
442   /* Move the trailing threads forward. */
443   if (i < (connetion_threads_num - 1))
444   {
445     memmove (connetion_threads + i,
446         connetion_threads + i + 1,
447         sizeof (pthread_t) * (connetion_threads_num - i - 1));
448   }
449
450   connetion_threads_num--;
451   pthread_mutex_unlock (&connetion_threads_lock);
452
453   free (args);
454   return (NULL);
455 } /* }}} void *connection_thread_main */
456
457 static int open_listen_socket (const char *path) /* {{{ */
458 {
459   int fd;
460   struct sockaddr_un sa;
461   listen_socket_t *temp;
462   int status;
463
464   temp = (listen_socket_t *) realloc (listen_fds,
465       sizeof (listen_fds[0]) * (listen_fds_num + 1));
466   if (temp == NULL)
467   {
468     RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
469     return (-1);
470   }
471   listen_fds = temp;
472   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
473
474   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
475   if (fd < 0)
476   {
477     RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
478     return (-1);
479   }
480
481   memset (&sa, 0, sizeof (sa));
482   sa.sun_family = AF_UNIX;
483   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
484
485   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
486   if (status != 0)
487   {
488     RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
489     close (fd);
490     unlink (path);
491     return (-1);
492   }
493
494   status = listen (fd, /* backlog = */ 10);
495   if (status != 0)
496   {
497     RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
498     close (fd);
499     unlink (path);
500     return (-1);
501   }
502   
503   listen_fds[listen_fds_num].fd = fd;
504   strncpy (listen_fds[listen_fds_num].path, path,
505       sizeof (listen_fds[listen_fds_num].path) - 1);
506   listen_fds_num++;
507
508   return (0);
509 } /* }}} int open_listen_socket */
510
511 static int close_listen_sockets (void) /* {{{ */
512 {
513   size_t i;
514
515   for (i = 0; i < listen_fds_num; i++)
516   {
517     close (listen_fds[i].fd);
518     unlink (listen_fds[i].path);
519   }
520
521   free (listen_fds);
522   listen_fds = NULL;
523   listen_fds_num = 0;
524
525   return (0);
526 } /* }}} int close_listen_sockets */
527
528 static void *listen_thread_main (void *args) /* {{{ */
529 {
530   char buffer[4096];
531   int status;
532   int i;
533
534   status = open_listen_socket (RRDD_SOCK_PATH);
535   if (status != 0)
536   {
537     RRDD_LOG (LOG_ERR, "listen_thread_main: open_listen_socket failed.");
538     return (NULL);
539   }
540
541   while (do_shutdown == 0)
542   {
543     int *client_sd;
544     struct sockaddr_un client_sa;
545     socklen_t client_sa_size;
546     pthread_t tid;
547
548     client_sd = (int *) malloc (sizeof (int));
549     if (client_sd == NULL)
550     {
551       RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
552       sleep (120);
553       continue;
554     }
555
556     client_sa_size = sizeof (client_sa);
557     /* FIXME: Don't implement listen_fds as a list or use poll(2) here! */
558     *client_sd = accept (listen_fds[0].fd,
559         (struct sockaddr *) &client_sa, &client_sa_size);
560     if (*client_sd < 0)
561     {
562       RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
563       continue;
564     }
565
566     RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
567         *client_sd);
568
569     status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
570         /* args = */ (void *) client_sd);
571     if (status != 0)
572     {
573       RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
574       close (*client_sd);
575       free (client_sd);
576       continue;
577     }
578
579     RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
580         "tid = %lu",
581         *((unsigned long *) &tid));
582   } /* while (do_shutdown == 0) */
583
584   close_listen_sockets ();
585
586   pthread_mutex_lock (&connetion_threads_lock);
587   while (connetion_threads_num > 0)
588   {
589     pthread_t wait_for;
590
591     wait_for = connetion_threads[0];
592
593     pthread_mutex_unlock (&connetion_threads_lock);
594     pthread_join (wait_for, /* retval = */ NULL);
595     pthread_mutex_lock (&connetion_threads_lock);
596   }
597   pthread_mutex_unlock (&connetion_threads_lock);
598
599   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
600
601   return (NULL);
602 } /* }}} void *listen_thread_main */
603
604 static int daemonize (void) /* {{{ */
605 {
606   pid_t child;
607   int status;
608
609 #if !RRDD_DEBUG
610   child = fork ();
611   if (child < 0)
612   {
613     fprintf (stderr, "daemonize: fork(2) failed.\n");
614     return (-1);
615   }
616   else if (child > 0)
617   {
618     return (1);
619   }
620
621   /* Change into the /tmp directory. */
622   chdir ("/tmp");
623
624   /* Become session leader */
625   setsid ();
626
627   /* Open the first three file descriptors to /dev/null */
628   close (2);
629   close (1);
630   close (0);
631
632   open ("/dev/null", O_RDWR);
633   dup (0);
634   dup (0);
635 #endif /* RRDD_DEBUG */
636
637   {
638     struct sigaction sa;
639
640     memset (&sa, 0, sizeof (sa));
641     sa.sa_handler = sig_int_handler;
642     sigaction (SIGINT, &sa, NULL);
643
644     memset (&sa, 0, sizeof (sa));
645     sa.sa_handler = sig_term_handler;
646     sigaction (SIGINT, &sa, NULL);
647
648     memset (&sa, 0, sizeof (sa));
649     sa.sa_handler = SIG_IGN;
650     sigaction (SIGPIPE, &sa, NULL);
651   }
652
653   openlog ("rrdd", LOG_PID, LOG_DAEMON);
654
655   cache_tree = avl_alloc_tree (cache_tree_compare, cache_tree_free);
656   if (cache_tree == NULL)
657   {
658     RRDD_LOG (LOG_ERR, "daemonize: avl_alloc_tree failed.");
659     return (-1);
660   }
661
662   memset (&queue_thread, 0, sizeof (queue_thread));
663   status = pthread_create (&queue_thread, /* attr = */ NULL,
664       queue_thread_main, /* args = */ NULL);
665   if (status != 0)
666   {
667     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
668     return (-1);
669   }
670
671   return (0);
672 } /* }}} int daemonize */
673
674 static int cleanup (void) /* {{{ */
675 {
676   RRDD_LOG (LOG_DEBUG, "cleanup ()");
677
678   do_shutdown++;
679
680   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
681   pthread_cond_signal (&cache_cond);
682   pthread_join (queue_thread, /* return = */ NULL);
683   RRDD_LOG (LOG_DEBUG, "cleanup: done");
684
685   closelog ();
686
687   return (0);
688 } /* }}} int cleanup */
689
690 static int read_options (int argc, char **argv) /* {{{ */
691 {
692   int option;
693   int status = 0;
694
695   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
696   {
697     switch (option)
698     {
699       case 'l':
700       {
701         printf ("Listening to: %s\n", optarg);
702       }
703       break;
704
705       case 'f':
706       {
707         int temp;
708
709         temp = atoi (optarg);
710         if (temp > 0)
711           config_flush_interval = temp;
712         else
713         {
714           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
715           status = 3;
716         }
717       }
718       break;
719
720       case 'w':
721       {
722         int temp;
723
724         temp = atoi (optarg);
725         if (temp > 0)
726           config_write_interval = temp;
727         else
728         {
729           fprintf (stderr, "Invalid write interval: %s\n", optarg);
730           status = 2;
731         }
732       }
733       break;
734
735       case 'h':
736       case '?':
737         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
738             "\n"
739             "Usage: rrdd [options]\n"
740             "\n"
741             "Valid options are:\n"
742             "  -l <address>  Socket address to listen to.\n"
743             "  -w <seconds>  Interval in which to write data.\n"
744             "  -f <seconds>  Interval in which to flush dead data.\n"
745             "\n"
746             "For more information and a detailed description of all options "
747             "please refer\n"
748             "to the rrdd(1) manual page.\n",
749             PACKAGE_VERSION);
750         status = -1;
751         break;
752     } /* switch (option) */
753   } /* while (getopt) */
754
755   return (status);
756 } /* }}} int read_options */
757
758 int main (int argc, char **argv)
759 {
760   int status;
761
762   status = read_options (argc, argv);
763   if (status != 0)
764   {
765     if (status < 0)
766       status = 0;
767     return (status);
768   }
769
770   status = daemonize ();
771   if (status == 1)
772   {
773     struct sigaction sigchld;
774
775     memset (&sigchld, 0, sizeof (sigchld));
776     sigchld.sa_handler = SIG_IGN;
777     sigaction (SIGCHLD, &sigchld, NULL);
778
779     return (0);
780   }
781   else if (status != 0)
782   {
783     fprintf (stderr, "daemonize failed, exiting.\n");
784     return (1);
785   }
786
787   listen_thread_main (NULL);
788
789   cleanup ();
790
791   return (0);
792 } /* int main */
793
794 /*
795  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
796  */