23f43283d7336df388e844f6530fd357c55f8f54
[rrdd.git] / src / rrdd.c
1 /**
2  * collectd - src/rrdd.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #define RRDD_DEBUG 1
23
24 #include "rrdd.h"
25
26 #if RRDD_DEBUG
27 # define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0)
28 #else
29 # define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
30 #endif
31
32 /*
33  * Types
34  */
35 struct listen_socket_s
36 {
37   int fd;
38   char path[PATH_MAX + 1];
39 };
40 typedef struct listen_socket_s listen_socket_t;
41
42 struct cache_item_s;
43 typedef struct cache_item_s cache_item_t;
44 struct cache_item_s
45 {
46   char *file;
47   char **values;
48   int values_num;
49   time_t last_flush_time;
50 #define CI_FLAGS_IN_TREE  0x01
51 #define CI_FLAGS_IN_QUEUE 0x02
52   int flags;
53
54   cache_item_t *next;
55 };
56
57 /*
58  * Variables
59  */
60 static listen_socket_t *listen_fds = NULL;
61 static size_t listen_fds_num = 0;
62
63 static int do_shutdown = 0;
64
65 static pthread_t queue_thread;
66
67 static pthread_t *connetion_threads = NULL;
68 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
69 static int connetion_threads_num = 0;
70
71 /* Cache stuff */
72 static avl_tree_t     *cache_tree = NULL;
73 static cache_item_t   *cache_queue_head = NULL;
74 static cache_item_t   *cache_queue_tail = NULL;
75 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
76 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
77
78 /* 
79  * Functions
80  */
81 static void sig_int_handler (int signal) /* {{{ */
82 {
83   do_shutdown++;
84 } /* }}} void sig_int_handler */
85
86 static void sig_term_handler (int signal) /* {{{ */
87 {
88   do_shutdown++;
89 } /* }}} void sig_term_handler */
90
91 static int cache_tree_compare (const void *v0, const void *v1) /* {{{ */
92 {
93   cache_item_t *c0 = (cache_item_t *) v0;
94   cache_item_t *c1 = (cache_item_t *) v1;
95
96   assert (c0->file != NULL);
97   assert (c1->file != NULL);
98
99   return (strcmp (c0->file, c1->file));
100 } /* }}} int cache_tree_compare */
101
102 static void cache_tree_free (void *v) /* {{{ */
103 {
104   cache_item_t *c = (cache_item_t *) v;
105
106   assert (c->values_num == 0);
107   assert ((c->flags & CI_FLAGS_IN_TREE) != 0);
108   assert ((c->flags & CI_FLAGS_IN_QUEUE) == 0);
109
110   free (c->file);
111   c->file = NULL;
112   free (c);
113 } /* }}} void cache_tree_free */
114
115 static void *queue_thread_main (void *args) /* {{{ */
116 {
117   pthread_mutex_lock (&cache_lock);
118   while ((do_shutdown == 0) || (cache_queue_head != NULL))
119   {
120     cache_item_t *ci;
121
122     char *file;
123     char **values;
124     int values_num;
125     int i;
126
127     if (cache_queue_head == NULL)
128       pthread_cond_wait (&cache_cond, &cache_lock);
129
130     if (cache_queue_head == NULL)
131       continue;
132
133     ci = cache_queue_head;
134
135     /* copy the relevant parts */
136     file = strdup (ci->file);
137     if (file == NULL)
138     {
139       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
140       continue;
141     }
142
143     values = ci->values;
144     values_num = ci->values_num;
145
146     ci->values = NULL;
147     ci->values_num = 0;
148
149     ci->last_flush_time = time (NULL);
150     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
151
152     cache_queue_head = ci->next;
153     if (cache_queue_head == NULL)
154       cache_queue_tail = NULL;
155     ci->next = NULL;
156
157     pthread_mutex_unlock (&cache_lock);
158
159     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
160         file, values_num, (void *) values);
161
162     free (file);
163     for (i = 0; i < values_num; i++)
164       free (values[i]);
165
166     pthread_mutex_lock (&cache_lock);
167   } /* while (do_shutdown == 0) */
168   pthread_mutex_unlock (&cache_lock);
169
170   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
171
172   return (NULL);
173 } /* }}} void *queue_thread_main */
174
175 static int handle_request_update (int fd, /* {{{ */
176     char *buffer, int buffer_size)
177 {
178   char *file;
179   char *value;
180   char *buffer_ptr;
181   int values_num = 0;
182
183   time_t now;
184
185   avl_node_t *node;
186   cache_item_t ci_temp;
187   cache_item_t *ci;
188
189   char answer[4096];
190
191   now = time (NULL);
192
193   RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
194       fd, (void *) buffer, buffer_size);
195
196   buffer_ptr = buffer;
197
198   file = buffer_ptr;
199   buffer_ptr += strlen (file) + 1;
200
201   ci_temp.file = file;
202
203   pthread_mutex_lock (&cache_lock);
204
205   node = avl_search (cache_tree, (void *) &ci_temp);
206   if (node == NULL)
207   {
208     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
209     if (ci == NULL)
210     {
211       pthread_mutex_unlock (&cache_lock);
212       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
213       return (-1);
214     }
215     memset (ci, 0, sizeof (cache_item_t));
216
217     ci->file = strdup (file);
218     if (ci->file == NULL)
219     {
220       pthread_mutex_unlock (&cache_lock);
221       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
222       free (ci);
223       return (-1);
224     }
225
226     ci->values = NULL;
227     ci->values_num = 0;
228     ci->last_flush_time = now;
229     ci->flags = CI_FLAGS_IN_TREE;
230
231     if (avl_insert (cache_tree, (void *) ci) == NULL)
232     {
233       pthread_mutex_unlock (&cache_lock);
234       RRDD_LOG (LOG_ERR, "handle_request_update: avl_insert failed.");
235       free (ci->file);
236       free (ci);
237       return (-1);
238     }
239
240     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new AVL node %s.",
241         ci->file);
242   }
243   else /* if (ci != NULL) */
244   {
245     ci = (cache_item_t *) node->item;
246   }
247   assert (ci != NULL);
248
249   while (*buffer_ptr != 0)
250   {
251     char **temp;
252
253     value = buffer_ptr;
254     buffer_ptr += strlen (value) + 1;
255
256     temp = (char **) realloc (ci->values,
257         sizeof (char *) * (ci->values_num + 1));
258     if (temp == NULL)
259     {
260       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
261       continue;
262     }
263     ci->values = temp;
264
265     ci->values[ci->values_num] = strdup (value);
266     if (ci->values[ci->values_num] == NULL)
267     {
268       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
269       continue;
270     }
271     ci->values_num++;
272
273     values_num++;
274   }
275
276   /* FIXME: Timeout should not be hard-coded. */
277   if (((now - ci->last_flush_time) > 300)
278       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
279   {
280     RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
281         ci->file);
282
283     assert (ci->next == NULL);
284
285     if (cache_queue_tail == NULL)
286       cache_queue_head = ci;
287     else
288       cache_queue_tail->next = ci;
289     cache_queue_tail = ci;
290
291     pthread_cond_signal (&cache_cond);
292   }
293
294   pthread_mutex_unlock (&cache_lock);
295
296   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
297   answer[sizeof (answer) - 1] = 0;
298
299   write (fd, answer, sizeof (answer));
300
301   return (0);
302 } /* }}} int handle_request_update */
303
304 static int handle_request (int fd) /* {{{ */
305 {
306   char buffer[4096];
307   int buffer_size;
308
309   RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
310
311   buffer_size = read (fd, buffer, sizeof (buffer));
312   if (buffer_size < 1)
313   {
314     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
315     return (-1);
316   }
317   assert (((size_t) buffer_size) <= sizeof (buffer));
318
319   if ((buffer[buffer_size - 2] != 0)
320       || (buffer[buffer_size - 1] != 0))
321   {
322     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
323     return (-1);
324   }
325
326   /* fields in the buffer a separated by null bytes. */
327   if (strcmp (buffer, "update") == 0)
328   {
329     int offset = strlen ("update") + 1;
330     return (handle_request_update (fd, buffer + offset,
331           buffer_size - offset));
332   }
333   else
334   {
335     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
336     return (-1);
337   }
338 } /* }}} int handle_request */
339
340 static void *connection_thread_main (void *args) /* {{{ */
341 {
342   pthread_t self;
343   int i;
344   int fd;
345   
346   fd = *((int *) args);
347
348   RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
349       "connetion_threads[]..");
350   pthread_mutex_lock (&connetion_threads_lock);
351   {
352     pthread_t *temp;
353
354     temp = (pthread_t *) realloc (connetion_threads,
355         sizeof (pthread_t) * (connetion_threads_num + 1));
356     if (temp == NULL)
357     {
358       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
359     }
360     else
361     {
362       connetion_threads = temp;
363       connetion_threads[connetion_threads_num] = pthread_self ();
364       connetion_threads_num++;
365     }
366   }
367   pthread_mutex_unlock (&connetion_threads_lock);
368   RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
369
370   while (do_shutdown == 0)
371   {
372     struct pollfd pollfd;
373     int status;
374
375     pollfd.fd = fd;
376     pollfd.events = POLLIN | POLLPRI;
377     pollfd.revents = 0;
378
379     status = poll (&pollfd, 1, /* timeout = */ 500);
380     if (status == 0) /* timeout */
381       continue;
382     else if (status < 0) /* error */
383     {
384       status = errno;
385       if (status == EINTR)
386         continue;
387       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
388       continue;
389     }
390
391     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
392     {
393       close (fd);
394       break;
395     }
396     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
397     {
398       RRDD_LOG (LOG_WARNING, "connection_thread_main: poll(2) returned "
399           "something unexpected.");
400       close (fd);
401       break;
402     }
403
404     status = handle_request (fd);
405     if (status != 0)
406     {
407       close (fd);
408       break;
409     }
410   }
411
412   self = pthread_self ();
413   /* Remove this thread from the connection threads list */
414   pthread_mutex_lock (&connetion_threads_lock);
415   /* Find out own index in the array */
416   for (i = 0; i < connetion_threads_num; i++)
417     if (pthread_equal (connetion_threads[i], self) != 0)
418       break;
419   assert (i < connetion_threads_num);
420
421   /* Move the trailing threads forward. */
422   if (i < (connetion_threads_num - 1))
423   {
424     memmove (connetion_threads + i,
425         connetion_threads + i + 1,
426         sizeof (pthread_t) * (connetion_threads_num - i - 1));
427   }
428
429   connetion_threads_num--;
430   pthread_mutex_unlock (&connetion_threads_lock);
431
432   free (args);
433   return (NULL);
434 } /* }}} void *connection_thread_main */
435
436 static int open_listen_socket (const char *path) /* {{{ */
437 {
438   int fd;
439   struct sockaddr_un sa;
440   listen_socket_t *temp;
441   int status;
442
443   temp = (listen_socket_t *) realloc (listen_fds,
444       sizeof (listen_fds[0]) * (listen_fds_num + 1));
445   if (temp == NULL)
446   {
447     RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
448     return (-1);
449   }
450   listen_fds = temp;
451   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
452
453   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
454   if (fd < 0)
455   {
456     RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
457     return (-1);
458   }
459
460   memset (&sa, 0, sizeof (sa));
461   sa.sun_family = AF_UNIX;
462   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
463
464   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
465   if (status != 0)
466   {
467     RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
468     close (fd);
469     unlink (path);
470     return (-1);
471   }
472
473   status = listen (fd, /* backlog = */ 10);
474   if (status != 0)
475   {
476     RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
477     close (fd);
478     unlink (path);
479     return (-1);
480   }
481   
482   listen_fds[listen_fds_num].fd = fd;
483   strncpy (listen_fds[listen_fds_num].path, path,
484       sizeof (listen_fds[listen_fds_num].path) - 1);
485   listen_fds_num++;
486
487   return (0);
488 } /* }}} int open_listen_socket */
489
490 static int close_listen_sockets (void) /* {{{ */
491 {
492   size_t i;
493
494   for (i = 0; i < listen_fds_num; i++)
495   {
496     close (listen_fds[i].fd);
497     unlink (listen_fds[i].path);
498   }
499
500   free (listen_fds);
501   listen_fds = NULL;
502   listen_fds_num = 0;
503
504   return (0);
505 } /* }}} int close_listen_sockets */
506
507 static void *listen_thread_main (void *args) /* {{{ */
508 {
509   char buffer[4096];
510   int status;
511   int i;
512
513   status = open_listen_socket (RRDD_SOCK_PATH);
514   if (status != 0)
515   {
516     RRDD_LOG (LOG_ERR, "listen_thread_main: open_listen_socket failed.");
517     return (NULL);
518   }
519
520   while (do_shutdown == 0)
521   {
522     int *client_sd;
523     struct sockaddr_un client_sa;
524     socklen_t client_sa_size;
525     pthread_t tid;
526
527     client_sd = (int *) malloc (sizeof (int));
528     if (client_sd == NULL)
529     {
530       RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
531       sleep (120);
532       continue;
533     }
534
535     client_sa_size = sizeof (client_sa);
536     /* FIXME: Don't implement listen_fds as a list or use poll(2) here! */
537     *client_sd = accept (listen_fds[0].fd,
538         (struct sockaddr *) &client_sa, &client_sa_size);
539     if (*client_sd < 0)
540     {
541       RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
542       continue;
543     }
544
545     RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
546         *client_sd);
547
548     status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
549         /* args = */ (void *) client_sd);
550     if (status != 0)
551     {
552       RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
553       close (*client_sd);
554       free (client_sd);
555       continue;
556     }
557
558     RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
559         "tid = %lu",
560         *((unsigned long *) &tid));
561   } /* while (do_shutdown == 0) */
562
563   close_listen_sockets ();
564
565   pthread_mutex_lock (&connetion_threads_lock);
566   while (connetion_threads_num > 0)
567   {
568     pthread_t wait_for;
569
570     wait_for = connetion_threads[0];
571
572     pthread_mutex_unlock (&connetion_threads_lock);
573     pthread_join (wait_for, /* retval = */ NULL);
574     pthread_mutex_lock (&connetion_threads_lock);
575   }
576   pthread_mutex_unlock (&connetion_threads_lock);
577
578   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
579
580   return (NULL);
581 } /* }}} void *listen_thread_main */
582
583 static int daemonize (void) /* {{{ */
584 {
585   pid_t child;
586   int status;
587
588 #if !RRDD_DEBUG
589   child = fork ();
590   if (child < 0)
591   {
592     fprintf (stderr, "daemonize: fork(2) failed.\n");
593     return (-1);
594   }
595   else if (child > 0)
596   {
597     return (1);
598   }
599
600   /* Change into the /tmp directory. */
601   chdir ("/tmp");
602
603   /* Become session leader */
604   setsid ();
605
606   /* Open the first three file descriptors to /dev/null */
607   close (2);
608   close (1);
609   close (0);
610
611   open ("/dev/null", O_RDWR);
612   dup (0);
613   dup (0);
614 #endif /* RRDD_DEBUG */
615
616   {
617     struct sigaction sa;
618
619     memset (&sa, 0, sizeof (sa));
620     sa.sa_handler = sig_int_handler;
621     sigaction (SIGINT, &sa, NULL);
622
623     memset (&sa, 0, sizeof (sa));
624     sa.sa_handler = sig_term_handler;
625     sigaction (SIGINT, &sa, NULL);
626   }
627
628   openlog ("rrdd", LOG_PID, LOG_DAEMON);
629
630   cache_tree = avl_alloc_tree (cache_tree_compare, cache_tree_free);
631   if (cache_tree == NULL)
632   {
633     RRDD_LOG (LOG_ERR, "daemonize: avl_alloc_tree failed.");
634     return (-1);
635   }
636
637   memset (&queue_thread, 0, sizeof (queue_thread));
638   status = pthread_create (&queue_thread, /* attr = */ NULL,
639       queue_thread_main, /* args = */ NULL);
640   if (status != 0)
641   {
642     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
643     return (-1);
644   }
645
646   return (0);
647 } /* }}} int daemonize */
648
649 static int cleanup (void) /* {{{ */
650 {
651   RRDD_LOG (LOG_DEBUG, "cleanup ()");
652
653   do_shutdown++;
654
655   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
656   pthread_cond_signal (&cache_cond);
657   pthread_join (queue_thread, /* return = */ NULL);
658   RRDD_LOG (LOG_DEBUG, "cleanup: done");
659
660   closelog ();
661
662   return (0);
663 } /* }}} int cleanup */
664
665 int main (int argc, char **argv)
666 {
667   int status;
668
669   printf ("%s by Florian Forster, Version %s\n",
670       PACKAGE_NAME, PACKAGE_VERSION);
671
672   status = daemonize ();
673   if (status == 1)
674   {
675     struct sigaction sigchld;
676
677     memset (&sigchld, 0, sizeof (sigchld));
678     sigchld.sa_handler = SIG_IGN;
679     sigaction (SIGCHLD, &sigchld, NULL);
680
681     return (0);
682   }
683   else if (status != 0)
684   {
685     fprintf (stderr, "daemonize failed, exiting.\n");
686     return (1);
687   }
688
689   listen_thread_main (NULL);
690
691   cleanup ();
692
693   return (0);
694 } /* int main */
695
696 /*
697  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
698  */