489ba7cfa502c478e72f7fce2563df50b7af6752
[collectd.git] / src / pinba.c
1 /**
2  * collectd - src/pinba.c (based on code from pinba_engine 0.0.5)
3  * Copyright (c) 2007-2009  Antony Dovgal
4  * Copyright (C) 2010       Phoenix Kayo
5  * Copyright (C) 2010       Florian Forster
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Antony Dovgal <tony at daylessday.org>
22  *   Phoenix Kayo <kayo.k11.4 at gmail.com>
23  *   Florian Forster <octo at verplant.org>
24  **/
25
26 #include "collectd.h"
27 #include "common.h"
28 #include "plugin.h"
29 #include "configfile.h"
30
31 #include <pthread.h>
32 #include <sys/socket.h>
33 #include <netdb.h>
34 #include <poll.h>
35
36 #include "pinba.pb-c.h"
37
38 /*
39  *  Service declaration section
40  */
41 #ifndef PINBA_UDP_BUFFER_SIZE
42 # define PINBA_UDP_BUFFER_SIZE 65536
43 #endif
44
45 #ifndef PINBA_DEFAULT_NODE
46 # define PINBA_DEFAULT_NODE "127.0.0.1" /* FIXME */
47 #endif
48
49 #ifndef PINBA_DEFAULT_SERVICE
50 # define PINBA_DEFAULT_SERVICE "12345" /* FIXME */
51 #endif
52
53 #ifndef PINBA_MAX_SOCKETS
54 # define PINBA_MAX_SOCKETS 16
55 #endif
56
57 #ifndef NI_MAXSERV
58 # define NI_MAXSERV 32
59 #endif
60
61 /*
62  * Private data structures
63  */
64 typedef struct _pinba_statres_ pinba_statres;
65 struct _pinba_statres_ {
66   const char *name;
67   double req_per_sec;
68   double req_time;
69   double ru_utime;
70   double ru_stime;
71   double doc_size;
72   double mem_peak;
73 };
74
75 struct pinba_socket_s {
76   struct pollfd fd[PINBA_MAX_SOCKETS];
77   nfds_t fd_num;
78 };
79 typedef struct pinba_socket_s pinba_socket_t;
80
81 typedef double pinba_time_t;
82 typedef uint32_t pinba_size_t;
83
84 typedef struct pinba_statnode_s pinba_statnode_t;
85 struct pinba_statnode_s
86 {
87   /* collector name */
88   char *name;
89   /* query data */
90   char *host;
91   char *server;
92   char *script;
93   /* collected data */
94   pinba_time_t last_coll;
95   pinba_size_t req_count;
96   pinba_time_t req_time;
97   pinba_time_t ru_utime;
98   pinba_time_t ru_stime;
99   pinba_size_t doc_size;
100   pinba_size_t mem_peak;
101 };
102
103 static pinba_statnode_t *stat_nodes = NULL;
104 static unsigned int stat_nodes_num = 0;
105 static pthread_mutex_t stat_nodes_lock;
106
107 static char *conf_node = NULL;
108 static char *conf_service = NULL;
109
110 static _Bool collector_thread_running = 0;
111 static _Bool collector_thread_do_shutdown = 0;
112 static pthread_t collector_thread_id;
113
114 static pinba_time_t now (void) /* {{{ */
115 {
116   static struct timeval tv;
117   
118   gettimeofday (&tv, /* tz = */ NULL);
119   
120   return (double)tv.tv_sec+((double)tv.tv_usec/(double)1000000);
121 } /* }}} pinba_time_t now */
122
123 static void service_statnode_reset (pinba_statnode_t *node) /* {{{ */
124 {
125   node->last_coll=now();
126   node->req_count=0;
127   node->req_time=0.0;
128   node->ru_utime=0.0;
129   node->ru_stime=0.0;
130   node->doc_size=0;
131   node->mem_peak=0;
132 } /* }}} void service_statnode_reset */
133
134 static void strset (char **str, const char *new) /* {{{ */
135 {
136   char *tmp;
137
138   if (!str || !new)
139     return;
140
141   tmp = strdup (new);
142   if (tmp == NULL)
143     return;
144
145   sfree (*str);
146   *str = tmp;
147 } /* }}} void strset */
148
149 static void service_statnode_add(const char *name, /* {{{ */
150     const char *host,
151     const char *server,
152     const char *script)
153 {
154   pinba_statnode_t *node;
155   DEBUG("adding node `%s' to collector { %s, %s, %s }", name, host?host:"", server?server:"", script?script:"");
156   
157   stat_nodes=realloc(stat_nodes, sizeof(pinba_statnode_t)*(stat_nodes_num+1));
158   if(!stat_nodes){
159     ERROR("Realloc failed!");
160     exit(-1);
161   }
162   
163   node=&stat_nodes[stat_nodes_num];
164   
165   /* reset stat data */
166   service_statnode_reset(node);
167   
168   /* reset strings */
169   node->name=NULL;
170   node->host=NULL;
171   node->server=NULL;
172   node->script=NULL;
173   
174   /* fill query data */
175   strset(&node->name, name);
176   strset(&node->host, host);
177   strset(&node->server, server);
178   strset(&node->script, script);
179   
180   /* increment counter */
181   stat_nodes_num++;
182 } /* }}} void service_statnode_add */
183
184 static void service_statnode_free (void) /* {{{ */
185 {
186   unsigned int i;
187
188   if(stat_nodes_num < 1)
189     return;
190
191   for (i = 0; i < stat_nodes_num; i++)
192   {
193     sfree (stat_nodes[i].name);
194     sfree (stat_nodes[i].host);
195     sfree (stat_nodes[i].server);
196     sfree (stat_nodes[i].script);
197   }
198
199   sfree (stat_nodes);
200   stat_nodes_num = 0;
201
202   pthread_mutex_destroy (&stat_nodes_lock);
203 } /* }}} void service_statnode_free */
204
205 static void service_statnode_init (void) /* {{{ */
206 {
207   /* only total info collect by default */
208   service_statnode_free();
209   
210   DEBUG("initializing collector..");
211   pthread_mutex_init(&stat_nodes_lock, 0);
212 } /* }}} void service_statnode_init */
213
214 static void service_statnode_begin (void) /* {{{ */
215 {
216   service_statnode_init();
217   pthread_mutex_lock(&stat_nodes_lock);
218   
219   service_statnode_add("total", NULL, NULL, NULL);
220 } /* }}} void service_statnode_begin */
221
222 static void service_statnode_end (void) /* {{{ */
223 {
224   pthread_mutex_unlock(&stat_nodes_lock);
225 } /* }}} void service_statnode_end */
226
227 static unsigned int service_statnode_collect (pinba_statres *res, /* {{{ */
228     unsigned int index)
229 {
230   pinba_statnode_t* node;
231   pinba_time_t delta;
232   
233   if (stat_nodes_num == 0)
234     return 0;
235   
236   /* begin collecting */
237   if (index == 0)
238     pthread_mutex_lock (&stat_nodes_lock);
239   
240   /* end collecting */
241   if (index >= stat_nodes_num)
242   {
243     pthread_mutex_unlock (&stat_nodes_lock);
244     return 0;
245   }
246   
247   node = stat_nodes + index;
248   delta = now() - node->last_coll;
249   
250   res->name = node->name;
251   res->req_per_sec = node->req_count / delta;
252   
253   if (node->req_count == 0)
254     node->req_count = 1;
255
256   res->req_time = node->req_time / node->req_count;
257   res->ru_utime = node->ru_utime / node->req_count;
258   res->ru_stime = node->ru_stime / node->req_count;
259   res->ru_stime = node->ru_stime / node->req_count;
260   res->doc_size = node->doc_size / node->req_count;
261   res->mem_peak = node->mem_peak / node->req_count;
262   
263   service_statnode_reset (node);
264   return (index + 1);
265 } /* }}} unsigned int service_statnode_collect */
266
267 static void service_statnode_process (pinba_statnode_t *node, /* {{{ */
268     Pinba__Request* request)
269 {
270   node->req_count++;
271   node->req_time+=request->request_time;
272   node->ru_utime+=request->ru_utime;
273   node->ru_stime+=request->ru_stime;
274   node->doc_size+=request->document_size;
275   node->mem_peak+=request->memory_peak;
276 } /* }}} void service_statnode_process */
277
278 static void service_process_request (Pinba__Request *request) /* {{{ */
279 {
280   unsigned int i;
281
282   pthread_mutex_lock (&stat_nodes_lock);
283   
284   for (i = 0; i < stat_nodes_num; i++)
285   {
286     if(stat_nodes[i].host && strcmp(request->hostname, stat_nodes[i].host))
287       continue;
288     if(stat_nodes[i].server && strcmp(request->server_name, stat_nodes[i].server))
289       continue;
290     if(stat_nodes[i].script && strcmp(request->script_name, stat_nodes[i].script))
291       continue;
292
293     service_statnode_process(&stat_nodes[i], request);
294   }
295   
296   pthread_mutex_unlock(&stat_nodes_lock);
297 } /* }}} void service_process_request */
298
299 static int pb_del_socket (pinba_socket_t *s, /* {{{ */
300     nfds_t index)
301 {
302   if (index >= s->fd_num)
303     return (EINVAL);
304
305   close (s->fd[index].fd);
306   s->fd[index].fd = -1;
307
308   /* When deleting the last element in the list, no memmove is necessary. */
309   if (index < (s->fd_num - 1))
310   {
311     memmove (&s->fd[index], &s->fd[index + 1],
312         sizeof (s->fd[0]) * (s->fd_num - (index + 1)));
313   }
314
315   s->fd_num--;
316   return (0);
317 } /* }}} int pb_del_socket */
318
319 static int pb_add_socket (pinba_socket_t *s, /* {{{ */
320     const struct addrinfo *ai)
321 {
322   int fd;
323   int tmp;
324   int status;
325
326   if (s->fd_num == PINBA_MAX_SOCKETS)
327   {
328     WARNING ("pinba plugin: Sorry, you have hit the built-in limit of "
329         "%i sockets. Please complain to the collectd developers so we can "
330         "raise the limit.", PINBA_MAX_SOCKETS);
331     return (-1);
332   }
333
334   fd = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol);
335   if (fd < 0)
336   {
337     char errbuf[1024];
338     ERROR ("pinba plugin: socket(2) failed: %s",
339         sstrerror (errno, errbuf, sizeof (errbuf)));
340     return (0);
341   }
342
343   tmp = 1;
344   status = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof (tmp));
345   if (status != 0)
346   {
347     char errbuf[1024];
348     WARNING ("pinba plugin: setsockopt(SO_REUSEADDR) failed: %s",
349         sstrerror (errno, errbuf, sizeof (errbuf)));
350   }
351
352   status = bind (fd, ai->ai_addr, ai->ai_addrlen);
353   if (status != 0)
354   {
355     char errbuf[1024];
356     ERROR ("pinba plugin: bind(2) failed: %s",
357         sstrerror (errno, errbuf, sizeof (errbuf)));
358     return (0);
359   }
360
361   s->fd[s->fd_num].fd = fd;
362   s->fd[s->fd_num].events = POLLIN | POLLPRI;
363   s->fd[s->fd_num].revents = 0;
364   s->fd_num++;
365
366   return (0);
367 } /* }}} int pb_add_socket */
368
369 static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */
370     const char *service)
371 {
372   pinba_socket_t *s;
373   struct addrinfo *ai_list;
374   struct addrinfo *ai_ptr;
375   struct addrinfo  ai_hints;
376   int status;
377
378   memset (&ai_hints, 0, sizeof (ai_hints));
379   ai_hints.ai_flags = AI_PASSIVE;
380   ai_hints.ai_family = AF_UNSPEC;
381   ai_hints.ai_socktype = SOCK_DGRAM;
382   ai_hints.ai_addr = NULL;
383   ai_hints.ai_canonname = NULL;
384   ai_hints.ai_next = NULL;
385
386   if (node == NULL)
387     node = PINBA_DEFAULT_NODE;
388
389   if (service == NULL)
390     service = PINBA_DEFAULT_SERVICE;
391
392   ai_list = NULL;
393   status = getaddrinfo (node, service,
394       &ai_hints, &ai_list);
395   if (status != 0)
396   {
397     ERROR ("pinba plugin: getaddrinfo(3) failed: %s",
398         gai_strerror (status));
399     return (NULL);
400   }
401   assert (ai_list != NULL);
402
403   s = malloc (sizeof (*s));
404   if (s != NULL)
405   {
406     freeaddrinfo (ai_list);
407     ERROR ("pinba plugin: malloc failed.");
408     return (NULL);
409   }
410   memset (s, 0, sizeof (*s));
411
412   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
413   {
414     status = pb_add_socket (s, ai_ptr);
415     if (status != 0)
416       break;
417   } /* for (ai_list) */
418   
419   freeaddrinfo (ai_list);
420
421   if (s->fd_num < 1)
422   {
423     WARNING ("pinba plugin: Unable to open socket for address %s.", node);
424     sfree (s);
425     s = NULL;
426   }
427
428   return (s);
429 } /* }}} pinba_socket_open */
430
431 static void pinba_socket_free (pinba_socket_t *socket) /* {{{ */
432 {
433   nfds_t i;
434
435   if (!socket)
436     return;
437   
438   for (i = 0; i < socket->fd_num; i++)
439   {
440     if (socket->fd[i].fd < 0)
441       continue;
442     close (socket->fd[i].fd);
443     socket->fd[i].fd = -1;
444   }
445   
446   sfree(socket);
447 } /* }}} void pinba_socket_free */
448
449 static int pinba_process_stats_packet (const uint8_t *buffer, /* {{{ */
450     size_t buffer_size)
451 {
452   Pinba__Request *request;  
453   
454   request = pinba__request__unpack (NULL, buffer_size, buffer);
455   
456   if (!request)
457     return (-1);
458
459   service_process_request(request);
460   pinba__request__free_unpacked (request, NULL);
461     
462   return (0);
463 } /* }}} int pinba_process_stats_packet */
464
465 static int pinba_udp_read_callback_fn (int sock) /* {{{ */
466 {
467   uint8_t buffer[PINBA_UDP_BUFFER_SIZE];
468   size_t buffer_size;
469   int status;
470
471   while (42)
472   {
473     buffer_size = sizeof (buffer);
474     status = recvfrom (sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0);
475     if (status < 0)
476     {
477       char errbuf[1024];
478
479       if ((errno == EINTR)
480 #ifdef EWOULDBLOCK
481           || (errno == EWOULDBLOCK)
482 #endif
483           || (errno == EAGAIN))
484       {
485         continue;
486       }
487
488       WARNING("pinba plugin: recvfrom(2) failed: %s",
489           sstrerror (errno, errbuf, sizeof (errbuf)));
490       return (-1);
491     }
492     else if (status == 0)
493     {
494       DEBUG ("pinba plugin: recvfrom(2) returned unexpected status zero.");
495       return (-1);
496     }
497     else /* if (status > 0) */
498     {
499       assert (((size_t) status) < buffer_size);
500       buffer_size = (size_t) status;
501       buffer[buffer_size] = 0;
502
503       status = pinba_process_stats_packet (buffer, buffer_size);
504       if (status != 0)
505         DEBUG("pinba plugin: Parsing packet failed.");
506       return (status);
507     }
508   } /* while (42) */
509
510   /* not reached */
511   assert (23 == 42);
512   return (-1);
513 } /* }}} void pinba_udp_read_callback_fn */
514
515 static int receive_loop (void) /* {{{ */
516 {
517   pinba_socket_t *s;
518
519   s = pinba_socket_open (conf_node, conf_service);
520   if (s == NULL)
521   {
522     ERROR ("pinba plugin: Collector thread is exiting prematurely.");
523     return (-1);
524   }
525
526   while (!collector_thread_do_shutdown)
527   {
528     int status;
529     nfds_t i;
530
531     if (s->fd_num < 1)
532       break;
533
534     status = poll (s->fd, s->fd_num, /* timeout = */ 1000);
535     if (status == 0) /* timeout */
536     {
537       continue;
538     }
539     else if (status < 0)
540     {
541       char errbuf[1024];
542
543       if ((errno == EINTR) || (errno == EAGAIN))
544         continue;
545
546       ERROR ("pinba plugin: poll(2) failed: %s",
547           sstrerror (errno, errbuf, sizeof (errbuf)));
548       pinba_socket_free (s);
549       return (-1);
550     }
551
552     for (i = 0; i < s->fd_num; i++)
553     {
554       if (s->fd[i].revents & (POLLERR | POLLHUP | POLLNVAL))
555       {
556         pb_del_socket (s, i);
557         i--;
558       }
559       else if (s->fd[i].revents & (POLLIN | POLLPRI))
560       {
561         pinba_udp_read_callback_fn (s->fd[i].fd);
562       }
563     } /* for (s->fd) */
564   } /* while (!collector_thread_do_shutdown) */
565
566   pinba_socket_free (s);
567   s = NULL;
568
569   return (0);
570 } /* }}} int receive_loop */
571
572 static void *collector_thread (void *arg) /* {{{ */
573 {
574   receive_loop ();
575
576   memset (&collector_thread_id, 0, sizeof (collector_thread_id));
577   collector_thread_running = 0;
578   pthread_exit (NULL);
579   return (NULL);
580 } /* }}} void *collector_thread */
581
582 /*
583  * Plugin declaration section
584  */
585
586 static int config_set (char **var, const char *value) /* {{{ */
587 {
588   /* code from nginx plugin for collectd */
589   if (*var != NULL) {
590     free (*var);
591     *var = NULL;
592   }
593   
594   if ((*var = strdup (value)) == NULL) return (1);
595   else return (0);
596 } /* }}} int config_set */
597
598 static int plugin_config (oconfig_item_t *ci) /* {{{ */
599 {
600   unsigned int i, o;
601   
602   service_statnode_begin();
603   
604   for (i = 0; i < ci->children_num; i++) {
605     oconfig_item_t *child = ci->children + i;
606     if (strcasecmp ("Address", child->key) == 0) {
607       if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING)){
608         WARNING ("pinba plugin: `Address' needs exactly one string argument.");
609         return (-1);
610       }
611       config_set(&conf_node, child->values[0].value.string);
612     } else if (strcasecmp ("Port", child->key) == 0) {
613       if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING)){
614         WARNING ("pinba plugin: `Port' needs exactly one string argument.");
615         return (-1);
616       }
617       config_set(&conf_service, child->values[0].value.string);
618     } else if (strcasecmp ("View", child->key) == 0) {
619       const char *name=NULL, *host=NULL, *server=NULL, *script=NULL;
620       if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING) || strlen(child->values[0].value.string)==0){
621         WARNING ("pinba plugin: `View' needs exactly one non-empty string argument.");
622         return (-1);
623       }
624       name = child->values[0].value.string;
625       for(o=0; o<child->children_num; o++){
626         oconfig_item_t *node = child->children + o;
627         if (strcasecmp ("Host", node->key) == 0) {
628           if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){
629             WARNING ("pinba plugin: `View->Host' needs exactly one non-empty string argument.");
630             return (-1);
631           }
632           host = node->values[0].value.string;
633         } else if (strcasecmp ("Server", node->key) == 0) {
634           if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){
635             WARNING ("pinba plugin: `View->Server' needs exactly one non-empty string argument.");
636             return (-1);
637           }
638           server = node->values[0].value.string;
639         } else if (strcasecmp ("Script", node->key) == 0) {
640           if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){
641             WARNING ("pinba plugin: `View->Script' needs exactly one non-empty string argument.");
642             return (-1);
643           }
644           script = node->values[0].value.string;
645         } else {
646           WARNING ("pinba plugin: In `<View>' context allowed only `Host', `Server' and `Script' options but not the `%s'.", node->key);
647           return (-1);
648         }
649       }
650       /* add new statnode */
651       service_statnode_add(name, host, server, script);
652     } else {
653       WARNING ("pinba plugin: In `<Plugin pinba>' context allowed only `Address', `Port' and `Observe' options but not the `%s'.", child->key);
654       return (-1);
655     }
656   }
657   
658   service_statnode_end();
659   
660   return (0);
661 } /* int pinba_config */
662
663 static int plugin_init (void) /* {{{ */
664 {
665   int status;
666
667   if (collector_thread_running)
668     return (0);
669
670   status = pthread_create (&collector_thread_id,
671       /* attrs = */ NULL,
672       collector_thread,
673       /* args = */ NULL);
674   if (status != 0)
675   {
676     char errbuf[1024];
677     ERROR ("pinba plugin: pthread_create(3) failed: %s",
678         sstrerror (errno, errbuf, sizeof (errbuf)));
679     return (-1);
680   }
681   collector_thread_running = 1;
682
683   return (0);
684 } /* }}} */
685
686 static int plugin_shutdown (void) /* {{{ */
687 {
688   if (collector_thread_running)
689   {
690     int status;
691
692     DEBUG ("pinba plugin: Shutting down collector thread.");
693     collector_thread_do_shutdown = 1;
694
695     status = pthread_join (collector_thread_id, /* retval = */ NULL);
696     if (status != 0)
697     {
698       char errbuf[1024];
699       ERROR ("pinba plugin: pthread_join(3) failed: %s",
700           sstrerror (status, errbuf, sizeof (errbuf)));
701     }
702
703     collector_thread_running = 0;
704     collector_thread_do_shutdown = 0;
705   } /* if (collector_thread_running) */
706
707   return (0);
708 } /* }}} int plugin_shutdown */
709
710 static int plugin_submit (const char *plugin_instance, /* {{{ */
711                const char *type,
712                const pinba_statres *res) {
713   value_t values[6];
714   value_list_t vl = VALUE_LIST_INIT;
715   
716   values[0].gauge = res->req_per_sec;
717   values[1].gauge = res->req_time;
718   values[2].gauge = res->ru_utime;
719   values[3].gauge = res->ru_stime;
720   values[4].gauge = res->doc_size;
721   values[5].gauge = res->mem_peak;
722   
723   vl.values = values;
724   vl.values_len = 6;
725   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
726   sstrncpy (vl.plugin, "pinba", sizeof (vl.plugin));
727   sstrncpy (vl.plugin_instance, plugin_instance,
728             sizeof(vl.plugin_instance));
729   sstrncpy (vl.type, type, sizeof (vl.type));
730   INFO("Pinba Dispatch");
731   plugin_dispatch_values (&vl);
732
733   return (0);
734 } /* }}} int plugin_submit */
735
736 static int plugin_read (void) /* {{{ */
737 {
738   unsigned int i=0;
739   static pinba_statres res;
740   
741   while ((i = service_statnode_collect (&res, i)) != 0)
742   {
743     plugin_submit(res.name, "pinba_view", &res);
744   }
745   
746   return 0;
747 } /* }}} int plugin_read */
748
749 void module_register (void) /* {{{ */
750 {
751   plugin_register_complex_config ("pinba", plugin_config);
752   plugin_register_init ("pinba", plugin_init);
753   plugin_register_read ("pinba", plugin_read);
754   plugin_register_shutdown ("pinba", plugin_shutdown);
755 } /* }}} void module_register */
756
757 /* vim: set sw=2 sts=2 et fdm=marker : */