src/rrdd.c: Add a debug message for disconnecting sockets.
[rrdd.git] / src / rrdd.c
1 /**
2  * collectd - src/rrdd.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #define RRDD_DEBUG 1
23
24 #include "rrdd.h"
25
26 #if RRDD_DEBUG
27 # define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0)
28 #else
29 # define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
30 #endif
31
32 /*
33  * Types
34  */
35 struct listen_socket_s
36 {
37   int fd;
38   char path[PATH_MAX + 1];
39 };
40 typedef struct listen_socket_s listen_socket_t;
41
42 struct cache_item_s;
43 typedef struct cache_item_s cache_item_t;
44 struct cache_item_s
45 {
46   char *file;
47   char **values;
48   int values_num;
49   time_t last_flush_time;
50 #define CI_FLAGS_IN_TREE  0x01
51 #define CI_FLAGS_IN_QUEUE 0x02
52   int flags;
53
54   cache_item_t *next;
55 };
56
57 /*
58  * Variables
59  */
60 static listen_socket_t *listen_fds = NULL;
61 static size_t listen_fds_num = 0;
62
63 static int do_shutdown = 0;
64
65 static pthread_t queue_thread;
66
67 static pthread_t *connetion_threads = NULL;
68 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
69 static int connetion_threads_num = 0;
70
71 /* Cache stuff */
72 static avl_tree_t     *cache_tree = NULL;
73 static cache_item_t   *cache_queue_head = NULL;
74 static cache_item_t   *cache_queue_tail = NULL;
75 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
76 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
77
78 /* 
79  * Functions
80  */
81 static void sig_int_handler (int signal) /* {{{ */
82 {
83   do_shutdown++;
84 } /* }}} void sig_int_handler */
85
86 static void sig_term_handler (int signal) /* {{{ */
87 {
88   do_shutdown++;
89 } /* }}} void sig_term_handler */
90
91 static int cache_tree_compare (const void *v0, const void *v1) /* {{{ */
92 {
93   cache_item_t *c0 = (cache_item_t *) v0;
94   cache_item_t *c1 = (cache_item_t *) v1;
95
96   assert (c0->file != NULL);
97   assert (c1->file != NULL);
98
99   return (strcmp (c0->file, c1->file));
100 } /* }}} int cache_tree_compare */
101
102 static void cache_tree_free (void *v) /* {{{ */
103 {
104   cache_item_t *c = (cache_item_t *) v;
105
106   assert (c->values_num == 0);
107   assert ((c->flags & CI_FLAGS_IN_TREE) != 0);
108   assert ((c->flags & CI_FLAGS_IN_QUEUE) == 0);
109
110   free (c->file);
111   c->file = NULL;
112   free (c);
113 } /* }}} void cache_tree_free */
114
115 static void *queue_thread_main (void *args) /* {{{ */
116 {
117   pthread_mutex_lock (&cache_lock);
118   while ((do_shutdown == 0) || (cache_queue_head != NULL))
119   {
120     cache_item_t *ci;
121
122     char *file;
123     char **values;
124     int values_num;
125     int status;
126     int i;
127
128     if (cache_queue_head == NULL)
129       pthread_cond_wait (&cache_cond, &cache_lock);
130
131     if (cache_queue_head == NULL)
132       continue;
133
134     ci = cache_queue_head;
135
136     /* copy the relevant parts */
137     file = strdup (ci->file);
138     if (file == NULL)
139     {
140       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
141       continue;
142     }
143
144     values = ci->values;
145     values_num = ci->values_num;
146
147     ci->values = NULL;
148     ci->values_num = 0;
149
150     ci->last_flush_time = time (NULL);
151     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
152
153     cache_queue_head = ci->next;
154     if (cache_queue_head == NULL)
155       cache_queue_tail = NULL;
156     ci->next = NULL;
157
158     pthread_mutex_unlock (&cache_lock);
159
160     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
161         file, values_num, (void *) values);
162
163     status = rrd_update_r (file, NULL, values_num, values);
164     if (status != 0)
165     {
166       RRDD_LOG (LOG_ERR, "queue_thread_main: "
167           "rrd_update_r failed with status %i.",
168           status);
169     }
170
171     free (file);
172     for (i = 0; i < values_num; i++)
173       free (values[i]);
174
175     pthread_mutex_lock (&cache_lock);
176   } /* while (do_shutdown == 0) */
177   pthread_mutex_unlock (&cache_lock);
178
179   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
180
181   return (NULL);
182 } /* }}} void *queue_thread_main */
183
184 static int handle_request_update (int fd, /* {{{ */
185     char *buffer, int buffer_size)
186 {
187   char *file;
188   char *value;
189   char *buffer_ptr;
190   int values_num = 0;
191   int status;
192
193   time_t now;
194
195   avl_node_t *node;
196   cache_item_t ci_temp;
197   cache_item_t *ci;
198
199   char answer[4096];
200
201   now = time (NULL);
202
203   RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
204       fd, (void *) buffer, buffer_size);
205
206   buffer_ptr = buffer;
207
208   file = buffer_ptr;
209   buffer_ptr += strlen (file) + 1;
210
211   ci_temp.file = file;
212
213   pthread_mutex_lock (&cache_lock);
214
215   node = avl_search (cache_tree, (void *) &ci_temp);
216   if (node == NULL)
217   {
218     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
219     if (ci == NULL)
220     {
221       pthread_mutex_unlock (&cache_lock);
222       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
223       return (-1);
224     }
225     memset (ci, 0, sizeof (cache_item_t));
226
227     ci->file = strdup (file);
228     if (ci->file == NULL)
229     {
230       pthread_mutex_unlock (&cache_lock);
231       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
232       free (ci);
233       return (-1);
234     }
235
236     ci->values = NULL;
237     ci->values_num = 0;
238     ci->last_flush_time = now;
239     ci->flags = CI_FLAGS_IN_TREE;
240
241     if (avl_insert (cache_tree, (void *) ci) == NULL)
242     {
243       pthread_mutex_unlock (&cache_lock);
244       RRDD_LOG (LOG_ERR, "handle_request_update: avl_insert failed.");
245       free (ci->file);
246       free (ci);
247       return (-1);
248     }
249
250     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new AVL node %s.",
251         ci->file);
252   }
253   else /* if (ci != NULL) */
254   {
255     ci = (cache_item_t *) node->item;
256   }
257   assert (ci != NULL);
258
259   while (*buffer_ptr != 0)
260   {
261     char **temp;
262
263     value = buffer_ptr;
264     buffer_ptr += strlen (value) + 1;
265
266     temp = (char **) realloc (ci->values,
267         sizeof (char *) * (ci->values_num + 1));
268     if (temp == NULL)
269     {
270       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
271       continue;
272     }
273     ci->values = temp;
274
275     ci->values[ci->values_num] = strdup (value);
276     if (ci->values[ci->values_num] == NULL)
277     {
278       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
279       continue;
280     }
281     ci->values_num++;
282
283     values_num++;
284   }
285
286   /* FIXME: Timeout should not be hard-coded. */
287   if (((now - ci->last_flush_time) > 300)
288       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
289   {
290     RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
291         ci->file);
292
293     assert (ci->next == NULL);
294
295     if (cache_queue_tail == NULL)
296       cache_queue_head = ci;
297     else
298       cache_queue_tail->next = ci;
299     cache_queue_tail = ci;
300
301     pthread_cond_signal (&cache_cond);
302   }
303
304   pthread_mutex_unlock (&cache_lock);
305
306   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
307   answer[sizeof (answer) - 1] = 0;
308
309   status = write (fd, answer, sizeof (answer));
310   if (status < 0)
311   {
312     status = errno;
313     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
314     return (status);
315   }
316
317   return (0);
318 } /* }}} int handle_request_update */
319
320 static int handle_request (int fd) /* {{{ */
321 {
322   char buffer[4096];
323   int buffer_size;
324
325   RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
326
327   buffer_size = read (fd, buffer, sizeof (buffer));
328   if (buffer_size < 1)
329   {
330     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
331     return (-1);
332   }
333   assert (((size_t) buffer_size) <= sizeof (buffer));
334
335   if ((buffer[buffer_size - 2] != 0)
336       || (buffer[buffer_size - 1] != 0))
337   {
338     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
339     return (-1);
340   }
341
342   /* fields in the buffer a separated by null bytes. */
343   if (strcmp (buffer, "update") == 0)
344   {
345     int offset = strlen ("update") + 1;
346     return (handle_request_update (fd, buffer + offset,
347           buffer_size - offset));
348   }
349   else
350   {
351     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
352     return (-1);
353   }
354 } /* }}} int handle_request */
355
356 static void *connection_thread_main (void *args) /* {{{ */
357 {
358   pthread_t self;
359   int i;
360   int fd;
361   
362   fd = *((int *) args);
363
364   RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
365       "connetion_threads[]..");
366   pthread_mutex_lock (&connetion_threads_lock);
367   {
368     pthread_t *temp;
369
370     temp = (pthread_t *) realloc (connetion_threads,
371         sizeof (pthread_t) * (connetion_threads_num + 1));
372     if (temp == NULL)
373     {
374       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
375     }
376     else
377     {
378       connetion_threads = temp;
379       connetion_threads[connetion_threads_num] = pthread_self ();
380       connetion_threads_num++;
381     }
382   }
383   pthread_mutex_unlock (&connetion_threads_lock);
384   RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
385
386   while (do_shutdown == 0)
387   {
388     struct pollfd pollfd;
389     int status;
390
391     pollfd.fd = fd;
392     pollfd.events = POLLIN | POLLPRI;
393     pollfd.revents = 0;
394
395     status = poll (&pollfd, 1, /* timeout = */ 500);
396     if (status == 0) /* timeout */
397       continue;
398     else if (status < 0) /* error */
399     {
400       status = errno;
401       if (status == EINTR)
402         continue;
403       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
404       continue;
405     }
406
407     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
408     {
409       RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
410           "poll(2) returned POLLHUP.");
411       close (fd);
412       break;
413     }
414     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
415     {
416       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
417           "poll(2) returned something unexpected: %#04hx",
418           pollfd.revents);
419       close (fd);
420       break;
421     }
422
423     status = handle_request (fd);
424     if (status != 0)
425     {
426       close (fd);
427       break;
428     }
429   }
430
431   self = pthread_self ();
432   /* Remove this thread from the connection threads list */
433   pthread_mutex_lock (&connetion_threads_lock);
434   /* Find out own index in the array */
435   for (i = 0; i < connetion_threads_num; i++)
436     if (pthread_equal (connetion_threads[i], self) != 0)
437       break;
438   assert (i < connetion_threads_num);
439
440   /* Move the trailing threads forward. */
441   if (i < (connetion_threads_num - 1))
442   {
443     memmove (connetion_threads + i,
444         connetion_threads + i + 1,
445         sizeof (pthread_t) * (connetion_threads_num - i - 1));
446   }
447
448   connetion_threads_num--;
449   pthread_mutex_unlock (&connetion_threads_lock);
450
451   free (args);
452   return (NULL);
453 } /* }}} void *connection_thread_main */
454
455 static int open_listen_socket (const char *path) /* {{{ */
456 {
457   int fd;
458   struct sockaddr_un sa;
459   listen_socket_t *temp;
460   int status;
461
462   temp = (listen_socket_t *) realloc (listen_fds,
463       sizeof (listen_fds[0]) * (listen_fds_num + 1));
464   if (temp == NULL)
465   {
466     RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
467     return (-1);
468   }
469   listen_fds = temp;
470   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
471
472   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
473   if (fd < 0)
474   {
475     RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
476     return (-1);
477   }
478
479   memset (&sa, 0, sizeof (sa));
480   sa.sun_family = AF_UNIX;
481   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
482
483   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
484   if (status != 0)
485   {
486     RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
487     close (fd);
488     unlink (path);
489     return (-1);
490   }
491
492   status = listen (fd, /* backlog = */ 10);
493   if (status != 0)
494   {
495     RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
496     close (fd);
497     unlink (path);
498     return (-1);
499   }
500   
501   listen_fds[listen_fds_num].fd = fd;
502   strncpy (listen_fds[listen_fds_num].path, path,
503       sizeof (listen_fds[listen_fds_num].path) - 1);
504   listen_fds_num++;
505
506   return (0);
507 } /* }}} int open_listen_socket */
508
509 static int close_listen_sockets (void) /* {{{ */
510 {
511   size_t i;
512
513   for (i = 0; i < listen_fds_num; i++)
514   {
515     close (listen_fds[i].fd);
516     unlink (listen_fds[i].path);
517   }
518
519   free (listen_fds);
520   listen_fds = NULL;
521   listen_fds_num = 0;
522
523   return (0);
524 } /* }}} int close_listen_sockets */
525
526 static void *listen_thread_main (void *args) /* {{{ */
527 {
528   char buffer[4096];
529   int status;
530   int i;
531
532   status = open_listen_socket (RRDD_SOCK_PATH);
533   if (status != 0)
534   {
535     RRDD_LOG (LOG_ERR, "listen_thread_main: open_listen_socket failed.");
536     return (NULL);
537   }
538
539   while (do_shutdown == 0)
540   {
541     int *client_sd;
542     struct sockaddr_un client_sa;
543     socklen_t client_sa_size;
544     pthread_t tid;
545
546     client_sd = (int *) malloc (sizeof (int));
547     if (client_sd == NULL)
548     {
549       RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
550       sleep (120);
551       continue;
552     }
553
554     client_sa_size = sizeof (client_sa);
555     /* FIXME: Don't implement listen_fds as a list or use poll(2) here! */
556     *client_sd = accept (listen_fds[0].fd,
557         (struct sockaddr *) &client_sa, &client_sa_size);
558     if (*client_sd < 0)
559     {
560       RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
561       continue;
562     }
563
564     RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
565         *client_sd);
566
567     status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
568         /* args = */ (void *) client_sd);
569     if (status != 0)
570     {
571       RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
572       close (*client_sd);
573       free (client_sd);
574       continue;
575     }
576
577     RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
578         "tid = %lu",
579         *((unsigned long *) &tid));
580   } /* while (do_shutdown == 0) */
581
582   close_listen_sockets ();
583
584   pthread_mutex_lock (&connetion_threads_lock);
585   while (connetion_threads_num > 0)
586   {
587     pthread_t wait_for;
588
589     wait_for = connetion_threads[0];
590
591     pthread_mutex_unlock (&connetion_threads_lock);
592     pthread_join (wait_for, /* retval = */ NULL);
593     pthread_mutex_lock (&connetion_threads_lock);
594   }
595   pthread_mutex_unlock (&connetion_threads_lock);
596
597   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
598
599   return (NULL);
600 } /* }}} void *listen_thread_main */
601
602 static int daemonize (void) /* {{{ */
603 {
604   pid_t child;
605   int status;
606
607 #if !RRDD_DEBUG
608   child = fork ();
609   if (child < 0)
610   {
611     fprintf (stderr, "daemonize: fork(2) failed.\n");
612     return (-1);
613   }
614   else if (child > 0)
615   {
616     return (1);
617   }
618
619   /* Change into the /tmp directory. */
620   chdir ("/tmp");
621
622   /* Become session leader */
623   setsid ();
624
625   /* Open the first three file descriptors to /dev/null */
626   close (2);
627   close (1);
628   close (0);
629
630   open ("/dev/null", O_RDWR);
631   dup (0);
632   dup (0);
633 #endif /* RRDD_DEBUG */
634
635   {
636     struct sigaction sa;
637
638     memset (&sa, 0, sizeof (sa));
639     sa.sa_handler = sig_int_handler;
640     sigaction (SIGINT, &sa, NULL);
641
642     memset (&sa, 0, sizeof (sa));
643     sa.sa_handler = sig_term_handler;
644     sigaction (SIGINT, &sa, NULL);
645
646     memset (&sa, 0, sizeof (sa));
647     sa.sa_handler = SIG_IGN;
648     sigaction (SIGPIPE, &sa, NULL);
649   }
650
651   openlog ("rrdd", LOG_PID, LOG_DAEMON);
652
653   cache_tree = avl_alloc_tree (cache_tree_compare, cache_tree_free);
654   if (cache_tree == NULL)
655   {
656     RRDD_LOG (LOG_ERR, "daemonize: avl_alloc_tree failed.");
657     return (-1);
658   }
659
660   memset (&queue_thread, 0, sizeof (queue_thread));
661   status = pthread_create (&queue_thread, /* attr = */ NULL,
662       queue_thread_main, /* args = */ NULL);
663   if (status != 0)
664   {
665     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
666     return (-1);
667   }
668
669   return (0);
670 } /* }}} int daemonize */
671
672 static int cleanup (void) /* {{{ */
673 {
674   RRDD_LOG (LOG_DEBUG, "cleanup ()");
675
676   do_shutdown++;
677
678   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
679   pthread_cond_signal (&cache_cond);
680   pthread_join (queue_thread, /* return = */ NULL);
681   RRDD_LOG (LOG_DEBUG, "cleanup: done");
682
683   closelog ();
684
685   return (0);
686 } /* }}} int cleanup */
687
688 int main (int argc, char **argv)
689 {
690   int status;
691
692   printf ("%s by Florian Forster, Version %s\n",
693       PACKAGE_NAME, PACKAGE_VERSION);
694
695   status = daemonize ();
696   if (status == 1)
697   {
698     struct sigaction sigchld;
699
700     memset (&sigchld, 0, sizeof (sigchld));
701     sigchld.sa_handler = SIG_IGN;
702     sigaction (SIGCHLD, &sigchld, NULL);
703
704     return (0);
705   }
706   else if (status != 0)
707   {
708     fprintf (stderr, "daemonize failed, exiting.\n");
709     return (1);
710   }
711
712   listen_thread_main (NULL);
713
714   cleanup ();
715
716   return (0);
717 } /* int main */
718
719 /*
720  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
721  */