gmond plugin: Add a plugin to receive Ganglia multicast traffic.
[collectd.git] / src / gmond.c
1 /**
2  * collectd - src/gmond.c
3  * Copyright (C) 2005-2009  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_avltree.h"
27
28 #include "network.h"
29
30 #if HAVE_PTHREAD_H
31 # include <pthread.h>
32 #endif
33 #if HAVE_SYS_SOCKET_H
34 # include <sys/socket.h>
35 #endif
36 #if HAVE_NETDB_H
37 # include <netdb.h>
38 #endif
39 #if HAVE_NETINET_IN_H
40 # include <netinet/in.h>
41 #endif
42 #if HAVE_ARPA_INET_H
43 # include <arpa/inet.h>
44 #endif
45 #if HAVE_POLL_H
46 # include <poll.h>
47 #endif
48
49 #include <gm_protocol.h>
50
51 #ifndef IPV6_ADD_MEMBERSHIP
52 # ifdef IPV6_JOIN_GROUP
53 #  define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
54 # else
55 #  error "Neither IP_ADD_MEMBERSHIP nor IPV6_JOIN_GROUP is defined"
56 # endif
57 #endif /* !IP_ADD_MEMBERSHIP */
58
59 #ifdef GANGLIA_MAX_MESSAGE_LEN
60 # define BUFF_SIZE GANGLIA_MAX_MESSAGE_LEN
61 #else
62 # define BUFF_SIZE 1400
63 #endif
64
65 struct socket_entry_s
66 {
67   int                     fd;
68   struct sockaddr_storage addr;
69   socklen_t               addrlen;
70 };
71 typedef struct socket_entry_s socket_entry_t;
72
73 struct staging_entry_s
74 {
75   char key[2 * DATA_MAX_NAME_LEN];
76   value_list_t vl;
77   int flags;
78 };
79 typedef struct staging_entry_s staging_entry_t;
80
81 struct metric_map_s
82 {
83   const char *ganglia_name;
84   const char *type;
85   const char *type_instance;
86   int ds_type;
87   size_t value_index;
88 };
89 typedef struct metric_map_s metric_map_t;
90
91 static struct pollfd *mc_receive_sockets = NULL;
92 static size_t         mc_receive_sockets_num = 0;
93
94 static socket_entry_t  *mc_send_sockets = NULL;
95 static size_t           mc_send_sockets_num = 0;
96 static pthread_mutex_t  mc_send_sockets_lock = PTHREAD_MUTEX_INITIALIZER;
97
98 static int            mc_receive_thread_loop    = 0;
99 static int            mc_receive_thread_running = 0;
100 static pthread_t      mc_receive_thread_id;
101
102 static metric_map_t metric_map[] =
103 {
104   { "load_one",     "load",       "",         DS_TYPE_GAUGE,   0 },
105   { "load_five",    "load",       "",         DS_TYPE_GAUGE,   1 },
106   { "load_fifteen", "load",       "",         DS_TYPE_GAUGE,   2 },
107   { "cpu_user",     "cpu",        "user",     DS_TYPE_COUNTER, 0 },
108   { "cpu_system",   "cpu",        "system",   DS_TYPE_COUNTER, 0 },
109   { "cpu_idle",     "cpu",        "idle",     DS_TYPE_COUNTER, 0 },
110   { "cpu_nice",     "cpu",        "nice",     DS_TYPE_COUNTER, 0 },
111   { "cpu_wio",      "cpu",        "wait",     DS_TYPE_COUNTER, 0 },
112   { "mem_free",     "memory",     "free",     DS_TYPE_GAUGE,   0 },
113   { "mem_shared",   "memory",     "shared",   DS_TYPE_GAUGE,   0 },
114   { "mem_buffers",  "memory",     "buffered", DS_TYPE_GAUGE,   0 },
115   { "mem_cached",   "memory",     "cached",   DS_TYPE_GAUGE,   0 },
116   { "mem_total",    "memory",     "total",    DS_TYPE_GAUGE,   0 },
117   { "bytes_in",     "if_octets",  "",         DS_TYPE_COUNTER, 0 },
118   { "bytes_out",    "if_octets",  "",         DS_TYPE_COUNTER, 1 },
119   { "pkts_in",      "if_packets", "",         DS_TYPE_COUNTER, 0 },
120   { "pkts_out",     "if_packets", "",         DS_TYPE_COUNTER, 1 }
121 };
122 static size_t metric_map_len = STATIC_ARRAY_SIZE (metric_map);
123
124 static c_avl_tree_t   *staging_tree;
125 static pthread_mutex_t staging_lock = PTHREAD_MUTEX_INITIALIZER;
126
127 static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */
128     size_t *ret_sockets_num,
129     const char *node, const char *service, int listen)
130 {
131   struct addrinfo  ai_hints;
132   struct addrinfo *ai_list;
133   struct addrinfo *ai_ptr;
134   int              ai_return;
135
136   socket_entry_t *sockets;
137   size_t          sockets_num;
138
139   int status;
140     
141   sockets     = *ret_sockets;
142   sockets_num = *ret_sockets_num;
143
144   memset (&ai_hints, 0, sizeof (ai_hints));
145   ai_hints.ai_flags    = 0;
146 #ifdef AI_PASSIVE
147   ai_hints.ai_flags |= AI_PASSIVE;
148 #endif
149 #ifdef AI_ADDRCONFIG
150   ai_hints.ai_flags |= AI_ADDRCONFIG;
151 #endif
152   ai_hints.ai_family   = AF_UNSPEC;
153   ai_hints.ai_socktype = SOCK_DGRAM;
154   ai_hints.ai_protocol = IPPROTO_UDP;
155
156   ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
157   if (ai_return != 0)
158   {
159     char errbuf[1024];
160     ERROR ("gmond plugin: getaddrinfo (%s, %s) failed: %s",
161         (node == NULL) ? "(null)" : node,
162         (service == NULL) ? "(null)" : service,
163         (ai_return == EAI_SYSTEM)
164         ? sstrerror (errno, errbuf, sizeof (errbuf))
165         : gai_strerror (ai_return));
166     return (-1);
167   }
168
169   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) /* {{{ */
170   {
171     socket_entry_t *tmp;
172
173     tmp = realloc (sockets, (sockets_num + 1) * sizeof (*sockets));
174     if (tmp == NULL)
175     {
176       ERROR ("gmond plugin: realloc failed.");
177       continue;
178     }
179     sockets = tmp;
180
181     sockets[sockets_num].fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
182         ai_ptr->ai_protocol);
183     if (sockets[sockets_num].fd < 0)
184     {
185       char errbuf[1024];
186       ERROR ("gmond plugin: socket failed: %s",
187           sstrerror (errno, errbuf, sizeof (errbuf)));
188       continue;
189     }
190
191     assert (sizeof (sockets[sockets_num].addr) >= ai_ptr->ai_addrlen);
192     memcpy (&sockets[sockets_num].addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
193     sockets[sockets_num].addrlen = ai_ptr->ai_addrlen;
194
195     /* Sending socket: Open only one socket and don't bind it. */
196     if (listen == 0)
197     {
198       sockets_num++;
199       break;
200     }
201     else
202     {
203       int yes = 1;
204
205       setsockopt (sockets[sockets_num].fd, SOL_SOCKET, SO_REUSEADDR,
206           (void *) &yes, sizeof (yes));
207     }
208
209     status = bind (sockets[sockets_num].fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
210     if (status != 0)
211     {
212       char errbuf[1024];
213       ERROR ("gmond plugin: bind failed: %s",
214           sstrerror (errno, errbuf, sizeof (errbuf)));
215       close (sockets[sockets_num].fd);
216       continue;
217     }
218
219     if (ai_ptr->ai_family == AF_INET)
220     {
221       struct sockaddr_in *addr;
222       struct ip_mreq mreq;
223       int loop;
224
225       addr = (struct sockaddr_in *) ai_ptr->ai_addr;
226
227       if (!IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
228       {
229         sockets_num++;
230         continue;
231       }
232
233       loop = 1;
234       setsockopt (sockets[sockets_num].fd, IPPROTO_IP, IP_MULTICAST_LOOP,
235           (void *) &loop, sizeof (loop));
236
237       memset (&mreq, 0, sizeof (mreq));
238       mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr;
239       mreq.imr_interface.s_addr = htonl (INADDR_ANY);
240       setsockopt (sockets[sockets_num].fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
241           (void *) &mreq, sizeof (mreq));
242     } /* if (ai_ptr->ai_family == AF_INET) */
243     else if (ai_ptr->ai_family == AF_INET6)
244     {
245       struct sockaddr_in6 *addr;
246       struct ipv6_mreq mreq;
247       int loop;
248
249       addr = (struct sockaddr_in6 *) ai_ptr->ai_addr;
250
251       if (!IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
252       {
253         sockets_num++;
254         continue;
255       }
256
257       loop = 1;
258       setsockopt (sockets[sockets_num].fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
259           (void *) &loop, sizeof (loop));
260
261       memset (&mreq, 0, sizeof (mreq));
262       memcpy (&mreq.ipv6mr_multiaddr,
263           &addr->sin6_addr, sizeof (addr->sin6_addr));
264       mreq.ipv6mr_interface = 0; /* any */
265       setsockopt (sockets[sockets_num].fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
266           (void *) &mreq, sizeof (mreq));
267     } /* if (ai_ptr->ai_family == AF_INET6) */
268
269     sockets_num++;
270   } /* }}} for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) */
271
272   freeaddrinfo (ai_list);
273
274   if ((*ret_sockets_num) >= sockets_num)
275     return (-1);
276
277   *ret_sockets = sockets;
278   *ret_sockets_num = sockets_num;
279   return (0);
280 } /* }}} int create_sockets */
281
282 static int request_meta_data (const char *host, const char *name) /* {{{ */
283 {
284   Ganglia_metadata_msg msg;
285   char buffer[BUFF_SIZE];
286   unsigned int buffer_size;
287   XDR xdr;
288   size_t i;
289
290   memset (&msg, 0, sizeof (msg));
291
292   msg.id = gmetadata_request;
293   msg.Ganglia_metadata_msg_u.grequest.metric_id.host = strdup (host);
294   msg.Ganglia_metadata_msg_u.grequest.metric_id.name = strdup (name);
295
296   if ((msg.Ganglia_metadata_msg_u.grequest.metric_id.host == NULL)
297       || (msg.Ganglia_metadata_msg_u.grequest.metric_id.name == NULL))
298   {
299     sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.host);
300     sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.name);
301     return (-1);
302   }
303
304   memset (buffer, 0, sizeof (buffer));
305   xdrmem_create (&xdr, buffer, sizeof (buffer), XDR_ENCODE);
306
307   if (!xdr_Ganglia_metadata_msg (&xdr, &msg))
308   {
309     sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.host);
310     sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.name);
311     return (-1);
312   }
313
314   buffer_size = xdr_getpos (&xdr);
315
316   DEBUG ("gmond plugin: Requesting meta data for %s/%s.",
317       host, name);
318
319   pthread_mutex_lock (&mc_send_sockets_lock);
320   for (i = 0; i < mc_send_sockets_num; i++)
321     sendto (mc_send_sockets[i].fd, buffer, (size_t) buffer_size,
322         /* flags = */ 0,
323         (struct sockaddr *) &mc_send_sockets[i].addr,
324         mc_send_sockets[i].addrlen);
325   pthread_mutex_unlock (&mc_send_sockets_lock);
326
327   sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.host);
328   sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.name);
329   return (0);
330 } /* }}} int request_meta_data */
331
332 static staging_entry_t *staging_entry_get (const char *host, /* {{{ */
333     const char *name,
334     const char *type, const char *type_instance,
335     int values_len)
336 {
337   char key[2 * DATA_MAX_NAME_LEN];
338   staging_entry_t *se;
339   int status;
340
341   if (staging_tree == NULL)
342     return (NULL);
343
344   ssnprintf (key, sizeof (key), "%s/%s/%s", host, type, type_instance);
345
346   se = NULL;
347   status = c_avl_get (staging_tree, key, (void *) &se);
348   if (status == 0)
349     return (se);
350
351   /* insert new entry */
352   se = (staging_entry_t *) malloc (sizeof (*se));
353   if (se == NULL)
354     return (NULL);
355   memset (se, 0, sizeof (*se));
356
357   sstrncpy (se->key, key, sizeof (se->key));
358   se->flags = 0;
359
360   se->vl.values = (value_t *) calloc (values_len, sizeof (*se->vl.values));
361   if (se->vl.values == NULL)
362   {
363     sfree (se);
364     return (NULL);
365   }
366   se->vl.values_len = values_len;
367
368   se->vl.time = 0;
369   se->vl.interval = 0;
370   sstrncpy (se->vl.host, host, sizeof (se->vl.host));
371   sstrncpy (se->vl.plugin, "gmond", sizeof (se->vl.plugin));
372   sstrncpy (se->vl.type, type, sizeof (se->vl.type));
373   if (type_instance != NULL)
374     sstrncpy (se->vl.type_instance, type_instance,
375         sizeof (se->vl.type_instance));
376
377   status = c_avl_insert (staging_tree, se->key, se);
378   if (status != 0)
379   {
380     ERROR ("gmond plugin: c_avl_insert failed.");
381     sfree (se->vl.values);
382     sfree (se);
383     return (NULL);
384   }
385
386   return (se);
387 } /* }}} staging_entry_t *staging_entry_get */
388
389 static int staging_entry_submit (const char *host, const char *name, /* {{{ */
390     staging_entry_t *se)
391 {
392   value_list_t vl;
393   value_t values[se->vl.values_len];
394
395   if (se->vl.interval == 0)
396   {
397     /* No meta data has been received for this metric yet. */
398     se->flags = 0;
399     pthread_mutex_unlock (&staging_lock);
400     request_meta_data (host, name);
401     return (0);
402   }
403
404   se->flags = 0;
405
406   memcpy (values, se->vl.values, sizeof (values));
407   memcpy (&vl, &se->vl, sizeof (vl));
408
409   /* Unlock before calling `plugin_dispatch_values'.. */
410   pthread_mutex_unlock (&staging_lock);
411
412   vl.values = values;
413
414   plugin_dispatch_values (&vl);
415
416   return (0);
417 } /* }}} int staging_entry_submit */
418
419 static int staging_entry_update (const char *host, const char *name, /* {{{ */
420     const char *type, const char *type_instance,
421     int value_index, int ds_type, value_t value)
422 {
423   const data_set_t *ds;
424   staging_entry_t *se;
425
426   ds = plugin_get_ds (type);
427   if (ds == NULL)
428   {
429     ERROR ("gmond plugin: Looking up type %s failed.", type);
430     return (-1);
431   }
432
433   if (ds->ds_num <= value_index)
434   {
435     ERROR ("gmond plugin: Invalid index %i: %s has only %i data source(s).",
436         value_index, ds->type, ds->ds_num);
437     return (-1);
438   }
439
440   pthread_mutex_lock (&staging_lock);
441
442   se = staging_entry_get (host, name, type, type_instance, ds->ds_num);
443   if (se == NULL)
444   {
445     pthread_mutex_unlock (&staging_lock);
446     ERROR ("gmond plugin: staging_entry_get failed.");
447     return (-1);
448   }
449   if (se->vl.values_len != ds->ds_num)
450   {
451     pthread_mutex_unlock (&staging_lock);
452     return (-1);
453   }
454
455   if (ds_type == DS_TYPE_COUNTER)
456     se->vl.values[value_index].counter += value.counter;
457   else if (ds_type == DS_TYPE_GAUGE)
458     se->vl.values[value_index].gauge = value.gauge;
459   se->flags |= (0x01 << value_index);
460
461   DEBUG ("gmond plugin: key = %s; flags = %i;",
462       se->key, se->flags);
463
464   /* Check if all values have been set and submit if so. */
465   if (se->flags == ((0x01 << se->vl.values_len) - 1))
466   {
467     /* `staging_lock' is unlocked in `staging_entry_submit'. */
468     staging_entry_submit (host, name, se);
469   }
470   else
471   {
472     pthread_mutex_unlock (&staging_lock);
473   }
474
475   return (0);
476 } /* }}} int staging_entry_update */
477
478 static int mc_handle_value_msg (Ganglia_value_msg *msg) /* {{{ */
479 {
480   const char *host;
481   const char *name;
482
483   value_t value_counter;
484   value_t value_gauge;
485
486   size_t i;
487
488   /* Fill in `host', `name', `value_counter', and `value_gauge' according to
489    * the value type, or return with an error. */
490   switch (msg->id) /* {{{ */
491   {
492     case gmetric_uint:
493     {
494       Ganglia_gmetric_uint msg_uint;
495
496       msg_uint = msg->Ganglia_value_msg_u.gu_int;
497
498       host = msg_uint.metric_id.host;
499       name = msg_uint.metric_id.name;
500       value_counter.counter = (counter_t) msg_uint.ui;
501       value_gauge.gauge = (gauge_t) msg_uint.ui;
502       break;
503     }
504
505     case gmetric_string:
506     {
507       Ganglia_gmetric_string msg_string;
508       char *endptr;
509
510       msg_string = msg->Ganglia_value_msg_u.gstr;
511
512       host = msg_string.metric_id.host;
513       name = msg_string.metric_id.name;
514
515       endptr = NULL;
516       errno = 0;
517       value_counter.counter = (counter_t) strtoll (msg_string.str,
518           &endptr, /* base = */ 0);
519       if ((endptr == msg_string.str) || (errno != 0))
520         value_counter.counter = -1;
521
522       endptr = NULL;
523       errno = 0;
524       value_gauge.gauge = (gauge_t) strtod (msg_string.str, &endptr);
525       if ((endptr == msg_string.str) || (errno != 0))
526         value_gauge.gauge = NAN;
527
528       break;
529     }
530
531     case gmetric_float:
532     {
533       Ganglia_gmetric_float msg_float;
534
535       msg_float = msg->Ganglia_value_msg_u.gf;
536
537       host = msg_float.metric_id.host;
538       name = msg_float.metric_id.name;
539       value_counter.counter = (counter_t) msg_float.f;
540       value_gauge.gauge = (gauge_t) msg_float.f;
541       break;
542     }
543
544     case gmetric_double:
545     {
546       Ganglia_gmetric_double msg_double;
547
548       msg_double = msg->Ganglia_value_msg_u.gd;
549
550       host = msg_double.metric_id.host;
551       name = msg_double.metric_id.name;
552       value_counter.counter = (counter_t) msg_double.d;
553       value_gauge.gauge = (gauge_t) msg_double.d;
554       break;
555     }
556     default:
557       DEBUG ("gmond plugin: Value type not handled: %i", msg->id);
558       return (-1);
559   } /* }}} switch (msg->id) */
560
561   assert (host != NULL);
562   assert (name != NULL);
563
564   for (i = 0; i < metric_map_len; i++)
565   {
566     if (strcmp (name, metric_map[i].ganglia_name) != 0)
567       continue;
568
569     return (staging_entry_update (host, name,
570           metric_map[i].type, metric_map[i].type_instance,
571           metric_map[i].value_index, metric_map[i].ds_type,
572           (metric_map[i].ds_type == DS_TYPE_COUNTER)
573           ? value_counter
574           : value_gauge));
575   }
576
577   DEBUG ("gmond plugin: Cannot find a translation for %s.", name);
578
579   return (-1);
580 } /* }}} int mc_handle_value_msg */
581
582 static int mc_handle_metadata_msg (Ganglia_metadata_msg *msg) /* {{{ */
583 {
584   switch (msg->id)
585   {
586     case gmetadata_full:
587     {
588       Ganglia_metadatadef msg_meta;
589       staging_entry_t *se;
590       const data_set_t *ds;
591       size_t i;
592
593       msg_meta = msg->Ganglia_metadata_msg_u.gfull;
594
595       if (msg_meta.metric.tmax <= 0)
596         return (-1);
597
598       for (i = 0; i < metric_map_len; i++)
599       {
600         if (strcmp (msg_meta.metric_id.name, metric_map[i].ganglia_name) == 0)
601           break;
602       }
603
604       if (i >= metric_map_len)
605       {
606         DEBUG ("gmond plugin: Not handling meta data %s.",
607             msg_meta.metric_id.name);
608         return (0);
609       }
610
611       ds = plugin_get_ds (metric_map[i].type);
612       if (ds == NULL)
613       {
614         WARNING ("gmond plugin: Could not find data set %s.",
615             metric_map[i].type);
616         return (-1);
617       }
618
619       DEBUG ("gmond plugin: Received meta data for %s/%s.",
620           msg_meta.metric_id.host, msg_meta.metric_id.name);
621
622       pthread_mutex_lock (&staging_lock);
623       se = staging_entry_get (msg_meta.metric_id.host,
624           msg_meta.metric_id.name,
625           metric_map[i].type, metric_map[i].type_instance,
626           ds->ds_num);
627       if (se != NULL)
628         se->vl.interval = (int) msg_meta.metric.tmax;
629       pthread_mutex_unlock (&staging_lock);
630
631       if (se == NULL)
632       {
633         ERROR ("gmond plugin: staging_entry_get failed.");
634         return (-1);
635       }
636
637       break;
638     }
639
640     default:
641     {
642       return (-1);
643     }
644   }
645
646   return (0);
647 } /* }}} int mc_handle_metadata_msg */
648
649 static int mc_handle_metric (void *buffer, size_t buffer_size) /* {{{ */
650 {
651   XDR xdr;
652   Ganglia_msg_formats format;
653
654   xdrmem_create (&xdr, buffer, buffer_size, XDR_DECODE);
655
656   xdr_Ganglia_msg_formats (&xdr, &format);
657   xdr_setpos (&xdr, 0);
658
659   switch (format)
660   {
661     case gmetric_ushort:
662     case gmetric_short:
663     case gmetric_int:
664     case gmetric_uint:
665     case gmetric_string:
666     case gmetric_float:
667     case gmetric_double:
668     {
669       Ganglia_value_msg msg;
670
671       memset (&msg, 0, sizeof (msg));
672       if (xdr_Ganglia_value_msg (&xdr, &msg))
673         mc_handle_value_msg (&msg);
674       break;
675     }
676
677     case gmetadata_full:
678     case gmetadata_request:
679     {
680       Ganglia_metadata_msg msg;
681       memset (&msg, 0, sizeof (msg));
682       if (xdr_Ganglia_metadata_msg (&xdr, &msg))
683         mc_handle_metadata_msg (&msg);
684       break;
685     }
686
687     default:
688       DEBUG ("gmond plugin: Unknown format: %i", format);
689       return (-1);
690   } /* switch (format) */
691
692
693   return (0);
694 } /* }}} int mc_handle_metric */
695
696 static int mc_handle_socket (struct pollfd *p) /* {{{ */
697 {
698   char buffer[BUFF_SIZE];
699   ssize_t buffer_size;
700
701   if ((p->revents & (POLLIN | POLLPRI)) == 0)
702   {
703     p->revents = 0;
704     return (-1);
705   }
706
707   buffer_size = recv (p->fd, buffer, sizeof (buffer), /* flags = */ 0);
708   if (buffer_size <= 0)
709   {
710     char errbuf[1024];
711     ERROR ("gmond plugin: recv failed: %s",
712         sstrerror (errno, errbuf, sizeof (errbuf)));
713     p->revents = 0;
714     return (-1);
715   }
716
717   mc_handle_metric (buffer, (size_t) buffer_size);
718   return (0);
719 } /* }}} int mc_handle_socket */
720
721 static void *mc_receive_thread (void *arg) /* {{{ */
722 {
723   socket_entry_t *mc_receive_socket_entries;
724   int status;
725   size_t i;
726
727   mc_receive_socket_entries = NULL;
728   status = create_sockets (&mc_receive_socket_entries, &mc_receive_sockets_num,
729       "239.2.11.71", "8649", /* listen = */ 1);
730   if (status != 0)
731   {
732     ERROR ("gmond plugin: create_sockets failed.");
733     return ((void *) -1);
734   }
735
736   mc_receive_sockets = (struct pollfd *) calloc (mc_receive_sockets_num,
737       sizeof (*mc_receive_sockets));
738   if (mc_receive_sockets == NULL)
739   {
740     ERROR ("gmond plugin: calloc failed.");
741     for (i = 0; i < mc_receive_sockets_num; i++)
742       close (mc_receive_socket_entries[i].fd);
743     free (mc_receive_socket_entries);
744     mc_receive_socket_entries = NULL;
745     mc_receive_sockets_num = 0;
746     return ((void *) -1);
747   }
748
749   for (i = 0; i < mc_receive_sockets_num; i++)
750   {
751     mc_receive_sockets[i].fd = mc_receive_socket_entries[i].fd;
752     mc_receive_sockets[i].events = POLLIN | POLLPRI;
753     mc_receive_sockets[i].revents = 0;
754   }
755
756   while (mc_receive_thread_loop != 0)
757   {
758     status = poll (mc_receive_sockets, mc_receive_sockets_num, -1);
759     if (status <= 0)
760     {
761       char errbuf[1024];
762       if (errno == EINTR)
763         continue;
764       ERROR ("gmond plugin: poll failed: %s",
765           sstrerror (errno, errbuf, sizeof (errbuf)));
766       break;
767     }
768
769     for (i = 0; i < mc_receive_sockets_num; i++)
770     {
771       if (mc_receive_sockets[i].revents != 0)
772         mc_handle_socket (mc_receive_sockets + i);
773     }
774   } /* while (mc_receive_thread_loop != 0) */
775
776   return ((void *) 0);
777 } /* }}} void *mc_receive_thread */
778
779 static int mc_receive_thread_start (void) /* {{{ */
780 {
781   int status;
782
783   if (mc_receive_thread_running != 0)
784     return (-1);
785
786   mc_receive_thread_loop = 1;
787
788   status = pthread_create (&mc_receive_thread_id, /* attr = */ NULL,
789       mc_receive_thread, /* args = */ NULL);
790   if (status != 0)
791   {
792     ERROR ("gmond plugin: Starting receive thread failed.");
793     mc_receive_thread_loop = 0;
794     return (-1);
795   }
796
797   mc_receive_thread_running = 1;
798   return (0);
799 } /* }}} int start_receive_thread */
800
801 static int mc_receive_thread_stop (void) /* {{{ */
802 {
803   if (mc_receive_thread_running == 0)
804     return (-1);
805
806   mc_receive_thread_loop = 0;
807
808   INFO ("gmond plugin: Stopping receive thread.");
809   pthread_kill (mc_receive_thread_id, SIGTERM);
810   pthread_join (mc_receive_thread_id, /* return value = */ NULL);
811   memset (&mc_receive_thread_id, 0, sizeof (mc_receive_thread_id));
812
813   mc_receive_thread_running = 0;
814
815   return (0);
816 } /* }}} int mc_receive_thread_stop */
817
818 /* 
819  * TODO: Config:
820  *
821  * <Plugin gmond>
822  *   MCReceiveFrom "239.2.11.71" "8649"
823  *   MCSendTo "239.2.11.71" "8649"
824  *   <Metric "load_one">
825  *     Type "load"
826  *     [TypeInstance "foo"]
827  *     [Index 0]
828  *   </Metric>
829  * </Plugin>
830  */
831
832 static int gmond_init (void)
833 {
834   create_sockets (&mc_send_sockets, &mc_send_sockets_num,
835       "239.2.11.71", "8649", /* listen = */ 0);
836
837   staging_tree = c_avl_create ((void *) strcmp);
838   if (staging_tree == NULL)
839   {
840     ERROR ("gmond plugin: c_avl_create failed.");
841     return (-1);
842   }
843
844   mc_receive_thread_start ();
845
846   return (0);
847 } /* int gmond_init */
848
849 static int gmond_shutdown (void)
850 {
851   size_t i;
852
853   mc_receive_thread_stop ();
854
855   pthread_mutex_lock (&mc_send_sockets_lock);
856   for (i = 0; i < mc_send_sockets_num; i++)
857   {
858     close (mc_send_sockets[i].fd);
859     mc_send_sockets[i].fd = -1;
860   }
861   sfree (mc_send_sockets);
862   mc_send_sockets_num = 0;
863   pthread_mutex_unlock (&mc_send_sockets_lock);
864
865
866   return (0);
867 } /* int gmond_shutdown */
868
869 void module_register (void)
870 {
871   plugin_register_init ("gmond", gmond_init);
872   plugin_register_shutdown ("gmond", gmond_shutdown);
873 }
874
875 /* vim: set sw=2 sts=2 et fdm=marker : */