c1bc108f97c319d3f6e3663bc037925587d01b42
[collectd.git] / src / pinba.c
1 /**
2  * collectd - src/pinba.c (based on code from pinba_engine 0.0.5)
3  * Copyright (c) 2007-2009  Antony Dovgal
4  * Copyright (C) 2010       Phoenix Kayo
5  * Copyright (C) 2010       Florian Forster
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Antony Dovgal <tony at daylessday.org>
22  *   Phoenix Kayo <kayo.k11.4 at gmail.com>
23  *   Florian Forster <octo at verplant.org>
24  **/
25
26 #include "collectd.h"
27 #include "common.h"
28 #include "plugin.h"
29 #include "configfile.h"
30
31 #include <pthread.h>
32 #include <sys/socket.h>
33 #include <netdb.h>
34 #include <poll.h>
35
36 #include "pinba.pb-c.h"
37
38 /*
39  *  Service declaration section
40  */
41 #ifndef PINBA_UDP_BUFFER_SIZE
42 # define PINBA_UDP_BUFFER_SIZE 65536
43 #endif
44
45 #ifndef PINBA_DEFAULT_ADDRESS
46 # define PINBA_DEFAULT_ADDRESS "127.0.0.1" /* FIXME */
47 #endif
48
49 #ifndef PINBA_DEFAULT_PORT
50 # define PINBA_DEFAULT_PORT 12345 /* FIXME */
51 #endif
52
53 #ifndef PINBA_MAX_SOCKETS
54 # define PINBA_MAX_SOCKETS 16
55 #endif
56
57 #ifndef NI_MAXSERV
58 # define NI_MAXSERV 32
59 #endif
60
61 /*
62  * Private data structures
63  */
64 typedef struct _pinba_statres_ pinba_statres;
65 struct _pinba_statres_ {
66   const char *name;
67   double req_per_sec;
68   double req_time;
69   double ru_utime;
70   double ru_stime;
71   double doc_size;
72   double mem_peak;
73 };
74
75 struct pinba_socket_s {
76   struct pollfd fd[PINBA_MAX_SOCKETS];
77   nfds_t fd_num;
78 };
79 typedef struct pinba_socket_s pinba_socket_t;
80
81 typedef double pinba_time_t;
82 typedef uint32_t pinba_size_t;
83
84 typedef struct pinba_statnode_s pinba_statnode_t;
85 struct pinba_statnode_s
86 {
87   /* collector name */
88   char *name;
89   /* query data */
90   char *host;
91   char *server;
92   char *script;
93   /* collected data */
94   pinba_time_t last_coll;
95   pinba_size_t req_count;
96   pinba_time_t req_time;
97   pinba_time_t ru_utime;
98   pinba_time_t ru_stime;
99   pinba_size_t doc_size;
100   pinba_size_t mem_peak;
101 };
102
103 static pinba_statnode_t *stat_nodes = NULL;
104 static unsigned int stat_nodes_num = 0;
105 static pthread_mutex_t stat_nodes_lock;
106
107 char service_status=0;
108 char *service_address = PINBA_DEFAULT_ADDRESS;
109 unsigned int service_port=PINBA_DEFAULT_PORT;
110
111 static _Bool collector_thread_running = 0;
112 static _Bool collector_thread_do_shutdown = 0;
113 static pthread_t collector_thread_id;
114
115 static pinba_time_t now (void) /* {{{ */
116 {
117   static struct timeval tv;
118   
119   gettimeofday (&tv, /* tz = */ NULL);
120   
121   return (double)tv.tv_sec+((double)tv.tv_usec/(double)1000000);
122 } /* }}} */
123
124 static void service_statnode_reset (pinba_statnode_t *node) /* {{{ */
125 {
126   node->last_coll=now();
127   node->req_count=0;
128   node->req_time=0.0;
129   node->ru_utime=0.0;
130   node->ru_stime=0.0;
131   node->doc_size=0;
132   node->mem_peak=0;
133 } /* }}} void service_statnode_reset */
134
135 static void strset (char **str, const char *new) /* {{{ */
136 {
137   char *tmp;
138
139   if (!str || !new)
140     return;
141
142   tmp = strdup (new);
143   if (tmp == NULL)
144     return;
145
146   sfree (*str);
147   *str = tmp;
148 } /* }}} void strset */
149
150 static void service_statnode_add(const char *name, /* {{{ */
151     const char *host,
152     const char *server,
153     const char *script)
154 {
155   pinba_statnode_t *node;
156   DEBUG("adding node `%s' to collector { %s, %s, %s }", name, host?host:"", server?server:"", script?script:"");
157   
158   stat_nodes=realloc(stat_nodes, sizeof(pinba_statnode_t)*(stat_nodes_num+1));
159   if(!stat_nodes){
160     ERROR("Realloc failed!");
161     exit(-1);
162   }
163   
164   node=&stat_nodes[stat_nodes_num];
165   
166   /* reset stat data */
167   service_statnode_reset(node);
168   
169   /* reset strings */
170   node->name=NULL;
171   node->host=NULL;
172   node->server=NULL;
173   node->script=NULL;
174   
175   /* fill query data */
176   strset(&node->name, name);
177   strset(&node->host, host);
178   strset(&node->server, server);
179   strset(&node->script, script);
180   
181   /* increment counter */
182   stat_nodes_num++;
183 } /* }}} void service_statnode_add */
184
185 static void service_statnode_free (void)
186 {
187   unsigned int i;
188
189   if(stat_nodes_num < 1)
190     return;
191
192   for (i = 0; i < stat_nodes_num; i++)
193   {
194     sfree (stat_nodes[i].name);
195     sfree (stat_nodes[i].host);
196     sfree (stat_nodes[i].server);
197     sfree (stat_nodes[i].script);
198   }
199
200   sfree (stat_nodes);
201   stat_nodes_num = 0;
202
203   pthread_mutex_destroy (&stat_nodes_lock);
204 }
205
206 static void service_statnode_init (void)
207 {
208   /* only total info collect by default */
209   service_statnode_free();
210   
211   DEBUG("initializing collector..");
212   pthread_mutex_init(&stat_nodes_lock, 0);
213 }
214
215 static void service_statnode_begin (void)
216 {
217   service_statnode_init();
218   pthread_mutex_lock(&stat_nodes_lock);
219   
220   service_statnode_add("total", NULL, NULL, NULL);
221 }
222
223 static void service_statnode_end (void)
224 {
225   pthread_mutex_unlock(&stat_nodes_lock);
226 }
227
228 static unsigned int service_statnode_collect (pinba_statres *res, /* {{{ */
229     unsigned int index)
230 {
231   pinba_statnode_t* node;
232   pinba_time_t delta;
233   
234   if (stat_nodes_num == 0)
235     return 0;
236   
237   /* begin collecting */
238   if (index == 0)
239     pthread_mutex_lock (&stat_nodes_lock);
240   
241   /* end collecting */
242   if (index >= stat_nodes_num)
243   {
244     pthread_mutex_unlock (&stat_nodes_lock);
245     return 0;
246   }
247   
248   node = stat_nodes + index;
249   delta = now() - node->last_coll;
250   
251   res->name = node->name;
252   res->req_per_sec = node->req_count / delta;
253   
254   if (node->req_count == 0)
255     node->req_count = 1;
256
257   res->req_time = node->req_time / node->req_count;
258   res->ru_utime = node->ru_utime / node->req_count;
259   res->ru_stime = node->ru_stime / node->req_count;
260   res->ru_stime = node->ru_stime / node->req_count;
261   res->doc_size = node->doc_size / node->req_count;
262   res->mem_peak = node->mem_peak / node->req_count;
263   
264   service_statnode_reset (node);
265   return (index + 1);
266 } /* }}} unsigned int service_statnode_collect */
267
268 static void service_statnode_process (pinba_statnode_t *node,
269     Pinba__Request* request)
270 {
271   node->req_count++;
272   node->req_time+=request->request_time;
273   node->ru_utime+=request->ru_utime;
274   node->ru_stime+=request->ru_stime;
275   node->doc_size+=request->document_size;
276   node->mem_peak+=request->memory_peak;
277 }
278
279 static void service_process_request (Pinba__Request *request)
280 {
281   unsigned int i;
282
283   pthread_mutex_lock (&stat_nodes_lock);
284   
285   for (i = 0; i < stat_nodes_num; i++)
286   {
287     if(stat_nodes[i].host && strcmp(request->hostname, stat_nodes[i].host))
288       continue;
289     if(stat_nodes[i].server && strcmp(request->server_name, stat_nodes[i].server))
290       continue;
291     if(stat_nodes[i].script && strcmp(request->script_name, stat_nodes[i].script))
292       continue;
293
294     service_statnode_process(&stat_nodes[i], request);
295   }
296   
297   pthread_mutex_unlock(&stat_nodes_lock);
298 }
299
300 static int pb_del_socket (pinba_socket_t *s, /* {{{ */
301     nfds_t index)
302 {
303   if (index >= s->fd_num)
304     return (EINVAL);
305
306   close (s->fd[index].fd);
307   s->fd[index].fd = -1;
308
309   /* When deleting the last element in the list, no memmove is necessary. */
310   if (index < (s->fd_num - 1))
311   {
312     memmove (&s->fd[index], &s->fd[index + 1],
313         sizeof (s->fd[0]) * (s->fd_num - (index + 1)));
314   }
315
316   s->fd_num--;
317   return (0);
318 } /* }}} int pb_del_socket */
319
320 static int pb_add_socket (pinba_socket_t *s, /* {{{ */
321     const struct addrinfo *ai)
322 {
323   int fd;
324   int tmp;
325   int status;
326
327   if (s->fd_num == PINBA_MAX_SOCKETS)
328   {
329     WARNING ("pinba plugin: Sorry, you have hit the built-in limit of "
330         "%i sockets. Please complain to the collectd developers so we can "
331         "raise the limit.", PINBA_MAX_SOCKETS);
332     return (-1);
333   }
334
335   fd = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol);
336   if (fd < 0)
337   {
338     char errbuf[1024];
339     ERROR ("pinba plugin: socket(2) failed: %s",
340         sstrerror (errno, errbuf, sizeof (errbuf)));
341     return (0);
342   }
343
344   tmp = 1;
345   status = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof (tmp));
346   if (status != 0)
347   {
348     char errbuf[1024];
349     WARNING ("pinba plugin: setsockopt(SO_REUSEADDR) failed: %s",
350         sstrerror (errno, errbuf, sizeof (errbuf)));
351   }
352
353   status = bind (fd, ai->ai_addr, ai->ai_addrlen);
354   if (status != 0)
355   {
356     char errbuf[1024];
357     ERROR ("pinba plugin: bind(2) failed: %s",
358         sstrerror (errno, errbuf, sizeof (errbuf)));
359     return (0);
360   }
361
362   s->fd[s->fd_num].fd = fd;
363   s->fd[s->fd_num].events = POLLIN | POLLPRI;
364   s->fd[s->fd_num].revents = 0;
365   s->fd_num++;
366
367   return (0);
368 } /* }}} int pb_add_socket */
369
370 static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */
371     int listen_port)
372 {
373   pinba_socket_t *s;
374   struct addrinfo *ai_list;
375   struct addrinfo *ai_ptr;
376   struct addrinfo  ai_hints;
377   int status;
378
379   char service[NI_MAXSERV]; /* FIXME */
380   snprintf (service, sizeof (service), "%i", listen_port);
381
382   memset (&ai_hints, 0, sizeof (ai_hints));
383   ai_hints.ai_flags = AI_PASSIVE;
384   ai_hints.ai_family = AF_UNSPEC;
385   ai_hints.ai_socktype = SOCK_DGRAM;
386   ai_hints.ai_addr = NULL;
387   ai_hints.ai_canonname = NULL;
388   ai_hints.ai_next = NULL;
389
390   ai_list = NULL;
391   status = getaddrinfo (node, service,
392       &ai_hints, &ai_list);
393   if (status != 0)
394   {
395     ERROR ("pinba plugin: getaddrinfo(3) failed: %s",
396         gai_strerror (status));
397     return (NULL);
398   }
399   assert (ai_list != NULL);
400
401   s = malloc (sizeof (*s));
402   if (s != NULL)
403   {
404     freeaddrinfo (ai_list);
405     ERROR ("pinba plugin: malloc failed.");
406     return (NULL);
407   }
408   memset (s, 0, sizeof (*s));
409
410   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
411   {
412     status = pb_add_socket (s, ai_ptr);
413     if (status != 0)
414       break;
415   } /* for (ai_list) */
416   
417   freeaddrinfo (ai_list);
418
419   if (s->fd_num < 1)
420   {
421     WARNING ("pinba plugin: Unable to open socket for address %s.", node);
422     sfree (s);
423     s = NULL;
424   }
425
426   return (s);
427 } /* }}} pinba_socket_open */
428
429 static void pinba_socket_free (pinba_socket_t *socket) /* {{{ */
430 {
431   nfds_t i;
432
433   if (!socket)
434     return;
435   
436   for (i = 0; i < socket->fd_num; i++)
437   {
438     if (socket->fd[i].fd < 0)
439       continue;
440     close (socket->fd[i].fd);
441     socket->fd[i].fd = -1;
442   }
443   
444   sfree(socket);
445 } /* }}} void pinba_socket_free */
446
447 static int pinba_process_stats_packet (const uint8_t *buffer, /* {{{ */
448     size_t buffer_size)
449 {
450   Pinba__Request *request;  
451   
452   request = pinba__request__unpack (NULL, buffer_size, buffer);
453   
454   if (!request)
455     return (-1);
456
457   service_process_request(request);
458   pinba__request__free_unpacked (request, NULL);
459     
460   return (0);
461 } /* }}} int pinba_process_stats_packet */
462
463 static int pinba_udp_read_callback_fn (int sock) /* {{{ */
464 {
465   uint8_t buffer[PINBA_UDP_BUFFER_SIZE];
466   size_t buffer_size;
467   int status;
468
469   while (42)
470   {
471     buffer_size = sizeof (buffer);
472     status = recvfrom (sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0);
473     if (status < 0)
474     {
475       char errbuf[1024];
476
477       if ((errno == EINTR)
478 #ifdef EWOULDBLOCK
479           || (errno == EWOULDBLOCK)
480 #endif
481           || (errno == EAGAIN))
482       {
483         continue;
484       }
485
486       WARNING("pinba plugin: recvfrom(2) failed: %s",
487           sstrerror (errno, errbuf, sizeof (errbuf)));
488       return (-1);
489     }
490     else if (status == 0)
491     {
492       DEBUG ("pinba plugin: recvfrom(2) returned unexpected status zero.");
493       return (-1);
494     }
495     else /* if (status > 0) */
496     {
497       assert (((size_t) status) < buffer_size);
498       buffer_size = (size_t) status;
499       buffer[buffer_size] = 0;
500
501       status = pinba_process_stats_packet (buffer, buffer_size);
502       if (status != 0)
503         DEBUG("pinba plugin: Parsing packet failed.");
504       return (status);
505     }
506   } /* while (42) */
507
508   /* not reached */
509   assert (23 == 42);
510   return (-1);
511 } /* }}} void pinba_udp_read_callback_fn */
512
513 static int receive_loop (void) /* {{{ */
514 {
515   pinba_socket_t *s;
516
517   s = pinba_socket_open (service_address, service_port);
518   if (s == NULL)
519   {
520     ERROR ("pinba plugin: Collector thread is exiting prematurely.");
521     return (-1);
522   }
523
524   while (!collector_thread_do_shutdown)
525   {
526     int status;
527     nfds_t i;
528
529     if (s->fd_num < 1)
530       break;
531
532     status = poll (s->fd, s->fd_num, /* timeout = */ 1000);
533     if (status == 0) /* timeout */
534     {
535       continue;
536     }
537     else if (status < 0)
538     {
539       char errbuf[1024];
540
541       if ((errno == EINTR) || (errno == EAGAIN))
542         continue;
543
544       ERROR ("pinba plugin: poll(2) failed: %s",
545           sstrerror (errno, errbuf, sizeof (errbuf)));
546       pinba_socket_free (s);
547       return (-1);
548     }
549
550     for (i = 0; i < s->fd_num; i++)
551     {
552       if (s->fd[i].revents & (POLLERR | POLLHUP | POLLNVAL))
553       {
554         pb_del_socket (s, i);
555         i--;
556       }
557       else if (s->fd[i].revents & (POLLIN | POLLPRI))
558       {
559         pinba_udp_read_callback_fn (s->fd[i].fd);
560       }
561     } /* for (s->fd) */
562   } /* while (!collector_thread_do_shutdown) */
563
564   pinba_socket_free (s);
565   s = NULL;
566
567   return (0);
568 } /* }}} int receive_loop */
569
570 static void *collector_thread (void *arg) /* {{{ */
571 {
572   receive_loop ();
573
574   memset (&collector_thread_id, 0, sizeof (collector_thread_id));
575   collector_thread_running = 0;
576   pthread_exit (NULL);
577   return (NULL);
578 } /* }}} void *collector_thread */
579
580 /*
581  * Plugin declaration section
582  */
583
584 static int config_set (char **var, const char *value)
585 {
586   /* code from nginx plugin for collectd */
587   if (*var != NULL) {
588     free (*var);
589     *var = NULL;
590   }
591   
592   if ((*var = strdup (value)) == NULL) return (1);
593   else return (0);
594 }
595
596 static int plugin_config (oconfig_item_t *ci)
597 {
598   unsigned int i, o;
599   int pinba_port = 0;
600   char *pinba_address = NULL;
601   
602   INFO("Pinba Configure..");
603   
604   service_statnode_begin();
605   
606   /* Set default values */
607   config_set(&pinba_address, PINBA_DEFAULT_ADDRESS);
608   pinba_port = PINBA_DEFAULT_PORT;
609   
610   for (i = 0; i < ci->children_num; i++) {
611     oconfig_item_t *child = ci->children + i;
612     if (strcasecmp ("Address", child->key) == 0) {
613       if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING)){
614         WARNING ("pinba plugin: `Address' needs exactly one string argument.");
615         return (-1);
616       }
617       config_set(&pinba_address, child->values[0].value.string);
618     } else if (strcasecmp ("Port", child->key) == 0) {
619       if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_NUMBER)){
620         WARNING ("pinba plugin: `Port' needs exactly one number argument.");
621         return (-1);
622       }
623       pinba_port=child->values[0].value.number;
624     } else if (strcasecmp ("View", child->key) == 0) {
625       const char *name=NULL, *host=NULL, *server=NULL, *script=NULL;
626       if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING) || strlen(child->values[0].value.string)==0){
627         WARNING ("pinba plugin: `View' needs exactly one non-empty string argument.");
628         return (-1);
629       }
630       name = child->values[0].value.string;
631       for(o=0; o<child->children_num; o++){
632         oconfig_item_t *node = child->children + o;
633         if (strcasecmp ("Host", node->key) == 0) {
634           if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){
635             WARNING ("pinba plugin: `View->Host' needs exactly one non-empty string argument.");
636             return (-1);
637           }
638           host = node->values[0].value.string;
639         } else if (strcasecmp ("Server", node->key) == 0) {
640           if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){
641             WARNING ("pinba plugin: `View->Server' needs exactly one non-empty string argument.");
642             return (-1);
643           }
644           server = node->values[0].value.string;
645         } else if (strcasecmp ("Script", node->key) == 0) {
646           if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){
647             WARNING ("pinba plugin: `View->Script' needs exactly one non-empty string argument.");
648             return (-1);
649           }
650           script = node->values[0].value.string;
651         } else {
652           WARNING ("pinba plugin: In `<View>' context allowed only `Host', `Server' and `Script' options but not the `%s'.", node->key);
653           return (-1);
654         }
655       }
656       /* add new statnode */
657       service_statnode_add(name, host, server, script);
658     } else {
659       WARNING ("pinba plugin: In `<Plugin pinba>' context allowed only `Address', `Port' and `Observe' options but not the `%s'.", child->key);
660       return (-1);
661     }
662   }
663   
664   service_statnode_end();
665   
666   return (0);
667 } /* int pinba_config */
668
669 static int plugin_init (void) /* {{{ */
670 {
671   int status;
672
673   if (collector_thread_running)
674     return (0);
675
676   status = pthread_create (&collector_thread_id,
677       /* attrs = */ NULL,
678       collector_thread,
679       /* args = */ NULL);
680   if (status != 0)
681   {
682     char errbuf[1024];
683     ERROR ("pinba plugin: pthread_create(3) failed: %s",
684         sstrerror (errno, errbuf, sizeof (errbuf)));
685     return (-1);
686   }
687   collector_thread_running = 1;
688
689   return (0);
690 } /* }}} */
691
692 static int plugin_shutdown (void) /* {{{ */
693 {
694   if (collector_thread_running)
695   {
696     int status;
697
698     DEBUG ("pinba plugin: Shutting down collector thread.");
699     collector_thread_do_shutdown = 1;
700
701     status = pthread_join (collector_thread_id, /* retval = */ NULL);
702     if (status != 0)
703     {
704       char errbuf[1024];
705       ERROR ("pinba plugin: pthread_join(3) failed: %s",
706           sstrerror (status, errbuf, sizeof (errbuf)));
707     }
708
709     collector_thread_running = 0;
710     collector_thread_do_shutdown = 0;
711   } /* if (collector_thread_running) */
712
713   return (0);
714 } /* }}} int plugin_shutdown */
715
716 static int plugin_submit (const char *plugin_instance,
717                const char *type,
718                const pinba_statres *res) {
719   value_t values[6];
720   value_list_t vl = VALUE_LIST_INIT;
721   
722   values[0].gauge = res->req_per_sec;
723   values[1].gauge = res->req_time;
724   values[2].gauge = res->ru_utime;
725   values[3].gauge = res->ru_stime;
726   values[4].gauge = res->doc_size;
727   values[5].gauge = res->mem_peak;
728   
729   vl.values = values;
730   vl.values_len = 6;
731   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
732   sstrncpy (vl.plugin, "pinba", sizeof (vl.plugin));
733   sstrncpy (vl.plugin_instance, plugin_instance,
734             sizeof(vl.plugin_instance));
735   sstrncpy (vl.type, type, sizeof (vl.type));
736   INFO("Pinba Dispatch");
737   plugin_dispatch_values (&vl);
738
739   return (0);
740 }
741
742 static int plugin_read (void)
743 {
744   unsigned int i=0;
745   static pinba_statres res;
746   
747   while ((i = service_statnode_collect (&res, i)) != 0)
748   {
749     plugin_submit(res.name, "pinba_view", &res);
750   }
751   
752   return 0;
753 }
754
755 void module_register (void)
756 {
757   plugin_register_complex_config ("pinba", plugin_config);
758   plugin_register_init ("pinba", plugin_init);
759   plugin_register_read ("pinba", plugin_read);
760   plugin_register_shutdown ("pinba", plugin_shutdown);
761 } /* void module_register */
762
763 /* vim: set sw=2 sts=2 et fdm=marker : */