Merge pull request #1830 from rubenk/move-collectd-header
[collectd.git] / src / gmond.c
1 /**
2  * collectd - src/gmond.c
3  * Copyright (C) 2009-2015  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "common.h"
31 #include "configfile.h"
32 #include "utils_avltree.h"
33
34 #if HAVE_NETDB_H
35 # include <netdb.h>
36 #endif
37 #if HAVE_NETINET_IN_H
38 # include <netinet/in.h>
39 #endif
40 #if HAVE_ARPA_INET_H
41 # include <arpa/inet.h>
42 #endif
43 #if HAVE_POLL_H
44 # include <poll.h>
45 #endif
46
47 #include <gm_protocol.h>
48
49 #ifndef IPV6_ADD_MEMBERSHIP
50 # ifdef IPV6_JOIN_GROUP
51 #  define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
52 # else
53 #  error "Neither IP_ADD_MEMBERSHIP nor IPV6_JOIN_GROUP is defined"
54 # endif
55 #endif /* !IP_ADD_MEMBERSHIP */
56
57 #ifdef GANGLIA_MAX_MESSAGE_LEN
58 # define BUFF_SIZE GANGLIA_MAX_MESSAGE_LEN
59 #else
60 # define BUFF_SIZE 1400
61 #endif
62
63 struct socket_entry_s
64 {
65   int                     fd;
66   struct sockaddr_storage addr;
67   socklen_t               addrlen;
68 };
69 typedef struct socket_entry_s socket_entry_t;
70
71 struct staging_entry_s
72 {
73   char key[2 * DATA_MAX_NAME_LEN];
74   value_list_t vl;
75   int flags;
76 };
77 typedef struct staging_entry_s staging_entry_t;
78
79 struct metric_map_s
80 {
81   char  *ganglia_name;
82   char  *type;
83   char  *type_instance;
84   char  *ds_name;
85   int    ds_type;
86   size_t ds_index;
87 };
88 typedef struct metric_map_s metric_map_t;
89
90 #define MC_RECEIVE_GROUP_DEFAULT "239.2.11.71"
91 static char          *mc_receive_group = NULL;
92 #define MC_RECEIVE_PORT_DEFAULT "8649"
93 static char          *mc_receive_port = NULL;
94
95 static struct pollfd *mc_receive_sockets = NULL;
96 static size_t         mc_receive_sockets_num = 0;
97
98 static socket_entry_t  *mc_send_sockets = NULL;
99 static size_t           mc_send_sockets_num = 0;
100 static pthread_mutex_t  mc_send_sockets_lock = PTHREAD_MUTEX_INITIALIZER;
101
102 static int            mc_receive_thread_loop    = 0;
103 static int            mc_receive_thread_running = 0;
104 static pthread_t      mc_receive_thread_id;
105
106 static metric_map_t metric_map_default[] =
107 { /*---------------+-------------+-----------+-------------+------+-----*
108    * ganglia_name  ! type        ! type_inst ! data_source ! type ! idx *
109    *---------------+-------------+-----------+-------------+------+-----*/
110   { "load_one",     "load",       "",         "shortterm",     -1,   -1 },
111   { "load_five",    "load",       "",         "midterm",       -1,   -1 },
112   { "load_fifteen", "load",       "",         "longterm",      -1,   -1 },
113   { "cpu_user",     "cpu",        "user",     "value",         -1,   -1 },
114   { "cpu_system",   "cpu",        "system",   "value",         -1,   -1 },
115   { "cpu_idle",     "cpu",        "idle",     "value",         -1,   -1 },
116   { "cpu_nice",     "cpu",        "nice",     "value",         -1,   -1 },
117   { "cpu_wio",      "cpu",        "wait",     "value",         -1,   -1 },
118   { "mem_free",     "memory",     "free",     "value",         -1,   -1 },
119   { "mem_shared",   "memory",     "shared",   "value",         -1,   -1 },
120   { "mem_buffers",  "memory",     "buffered", "value",         -1,   -1 },
121   { "mem_cached",   "memory",     "cached",   "value",         -1,   -1 },
122   { "mem_total",    "memory",     "total",    "value",         -1,   -1 },
123   { "bytes_in",     "if_octets",  "",         "rx",            -1,   -1 },
124   { "bytes_out",    "if_octets",  "",         "tx",            -1,   -1 },
125   { "pkts_in",      "if_packets", "",         "rx",            -1,   -1 },
126   { "pkts_out",     "if_packets", "",         "tx",            -1,   -1 }
127 };
128 static size_t metric_map_len_default = STATIC_ARRAY_SIZE (metric_map_default);
129
130 static metric_map_t *metric_map = NULL;
131 static size_t        metric_map_len = 0;
132
133 static c_avl_tree_t   *staging_tree;
134 static pthread_mutex_t staging_lock = PTHREAD_MUTEX_INITIALIZER;
135
136 static metric_map_t *metric_lookup (const char *key) /* {{{ */
137 {
138   metric_map_t *map;
139   size_t map_len;
140   size_t i;
141
142   /* Search the user-supplied table first.. */
143   map = metric_map;
144   map_len = metric_map_len;
145   for (i = 0; i < map_len; i++)
146     if (strcmp (map[i].ganglia_name, key) == 0)
147       break;
148
149   /* .. and fall back to the built-in table if nothing is found. */
150   if (i >= map_len)
151   {
152     map = metric_map_default;
153     map_len = metric_map_len_default;
154
155     for (i = 0; i < map_len; i++)
156       if (strcmp (map[i].ganglia_name, key) == 0)
157         break;
158   }
159
160   if (i >= map_len)
161     return (NULL);
162
163   /* Look up the DS type and ds_index. */
164   if (map[i].ds_type < 0) /* {{{ */
165   {
166     const data_set_t *ds;
167
168     ds = plugin_get_ds (map[i].type);
169     if (ds == NULL)
170     {
171       WARNING ("gmond plugin: Type not defined: %s", map[i].type);
172       return (NULL);
173     }
174
175     if ((map[i].ds_name == NULL) && (ds->ds_num != 1))
176     {
177       WARNING ("gmond plugin: No data source name defined for metric %s, "
178           "but type %s has more than one data source.",
179           map[i].ganglia_name, map[i].type);
180       return (NULL);
181     }
182
183     if (map[i].ds_name == NULL)
184     {
185       map[i].ds_index = 0;
186     }
187     else
188     {
189       size_t j;
190
191       for (j = 0; j < ds->ds_num; j++)
192         if (strcasecmp (ds->ds[j].name, map[i].ds_name) == 0)
193           break;
194
195       if (j >= ds->ds_num)
196       {
197         WARNING ("gmond plugin: There is no data source "
198             "named `%s' in type `%s'.",
199             map[i].ds_name, ds->type);
200         return (NULL);
201       }
202       map[i].ds_index = j;
203     }
204
205     map[i].ds_type = ds->ds[map[i].ds_index].type;
206   } /* }}} if ((map[i].ds_type < 0) || (map[i].ds_index < 0)) */
207
208   return (map + i);
209 } /* }}} metric_map_t *metric_lookup */
210
211 static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */
212     size_t *ret_sockets_num,
213     const char *node, const char *service, int listen)
214 {
215   struct addrinfo *ai_list;
216   struct addrinfo *ai_ptr;
217   int              ai_return;
218
219   socket_entry_t *sockets = NULL;
220   size_t          sockets_num = 0;
221
222   int status;
223
224   if (*ret_sockets != NULL)
225     return (EINVAL);
226
227   struct addrinfo ai_hints = {
228     .ai_family = AF_UNSPEC,
229     .ai_flags = AI_ADDRCONFIG | AI_PASSIVE,
230     .ai_protocol = IPPROTO_UDP,
231     .ai_socktype = SOCK_DGRAM
232   };
233
234   ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
235   if (ai_return != 0)
236   {
237     char errbuf[1024];
238     ERROR ("gmond plugin: getaddrinfo (%s, %s) failed: %s",
239         (node == NULL) ? "(null)" : node,
240         (service == NULL) ? "(null)" : service,
241         (ai_return == EAI_SYSTEM)
242         ? sstrerror (errno, errbuf, sizeof (errbuf))
243         : gai_strerror (ai_return));
244     return (-1);
245   }
246
247   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) /* {{{ */
248   {
249     socket_entry_t *tmp;
250
251     tmp = realloc (sockets, (sockets_num + 1) * sizeof (*sockets));
252     if (tmp == NULL)
253     {
254       ERROR ("gmond plugin: realloc failed.");
255       continue;
256     }
257     sockets = tmp;
258
259     sockets[sockets_num].fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
260         ai_ptr->ai_protocol);
261     if (sockets[sockets_num].fd < 0)
262     {
263       char errbuf[1024];
264       ERROR ("gmond plugin: socket failed: %s",
265           sstrerror (errno, errbuf, sizeof (errbuf)));
266       continue;
267     }
268
269     assert (sizeof (sockets[sockets_num].addr) >= ai_ptr->ai_addrlen);
270     memcpy (&sockets[sockets_num].addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
271     sockets[sockets_num].addrlen = ai_ptr->ai_addrlen;
272
273     /* Sending socket: Open only one socket and don't bind it. */
274     if (listen == 0)
275     {
276       sockets_num++;
277       break;
278     }
279     else
280     {
281       int yes = 1;
282
283       status = setsockopt (sockets[sockets_num].fd, SOL_SOCKET, SO_REUSEADDR,
284           (void *) &yes, sizeof (yes));
285       if (status != 0)
286       {
287         char errbuf[1024];
288         WARNING ("gmond plugin: setsockopt(2) failed: %s",
289                  sstrerror (errno, errbuf, sizeof (errbuf)));
290       }
291     }
292
293     status = bind (sockets[sockets_num].fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
294     if (status != 0)
295     {
296       char errbuf[1024];
297       ERROR ("gmond plugin: bind failed: %s",
298           sstrerror (errno, errbuf, sizeof (errbuf)));
299       close (sockets[sockets_num].fd);
300       continue;
301     }
302
303     if (ai_ptr->ai_family == AF_INET)
304     {
305       struct sockaddr_in *addr;
306       int loop;
307
308       addr = (struct sockaddr_in *) ai_ptr->ai_addr;
309
310       if (!IN_MULTICAST (ntohl (addr->sin_addr.s_addr)))
311       {
312         sockets_num++;
313         continue;
314       }
315
316       loop = 1;
317       status = setsockopt (sockets[sockets_num].fd, IPPROTO_IP, IP_MULTICAST_LOOP,
318           (void *) &loop, sizeof (loop));
319       if (status != 0)
320       {
321         char errbuf[1024];
322         WARNING ("gmond plugin: setsockopt(2) failed: %s",
323                  sstrerror (errno, errbuf, sizeof (errbuf)));
324       }
325
326       struct ip_mreq mreq = {
327         .imr_multiaddr.s_addr = addr->sin_addr.s_addr,
328         .imr_interface.s_addr = htonl (INADDR_ANY)
329       };
330
331       status = setsockopt (sockets[sockets_num].fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
332           (void *) &mreq, sizeof (mreq));
333       if (status != 0)
334       {
335         char errbuf[1024];
336         WARNING ("gmond plugin: setsockopt(2) failed: %s",
337                  sstrerror (errno, errbuf, sizeof (errbuf)));
338       }
339     } /* if (ai_ptr->ai_family == AF_INET) */
340     else if (ai_ptr->ai_family == AF_INET6)
341     {
342       struct sockaddr_in6 *addr;
343       int loop;
344
345       addr = (struct sockaddr_in6 *) ai_ptr->ai_addr;
346
347       if (!IN6_IS_ADDR_MULTICAST (&addr->sin6_addr))
348       {
349         sockets_num++;
350         continue;
351       }
352
353       loop = 1;
354       status = setsockopt (sockets[sockets_num].fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
355           (void *) &loop, sizeof (loop));
356       if (status != 0)
357       {
358         char errbuf[1024];
359         WARNING ("gmond plugin: setsockopt(2) failed: %s",
360                  sstrerror (errno, errbuf, sizeof (errbuf)));
361       }
362
363       struct ipv6_mreq mreq = {
364         .ipv6mr_interface = 0 /* any */
365       };
366
367       memcpy (&mreq.ipv6mr_multiaddr,
368           &addr->sin6_addr, sizeof (addr->sin6_addr));
369       status = setsockopt (sockets[sockets_num].fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
370           (void *) &mreq, sizeof (mreq));
371       if (status != 0)
372       {
373         char errbuf[1024];
374         WARNING ("gmond plugin: setsockopt(2) failed: %s",
375                  sstrerror (errno, errbuf, sizeof (errbuf)));
376       }
377     } /* if (ai_ptr->ai_family == AF_INET6) */
378
379     sockets_num++;
380   } /* }}} for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) */
381
382   freeaddrinfo (ai_list);
383
384   if (sockets_num == 0)
385   {
386     sfree (sockets);
387     return (-1);
388   }
389
390   *ret_sockets = sockets;
391   *ret_sockets_num = sockets_num;
392   return (0);
393 } /* }}} int create_sockets */
394
395 static int request_meta_data (const char *host, const char *name) /* {{{ */
396 {
397   Ganglia_metadata_msg msg = { 0 };
398   char buffer[BUFF_SIZE] = { 0 };
399   unsigned int buffer_size;
400   XDR xdr;
401   size_t i;
402
403   msg.id = gmetadata_request;
404   msg.Ganglia_metadata_msg_u.grequest.metric_id.host = strdup (host);
405   msg.Ganglia_metadata_msg_u.grequest.metric_id.name = strdup (name);
406
407   if ((msg.Ganglia_metadata_msg_u.grequest.metric_id.host == NULL)
408       || (msg.Ganglia_metadata_msg_u.grequest.metric_id.name == NULL))
409   {
410     sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.host);
411     sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.name);
412     return (-1);
413   }
414
415   xdrmem_create (&xdr, buffer, sizeof (buffer), XDR_ENCODE);
416
417   if (!xdr_Ganglia_metadata_msg (&xdr, &msg))
418   {
419     sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.host);
420     sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.name);
421     return (-1);
422   }
423
424   buffer_size = xdr_getpos (&xdr);
425
426   DEBUG ("gmond plugin: Requesting meta data for %s/%s.",
427       host, name);
428
429   pthread_mutex_lock (&mc_send_sockets_lock);
430   for (i = 0; i < mc_send_sockets_num; i++)
431   {
432     ssize_t status = sendto (mc_send_sockets[i].fd, buffer, (size_t) buffer_size,
433         /* flags = */ 0,
434         (struct sockaddr *) &mc_send_sockets[i].addr,
435         mc_send_sockets[i].addrlen);
436     if (status == -1)
437     {
438       char errbuf[1024];
439       ERROR ("gmond plugin: sendto(2) failed: %s",
440              sstrerror (errno, errbuf, sizeof (errbuf)));
441       continue;
442     }
443   }
444   pthread_mutex_unlock (&mc_send_sockets_lock);
445
446   sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.host);
447   sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.name);
448   return (0);
449 } /* }}} int request_meta_data */
450
451 static staging_entry_t *staging_entry_get (const char *host, /* {{{ */
452     const char *name,
453     const char *type, const char *type_instance,
454     int values_len)
455 {
456   char key[2 * DATA_MAX_NAME_LEN];
457   staging_entry_t *se;
458   int status;
459
460   if (staging_tree == NULL)
461     return (NULL);
462
463   ssnprintf (key, sizeof (key), "%s/%s/%s", host, type,
464       (type_instance != NULL) ? type_instance : "");
465
466   se = NULL;
467   status = c_avl_get (staging_tree, key, (void *) &se);
468   if (status == 0)
469     return (se);
470
471   /* insert new entry */
472   se = calloc (1, sizeof (*se));
473   if (se == NULL)
474     return (NULL);
475
476   sstrncpy (se->key, key, sizeof (se->key));
477   se->flags = 0;
478
479   se->vl.values = (value_t *) calloc (values_len, sizeof (*se->vl.values));
480   if (se->vl.values == NULL)
481   {
482     sfree (se);
483     return (NULL);
484   }
485   se->vl.values_len = values_len;
486
487   se->vl.time = 0;
488   se->vl.interval = 0;
489   sstrncpy (se->vl.host, host, sizeof (se->vl.host));
490   sstrncpy (se->vl.plugin, "gmond", sizeof (se->vl.plugin));
491   sstrncpy (se->vl.type, type, sizeof (se->vl.type));
492   if (type_instance != NULL)
493     sstrncpy (se->vl.type_instance, type_instance,
494         sizeof (se->vl.type_instance));
495
496   status = c_avl_insert (staging_tree, se->key, se);
497   if (status != 0)
498   {
499     ERROR ("gmond plugin: c_avl_insert failed.");
500     sfree (se->vl.values);
501     sfree (se);
502     return (NULL);
503   }
504
505   return (se);
506 } /* }}} staging_entry_t *staging_entry_get */
507
508 static int staging_entry_update (const char *host, const char *name, /* {{{ */
509     const char *type, const char *type_instance,
510     size_t ds_index, int ds_type, value_t value)
511 {
512   const data_set_t *ds;
513   staging_entry_t *se;
514
515   ds = plugin_get_ds (type);
516   if (ds == NULL)
517   {
518     ERROR ("gmond plugin: Looking up type %s failed.", type);
519     return (-1);
520   }
521
522   if (ds->ds_num <= ds_index)
523   {
524     ERROR ("gmond plugin: Invalid index %zu: %s has only %zu data source(s).",
525         ds_index, ds->type, ds->ds_num);
526     return (-1);
527   }
528
529   pthread_mutex_lock (&staging_lock);
530
531   se = staging_entry_get (host, name, type, type_instance, ds->ds_num);
532   if (se == NULL)
533   {
534     pthread_mutex_unlock (&staging_lock);
535     ERROR ("gmond plugin: staging_entry_get failed.");
536     return (-1);
537   }
538   if (se->vl.values_len != ds->ds_num)
539   {
540     pthread_mutex_unlock (&staging_lock);
541     return (-1);
542   }
543
544   if (ds_type == DS_TYPE_COUNTER)
545     se->vl.values[ds_index].counter += value.counter;
546   else if (ds_type == DS_TYPE_GAUGE)
547     se->vl.values[ds_index].gauge = value.gauge;
548   else if (ds_type == DS_TYPE_DERIVE)
549     se->vl.values[ds_index].derive += value.derive;
550   else if (ds_type == DS_TYPE_ABSOLUTE)
551     se->vl.values[ds_index].absolute = value.absolute;
552   else
553     assert (23 == 42);
554
555   se->flags |= (0x01 << ds_index);
556
557   /* Check if all data sources have been set. If not, return here. */
558   if (se->flags != ((0x01 << se->vl.values_len) - 1))
559   {
560     pthread_mutex_unlock (&staging_lock);
561     return (0);
562   }
563
564   /* Check if the interval of this metric is known. If not, request meta data
565    * and return. */
566   if (se->vl.interval == 0)
567   {
568     /* No meta data has been received for this metric yet. */
569     se->flags = 0;
570     pthread_mutex_unlock (&staging_lock);
571
572     request_meta_data (host, name);
573     return (0);
574   }
575
576   plugin_dispatch_values (&se->vl);
577
578   se->flags = 0;
579   pthread_mutex_unlock (&staging_lock);
580
581   return (0);
582 } /* }}} int staging_entry_update */
583
584 static int mc_handle_value_msg (Ganglia_value_msg *msg) /* {{{ */
585 {
586   const char *host;
587   const char *name;
588   metric_map_t *map;
589
590   value_t value_counter;
591   value_t value_gauge;
592   value_t value_derive;
593
594   /* Fill in `host', `name', `value_counter', and `value_gauge' according to
595    * the value type, or return with an error. */
596   switch (msg->id) /* {{{ */
597   {
598     case gmetric_uint:
599     {
600       Ganglia_gmetric_uint msg_uint;
601
602       msg_uint = msg->Ganglia_value_msg_u.gu_int;
603
604       host = msg_uint.metric_id.host;
605       name = msg_uint.metric_id.name;
606       value_counter.counter = (counter_t) msg_uint.ui;
607       value_gauge.gauge = (gauge_t) msg_uint.ui;
608       value_derive.derive = (derive_t) msg_uint.ui;
609       break;
610     }
611
612     case gmetric_string:
613     {
614       Ganglia_gmetric_string msg_string;
615       int status;
616
617       msg_string = msg->Ganglia_value_msg_u.gstr;
618
619       host = msg_string.metric_id.host;
620       name = msg_string.metric_id.name;
621
622       status = parse_value (msg_string.str, &value_derive, DS_TYPE_DERIVE);
623       if (status != 0)
624         value_derive.derive = -1;
625
626       status = parse_value (msg_string.str, &value_gauge, DS_TYPE_GAUGE);
627       if (status != 0)
628         value_gauge.gauge = NAN;
629
630       status = parse_value (msg_string.str, &value_counter, DS_TYPE_COUNTER);
631       if (status != 0)
632         value_counter.counter = 0;
633
634       break;
635     }
636
637     case gmetric_float:
638     {
639       Ganglia_gmetric_float msg_float;
640
641       msg_float = msg->Ganglia_value_msg_u.gf;
642
643       host = msg_float.metric_id.host;
644       name = msg_float.metric_id.name;
645       value_counter.counter = (counter_t) msg_float.f;
646       value_gauge.gauge = (gauge_t) msg_float.f;
647       value_derive.derive = (derive_t) msg_float.f;
648       break;
649     }
650
651     case gmetric_double:
652     {
653       Ganglia_gmetric_double msg_double;
654
655       msg_double = msg->Ganglia_value_msg_u.gd;
656
657       host = msg_double.metric_id.host;
658       name = msg_double.metric_id.name;
659       value_counter.counter = (counter_t) msg_double.d;
660       value_gauge.gauge = (gauge_t) msg_double.d;
661       value_derive.derive = (derive_t) msg_double.d;
662       break;
663     }
664     default:
665       DEBUG ("gmond plugin: Value type not handled: %i", msg->id);
666       return (-1);
667   } /* }}} switch (msg->id) */
668
669   assert (host != NULL);
670   assert (name != NULL);
671
672   map = metric_lookup (name);
673   if (map != NULL)
674   {
675     value_t val_copy;
676
677     if ((map->ds_type == DS_TYPE_COUNTER)
678         || (map->ds_type == DS_TYPE_ABSOLUTE))
679       val_copy = value_counter;
680     else if (map->ds_type == DS_TYPE_GAUGE)
681       val_copy = value_gauge;
682     else if (map->ds_type == DS_TYPE_DERIVE)
683       val_copy = value_derive;
684     else
685       assert (23 == 42);
686
687     return (staging_entry_update (host, name,
688           map->type, map->type_instance,
689           map->ds_index, map->ds_type,
690           val_copy));
691   }
692
693   DEBUG ("gmond plugin: Cannot find a translation for %s.", name);
694   return (-1);
695 } /* }}} int mc_handle_value_msg */
696
697 static int mc_handle_metadata_msg (Ganglia_metadata_msg *msg) /* {{{ */
698 {
699   switch (msg->id)
700   {
701     case gmetadata_full:
702     {
703       Ganglia_metadatadef msg_meta;
704       staging_entry_t *se;
705       const data_set_t *ds;
706       metric_map_t *map;
707
708       msg_meta = msg->Ganglia_metadata_msg_u.gfull;
709
710       if (msg_meta.metric.tmax == 0)
711         return (-1);
712
713       map = metric_lookup (msg_meta.metric_id.name);
714       if (map == NULL)
715       {
716         DEBUG ("gmond plugin: Not handling meta data %s.",
717             msg_meta.metric_id.name);
718         return (0);
719       }
720
721       ds = plugin_get_ds (map->type);
722       if (ds == NULL)
723       {
724         WARNING ("gmond plugin: Could not find data set %s.", map->type);
725         return (-1);
726       }
727
728       DEBUG ("gmond plugin: Received meta data for %s/%s.",
729           msg_meta.metric_id.host, msg_meta.metric_id.name);
730
731       pthread_mutex_lock (&staging_lock);
732       se = staging_entry_get (msg_meta.metric_id.host,
733           msg_meta.metric_id.name,
734           map->type, map->type_instance,
735           ds->ds_num);
736       if (se != NULL)
737         se->vl.interval = TIME_T_TO_CDTIME_T (msg_meta.metric.tmax);
738       pthread_mutex_unlock (&staging_lock);
739
740       if (se == NULL)
741       {
742         ERROR ("gmond plugin: staging_entry_get failed.");
743         return (-1);
744       }
745
746       break;
747     }
748
749     default:
750     {
751       return (-1);
752     }
753   }
754
755   return (0);
756 } /* }}} int mc_handle_metadata_msg */
757
758 static int mc_handle_metric (void *buffer, size_t buffer_size) /* {{{ */
759 {
760   XDR xdr;
761   Ganglia_msg_formats format;
762
763   xdrmem_create (&xdr, buffer, buffer_size, XDR_DECODE);
764
765   xdr_Ganglia_msg_formats (&xdr, &format);
766   xdr_setpos (&xdr, 0);
767
768   switch (format)
769   {
770     case gmetric_ushort:
771     case gmetric_short:
772     case gmetric_int:
773     case gmetric_uint:
774     case gmetric_string:
775     case gmetric_float:
776     case gmetric_double:
777     {
778       Ganglia_value_msg msg = { 0 };
779
780       if (xdr_Ganglia_value_msg (&xdr, &msg))
781         mc_handle_value_msg (&msg);
782       break;
783     }
784
785     case gmetadata_full:
786     case gmetadata_request:
787     {
788       Ganglia_metadata_msg msg = { 0 };
789       if (xdr_Ganglia_metadata_msg (&xdr, &msg))
790         mc_handle_metadata_msg (&msg);
791       break;
792     }
793
794     default:
795       DEBUG ("gmond plugin: Unknown format: %i", format);
796       return (-1);
797   } /* switch (format) */
798
799
800   return (0);
801 } /* }}} int mc_handle_metric */
802
803 static int mc_handle_socket (struct pollfd *p) /* {{{ */
804 {
805   char buffer[BUFF_SIZE];
806   ssize_t buffer_size;
807
808   if ((p->revents & (POLLIN | POLLPRI)) == 0)
809   {
810     p->revents = 0;
811     return (-1);
812   }
813
814   buffer_size = recv (p->fd, buffer, sizeof (buffer), /* flags = */ 0);
815   if (buffer_size <= 0)
816   {
817     char errbuf[1024];
818     ERROR ("gmond plugin: recv failed: %s",
819         sstrerror (errno, errbuf, sizeof (errbuf)));
820     p->revents = 0;
821     return (-1);
822   }
823
824   mc_handle_metric (buffer, (size_t) buffer_size);
825   return (0);
826 } /* }}} int mc_handle_socket */
827
828 static void *mc_receive_thread (void *arg) /* {{{ */
829 {
830   socket_entry_t *mc_receive_socket_entries;
831   int status;
832   size_t i;
833
834   mc_receive_socket_entries = NULL;
835   status = create_sockets (&mc_receive_socket_entries, &mc_receive_sockets_num,
836       (mc_receive_group != NULL) ? mc_receive_group : MC_RECEIVE_GROUP_DEFAULT,
837       (mc_receive_port != NULL) ? mc_receive_port : MC_RECEIVE_PORT_DEFAULT,
838       /* listen = */ 1);
839   if (status != 0)
840   {
841     ERROR ("gmond plugin: create_sockets failed.");
842     return ((void *) -1);
843   }
844
845   mc_receive_sockets = (struct pollfd *) calloc (mc_receive_sockets_num,
846       sizeof (*mc_receive_sockets));
847   if (mc_receive_sockets == NULL)
848   {
849     ERROR ("gmond plugin: calloc failed.");
850     for (i = 0; i < mc_receive_sockets_num; i++)
851       close (mc_receive_socket_entries[i].fd);
852     free (mc_receive_socket_entries);
853     mc_receive_socket_entries = NULL;
854     mc_receive_sockets_num = 0;
855     return ((void *) -1);
856   }
857
858   for (i = 0; i < mc_receive_sockets_num; i++)
859   {
860     mc_receive_sockets[i].fd = mc_receive_socket_entries[i].fd;
861     mc_receive_sockets[i].events = POLLIN | POLLPRI;
862     mc_receive_sockets[i].revents = 0;
863   }
864
865   while (mc_receive_thread_loop != 0)
866   {
867     status = poll (mc_receive_sockets, mc_receive_sockets_num, -1);
868     if (status <= 0)
869     {
870       char errbuf[1024];
871       if (errno == EINTR)
872         continue;
873       ERROR ("gmond plugin: poll failed: %s",
874           sstrerror (errno, errbuf, sizeof (errbuf)));
875       break;
876     }
877
878     for (i = 0; i < mc_receive_sockets_num; i++)
879     {
880       if (mc_receive_sockets[i].revents != 0)
881         mc_handle_socket (mc_receive_sockets + i);
882     }
883   } /* while (mc_receive_thread_loop != 0) */
884
885   free (mc_receive_socket_entries);
886   return ((void *) 0);
887 } /* }}} void *mc_receive_thread */
888
889 static int mc_receive_thread_start (void) /* {{{ */
890 {
891   int status;
892
893   if (mc_receive_thread_running != 0)
894     return (-1);
895
896   mc_receive_thread_loop = 1;
897
898   status = plugin_thread_create (&mc_receive_thread_id, /* attr = */ NULL,
899       mc_receive_thread, /* args = */ NULL);
900   if (status != 0)
901   {
902     ERROR ("gmond plugin: Starting receive thread failed.");
903     mc_receive_thread_loop = 0;
904     return (-1);
905   }
906
907   mc_receive_thread_running = 1;
908   return (0);
909 } /* }}} int start_receive_thread */
910
911 static int mc_receive_thread_stop (void) /* {{{ */
912 {
913   if (mc_receive_thread_running == 0)
914     return (-1);
915
916   mc_receive_thread_loop = 0;
917
918   INFO ("gmond plugin: Stopping receive thread.");
919   pthread_kill (mc_receive_thread_id, SIGTERM);
920   pthread_join (mc_receive_thread_id, /* return value = */ NULL);
921   memset (&mc_receive_thread_id, 0, sizeof (mc_receive_thread_id));
922
923   mc_receive_thread_running = 0;
924
925   return (0);
926 } /* }}} int mc_receive_thread_stop */
927
928 /*
929  * Config:
930  *
931  * <Plugin gmond>
932  *   MCReceiveFrom "239.2.11.71" "8649"
933  *   <Metric "load_one">
934  *     Type "load"
935  *     [TypeInstance "foo"]
936  *     [DataSource "bar"]
937  *   </Metric>
938  * </Plugin>
939  */
940 static int gmond_config_set_string (oconfig_item_t *ci, char **str) /* {{{ */
941 {
942   char *tmp;
943
944   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
945   {
946     WARNING ("gmond plugin: The `%s' option needs "
947         "exactly one string argument.", ci->key);
948     return (-1);
949   }
950
951   tmp = strdup (ci->values[0].value.string);
952   if (tmp == NULL)
953   {
954     ERROR ("gmond plugin: strdup failed.");
955     return (-1);
956   }
957
958   sfree (*str);
959   *str = tmp;
960   return (0);
961 } /* }}} int gmond_config_set_string */
962
963 static int gmond_config_add_metric (oconfig_item_t *ci) /* {{{ */
964 {
965   metric_map_t *map;
966   int i;
967
968   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
969   {
970     WARNING ("gmond plugin: `Metric' blocks need "
971         "exactly one string argument.");
972     return (-1);
973   }
974
975   map = realloc (metric_map, (metric_map_len + 1) * sizeof (*metric_map));
976   if (map == NULL)
977   {
978     ERROR ("gmond plugin: realloc failed.");
979     return (-1);
980   }
981   metric_map = map;
982   map = metric_map + metric_map_len;
983
984   memset (map, 0, sizeof (*map));
985   map->type = NULL;
986   map->type_instance = NULL;
987   map->ds_name = NULL;
988   map->ds_type = -1;
989   map->ds_index = -1;
990
991   map->ganglia_name = strdup (ci->values[0].value.string);
992   if (map->ganglia_name == NULL)
993   {
994     ERROR ("gmond plugin: strdup failed.");
995     return (-1);
996   }
997
998   for (i = 0; i < ci->children_num; i++)
999   {
1000     oconfig_item_t *child = ci->children + i;
1001     if (strcasecmp ("Type", child->key) == 0)
1002       gmond_config_set_string (child, &map->type);
1003     else if (strcasecmp ("TypeInstance", child->key) == 0)
1004       gmond_config_set_string (child, &map->type_instance);
1005     else if (strcasecmp ("DataSource", child->key) == 0)
1006       gmond_config_set_string (child, &map->ds_name);
1007     else
1008     {
1009       WARNING ("gmond plugin: Unknown configuration option `%s' ignored.",
1010           child->key);
1011     }
1012   }
1013
1014   if (map->type == NULL)
1015   {
1016     ERROR ("gmond plugin: No type is set for metric %s.",
1017         map->ganglia_name);
1018     sfree (map->ganglia_name);
1019     sfree (map->type_instance);
1020     return (-1);
1021   }
1022
1023   metric_map_len++;
1024   return (0);
1025 } /* }}} int gmond_config_add_metric */
1026
1027 static int gmond_config_set_address (oconfig_item_t *ci, /* {{{ */
1028     char **ret_addr, char **ret_port)
1029 {
1030   char *addr;
1031   char *port;
1032
1033   if ((ci->values_num != 1) && (ci->values_num != 2))
1034   {
1035     WARNING ("gmond plugin: The `%s' config option needs "
1036         "one or two string arguments.",
1037         ci->key);
1038     return (-1);
1039   }
1040   if ((ci->values[0].type != OCONFIG_TYPE_STRING)
1041       || ((ci->values_num == 2)
1042         && (ci->values[1].type != OCONFIG_TYPE_STRING)))
1043   {
1044     WARNING ("gmond plugin: The `%s' config option needs "
1045         "one or two string arguments.",
1046         ci->key);
1047     return (-1);
1048   }
1049
1050   addr = strdup (ci->values[0].value.string);
1051   if (ci->values_num == 2)
1052     port = strdup (ci->values[1].value.string);
1053   else
1054     port = NULL;
1055
1056   if ((addr == NULL) || ((ci->values_num == 2) && (port == NULL)))
1057   {
1058     ERROR ("gmond plugin: strdup failed.");
1059     sfree (addr);
1060     sfree (port);
1061     return (-1);
1062   }
1063
1064   sfree (*ret_addr);
1065   sfree (*ret_port);
1066
1067   *ret_addr = addr;
1068   *ret_port = port;
1069
1070   return (0);
1071 } /* }}} int gmond_config_set_address */
1072
1073 static int gmond_config (oconfig_item_t *ci) /* {{{ */
1074 {
1075   int i;
1076
1077   for (i = 0; i < ci->children_num; i++)
1078   {
1079     oconfig_item_t *child = ci->children + i;
1080     if (strcasecmp ("MCReceiveFrom", child->key) == 0)
1081       gmond_config_set_address (child, &mc_receive_group, &mc_receive_port);
1082     else if (strcasecmp ("Metric", child->key) == 0)
1083       gmond_config_add_metric (child);
1084     else
1085     {
1086       WARNING ("gmond plugin: Unknown configuration option `%s' ignored.",
1087           child->key);
1088     }
1089   }
1090
1091   return (0);
1092 } /* }}} int gmond_config */
1093
1094 static int gmond_init (void) /* {{{ */
1095 {
1096   create_sockets (&mc_send_sockets, &mc_send_sockets_num,
1097       (mc_receive_group != NULL) ? mc_receive_group : MC_RECEIVE_GROUP_DEFAULT,
1098       (mc_receive_port != NULL) ? mc_receive_port : MC_RECEIVE_PORT_DEFAULT,
1099       /* listen = */ 0);
1100
1101   staging_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1102   if (staging_tree == NULL)
1103   {
1104     ERROR ("gmond plugin: c_avl_create failed.");
1105     return (-1);
1106   }
1107
1108   mc_receive_thread_start ();
1109
1110   return (0);
1111 } /* }}} int gmond_init */
1112
1113 static int gmond_shutdown (void) /* {{{ */
1114 {
1115   size_t i;
1116
1117   mc_receive_thread_stop ();
1118
1119   pthread_mutex_lock (&mc_send_sockets_lock);
1120   for (i = 0; i < mc_send_sockets_num; i++)
1121   {
1122     close (mc_send_sockets[i].fd);
1123     mc_send_sockets[i].fd = -1;
1124   }
1125   sfree (mc_send_sockets);
1126   mc_send_sockets_num = 0;
1127   pthread_mutex_unlock (&mc_send_sockets_lock);
1128
1129
1130   return (0);
1131 } /* }}} int gmond_shutdown */
1132
1133 void module_register (void)
1134 {
1135   plugin_register_complex_config ("gmond", gmond_config);
1136   plugin_register_init ("gmond", gmond_init);
1137   plugin_register_shutdown ("gmond", gmond_shutdown);
1138 }
1139
1140 /* vim: set sw=2 sts=2 et fdm=marker : */