X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fgmond.c;h=2b299ca0d149674e067958c4b03ee133d5d1eb5f;hb=d59524524af36bf9e3eb6108d21744076c79701d;hp=09d713789f49402f1f8412486a56b9fd05c38ed0;hpb=24c2f247f6dc5c17b26d715346a380efababb08f;p=collectd.git diff --git a/src/gmond.c b/src/gmond.c index 09d71378..2b299ca0 100644 --- a/src/gmond.c +++ b/src/gmond.c @@ -1,6 +1,6 @@ /** * collectd - src/gmond.c - * Copyright (C) 2009,2010 Florian octo Forster + * Copyright (C) 2009-2015 Florian octo Forster * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -25,17 +25,12 @@ **/ #include "collectd.h" + #include "plugin.h" #include "common.h" #include "configfile.h" #include "utils_avltree.h" -#if HAVE_PTHREAD_H -# include -#endif -#if HAVE_SYS_SOCKET_H -# include -#endif #if HAVE_NETDB_H # include #endif @@ -83,12 +78,12 @@ typedef struct staging_entry_s staging_entry_t; struct metric_map_s { - char *ganglia_name; - char *type; - char *type_instance; - char *ds_name; - int ds_type; - int ds_index; + char *ganglia_name; + char *type; + char *type_instance; + char *ds_name; + int ds_type; + size_t ds_index; }; typedef struct metric_map_s metric_map_t; @@ -166,7 +161,7 @@ static metric_map_t *metric_lookup (const char *key) /* {{{ */ return (NULL); /* Look up the DS type and ds_index. */ - if ((map[i].ds_type < 0) || (map[i].ds_index < 0)) /* {{{ */ + if (map[i].ds_type < 0) /* {{{ */ { const data_set_t *ds; @@ -191,7 +186,7 @@ static metric_map_t *metric_lookup (const char *key) /* {{{ */ } else { - int j; + size_t j; for (j = 0; j < ds->ds_num; j++) if (strcasecmp (ds->ds[j].name, map[i].ds_name) == 0) @@ -217,9 +212,7 @@ static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */ size_t *ret_sockets_num, const char *node, const char *service, int listen) { - struct addrinfo ai_hints; struct addrinfo *ai_list; - struct addrinfo *ai_ptr; int ai_return; socket_entry_t *sockets = NULL; @@ -230,17 +223,12 @@ static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */ if (*ret_sockets != NULL) return (EINVAL); - memset (&ai_hints, 0, sizeof (ai_hints)); - ai_hints.ai_flags = 0; -#ifdef AI_PASSIVE - ai_hints.ai_flags |= AI_PASSIVE; -#endif -#ifdef AI_ADDRCONFIG - ai_hints.ai_flags |= AI_ADDRCONFIG; -#endif - ai_hints.ai_family = AF_UNSPEC; - ai_hints.ai_socktype = SOCK_DGRAM; - ai_hints.ai_protocol = IPPROTO_UDP; + struct addrinfo ai_hints = { + .ai_family = AF_UNSPEC, + .ai_flags = AI_ADDRCONFIG | AI_PASSIVE, + .ai_protocol = IPPROTO_UDP, + .ai_socktype = SOCK_DGRAM + }; ai_return = getaddrinfo (node, service, &ai_hints, &ai_list); if (ai_return != 0) @@ -255,7 +243,7 @@ static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */ return (-1); } - for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) /* {{{ */ + for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) /* {{{ */ { socket_entry_t *tmp; @@ -291,8 +279,14 @@ static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */ { int yes = 1; - setsockopt (sockets[sockets_num].fd, SOL_SOCKET, SO_REUSEADDR, + status = setsockopt (sockets[sockets_num].fd, SOL_SOCKET, SO_REUSEADDR, (void *) &yes, sizeof (yes)); + if (status != 0) + { + char errbuf[1024]; + WARNING ("gmond plugin: setsockopt(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + } } status = bind (sockets[sockets_num].fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); @@ -308,7 +302,6 @@ static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */ if (ai_ptr->ai_family == AF_INET) { struct sockaddr_in *addr; - struct ip_mreq mreq; int loop; addr = (struct sockaddr_in *) ai_ptr->ai_addr; @@ -320,19 +313,32 @@ static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */ } loop = 1; - setsockopt (sockets[sockets_num].fd, IPPROTO_IP, IP_MULTICAST_LOOP, + status = setsockopt (sockets[sockets_num].fd, IPPROTO_IP, IP_MULTICAST_LOOP, (void *) &loop, sizeof (loop)); + if (status != 0) + { + char errbuf[1024]; + WARNING ("gmond plugin: setsockopt(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + } + + struct ip_mreq mreq = { + .imr_multiaddr.s_addr = addr->sin_addr.s_addr, + .imr_interface.s_addr = htonl (INADDR_ANY) + }; - memset (&mreq, 0, sizeof (mreq)); - mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr; - mreq.imr_interface.s_addr = htonl (INADDR_ANY); - setsockopt (sockets[sockets_num].fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + status = setsockopt (sockets[sockets_num].fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void *) &mreq, sizeof (mreq)); + if (status != 0) + { + char errbuf[1024]; + WARNING ("gmond plugin: setsockopt(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + } } /* if (ai_ptr->ai_family == AF_INET) */ else if (ai_ptr->ai_family == AF_INET6) { struct sockaddr_in6 *addr; - struct ipv6_mreq mreq; int loop; addr = (struct sockaddr_in6 *) ai_ptr->ai_addr; @@ -344,15 +350,29 @@ static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */ } loop = 1; - setsockopt (sockets[sockets_num].fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, + status = setsockopt (sockets[sockets_num].fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, (void *) &loop, sizeof (loop)); + if (status != 0) + { + char errbuf[1024]; + WARNING ("gmond plugin: setsockopt(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + } + + struct ipv6_mreq mreq = { + .ipv6mr_interface = 0 /* any */ + }; - memset (&mreq, 0, sizeof (mreq)); memcpy (&mreq.ipv6mr_multiaddr, &addr->sin6_addr, sizeof (addr->sin6_addr)); - mreq.ipv6mr_interface = 0; /* any */ - setsockopt (sockets[sockets_num].fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, + status = setsockopt (sockets[sockets_num].fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, (void *) &mreq, sizeof (mreq)); + if (status != 0) + { + char errbuf[1024]; + WARNING ("gmond plugin: setsockopt(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + } } /* if (ai_ptr->ai_family == AF_INET6) */ sockets_num++; @@ -373,13 +393,10 @@ static int create_sockets (socket_entry_t **ret_sockets, /* {{{ */ static int request_meta_data (const char *host, const char *name) /* {{{ */ { - Ganglia_metadata_msg msg; - char buffer[BUFF_SIZE]; + Ganglia_metadata_msg msg = { 0 }; + char buffer[BUFF_SIZE] = { 0 }; unsigned int buffer_size; XDR xdr; - size_t i; - - memset (&msg, 0, sizeof (msg)); msg.id = gmetadata_request; msg.Ganglia_metadata_msg_u.grequest.metric_id.host = strdup (host); @@ -393,7 +410,6 @@ static int request_meta_data (const char *host, const char *name) /* {{{ */ return (-1); } - memset (buffer, 0, sizeof (buffer)); xdrmem_create (&xdr, buffer, sizeof (buffer), XDR_ENCODE); if (!xdr_Ganglia_metadata_msg (&xdr, &msg)) @@ -409,11 +425,20 @@ static int request_meta_data (const char *host, const char *name) /* {{{ */ host, name); pthread_mutex_lock (&mc_send_sockets_lock); - for (i = 0; i < mc_send_sockets_num; i++) - sendto (mc_send_sockets[i].fd, buffer, (size_t) buffer_size, + for (size_t i = 0; i < mc_send_sockets_num; i++) + { + ssize_t status = sendto (mc_send_sockets[i].fd, buffer, (size_t) buffer_size, /* flags = */ 0, (struct sockaddr *) &mc_send_sockets[i].addr, mc_send_sockets[i].addrlen); + if (status == -1) + { + char errbuf[1024]; + ERROR ("gmond plugin: sendto(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + continue; + } + } pthread_mutex_unlock (&mc_send_sockets_lock); sfree (msg.Ganglia_metadata_msg_u.grequest.metric_id.host); @@ -442,10 +467,9 @@ static staging_entry_t *staging_entry_get (const char *host, /* {{{ */ return (se); /* insert new entry */ - se = (staging_entry_t *) malloc (sizeof (*se)); + se = calloc (1, sizeof (*se)); if (se == NULL) return (NULL); - memset (se, 0, sizeof (*se)); sstrncpy (se->key, key, sizeof (se->key)); se->flags = 0; @@ -479,39 +503,9 @@ static staging_entry_t *staging_entry_get (const char *host, /* {{{ */ return (se); } /* }}} staging_entry_t *staging_entry_get */ -static int staging_entry_submit (const char *host, const char *name, /* {{{ */ - staging_entry_t *se) -{ - value_list_t vl; - value_t values[se->vl.values_len]; - - if (se->vl.interval == 0) - { - /* No meta data has been received for this metric yet. */ - se->flags = 0; - pthread_mutex_unlock (&staging_lock); - request_meta_data (host, name); - return (0); - } - - se->flags = 0; - - memcpy (values, se->vl.values, sizeof (values)); - memcpy (&vl, &se->vl, sizeof (vl)); - - /* Unlock before calling `plugin_dispatch_values'.. */ - pthread_mutex_unlock (&staging_lock); - - vl.values = values; - - plugin_dispatch_values (&vl); - - return (0); -} /* }}} int staging_entry_submit */ - static int staging_entry_update (const char *host, const char *name, /* {{{ */ const char *type, const char *type_instance, - int ds_index, int ds_type, value_t value) + size_t ds_index, int ds_type, value_t value) { const data_set_t *ds; staging_entry_t *se; @@ -525,7 +519,7 @@ static int staging_entry_update (const char *host, const char *name, /* {{{ */ if (ds->ds_num <= ds_index) { - ERROR ("gmond plugin: Invalid index %i: %s has only %i data source(s).", + ERROR ("gmond plugin: Invalid index %zu: %s has only %zu data source(s).", ds_index, ds->type, ds->ds_num); return (-1); } @@ -558,17 +552,30 @@ static int staging_entry_update (const char *host, const char *name, /* {{{ */ se->flags |= (0x01 << ds_index); - /* Check if all values have been set and submit if so. */ - if (se->flags == ((0x01 << se->vl.values_len) - 1)) + /* Check if all data sources have been set. If not, return here. */ + if (se->flags != ((0x01 << se->vl.values_len) - 1)) { - /* `staging_lock' is unlocked in `staging_entry_submit'. */ - staging_entry_submit (host, name, se); + pthread_mutex_unlock (&staging_lock); + return (0); } - else + + /* Check if the interval of this metric is known. If not, request meta data + * and return. */ + if (se->vl.interval == 0) { + /* No meta data has been received for this metric yet. */ + se->flags = 0; pthread_mutex_unlock (&staging_lock); + + request_meta_data (host, name); + return (0); } + plugin_dispatch_values (&se->vl); + + se->flags = 0; + pthread_mutex_unlock (&staging_lock); + return (0); } /* }}} int staging_entry_update */ @@ -698,7 +705,7 @@ static int mc_handle_metadata_msg (Ganglia_metadata_msg *msg) /* {{{ */ msg_meta = msg->Ganglia_metadata_msg_u.gfull; - if (msg_meta.metric.tmax <= 0) + if (msg_meta.metric.tmax == 0) return (-1); map = metric_lookup (msg_meta.metric_id.name); @@ -766,9 +773,8 @@ static int mc_handle_metric (void *buffer, size_t buffer_size) /* {{{ */ case gmetric_float: case gmetric_double: { - Ganglia_value_msg msg; + Ganglia_value_msg msg = { 0 }; - memset (&msg, 0, sizeof (msg)); if (xdr_Ganglia_value_msg (&xdr, &msg)) mc_handle_value_msg (&msg); break; @@ -777,8 +783,7 @@ static int mc_handle_metric (void *buffer, size_t buffer_size) /* {{{ */ case gmetadata_full: case gmetadata_request: { - Ganglia_metadata_msg msg; - memset (&msg, 0, sizeof (msg)); + Ganglia_metadata_msg msg = { 0 }; if (xdr_Ganglia_metadata_msg (&xdr, &msg)) mc_handle_metadata_msg (&msg); break; @@ -822,7 +827,6 @@ static void *mc_receive_thread (void *arg) /* {{{ */ { socket_entry_t *mc_receive_socket_entries; int status; - size_t i; mc_receive_socket_entries = NULL; status = create_sockets (&mc_receive_socket_entries, &mc_receive_sockets_num, @@ -840,7 +844,7 @@ static void *mc_receive_thread (void *arg) /* {{{ */ if (mc_receive_sockets == NULL) { ERROR ("gmond plugin: calloc failed."); - for (i = 0; i < mc_receive_sockets_num; i++) + for (size_t i = 0; i < mc_receive_sockets_num; i++) close (mc_receive_socket_entries[i].fd); free (mc_receive_socket_entries); mc_receive_socket_entries = NULL; @@ -848,7 +852,7 @@ static void *mc_receive_thread (void *arg) /* {{{ */ return ((void *) -1); } - for (i = 0; i < mc_receive_sockets_num; i++) + for (size_t i = 0; i < mc_receive_sockets_num; i++) { mc_receive_sockets[i].fd = mc_receive_socket_entries[i].fd; mc_receive_sockets[i].events = POLLIN | POLLPRI; @@ -868,13 +872,14 @@ static void *mc_receive_thread (void *arg) /* {{{ */ break; } - for (i = 0; i < mc_receive_sockets_num; i++) + for (size_t i = 0; i < mc_receive_sockets_num; i++) { if (mc_receive_sockets[i].revents != 0) mc_handle_socket (mc_receive_sockets + i); } } /* while (mc_receive_thread_loop != 0) */ + free (mc_receive_socket_entries); return ((void *) 0); } /* }}} void *mc_receive_thread */ @@ -917,7 +922,7 @@ static int mc_receive_thread_stop (void) /* {{{ */ return (0); } /* }}} int mc_receive_thread_stop */ -/* +/* * Config: * * @@ -955,7 +960,6 @@ static int gmond_config_set_string (oconfig_item_t *ci, char **str) /* {{{ */ static int gmond_config_add_metric (oconfig_item_t *ci) /* {{{ */ { metric_map_t *map; - int i; if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) { @@ -987,7 +991,7 @@ static int gmond_config_add_metric (oconfig_item_t *ci) /* {{{ */ return (-1); } - for (i = 0; i < ci->children_num; i++) + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; if (strcasecmp ("Type", child->key) == 0) @@ -1064,9 +1068,7 @@ static int gmond_config_set_address (oconfig_item_t *ci, /* {{{ */ static int gmond_config (oconfig_item_t *ci) /* {{{ */ { - int i; - - for (i = 0; i < ci->children_num; i++) + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; if (strcasecmp ("MCReceiveFrom", child->key) == 0) @@ -1090,7 +1092,7 @@ static int gmond_init (void) /* {{{ */ (mc_receive_port != NULL) ? mc_receive_port : MC_RECEIVE_PORT_DEFAULT, /* listen = */ 0); - staging_tree = c_avl_create ((void *) strcmp); + staging_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp); if (staging_tree == NULL) { ERROR ("gmond plugin: c_avl_create failed."); @@ -1104,12 +1106,10 @@ static int gmond_init (void) /* {{{ */ static int gmond_shutdown (void) /* {{{ */ { - size_t i; - mc_receive_thread_stop (); pthread_mutex_lock (&mc_send_sockets_lock); - for (i = 0; i < mc_send_sockets_num; i++) + for (size_t i = 0; i < mc_send_sockets_num; i++) { close (mc_send_sockets[i].fd); mc_send_sockets[i].fd = -1;