X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fpinba.c;h=a6fd06fe2d56352a0a7e29cba18b394dd32559d2;hb=8fa66d76900da4206f9066737c974dcfd2bd2e34;hp=f5eb69dcfc9e532d2a3a919b5b224450b20fa107;hpb=8cf783a19214263efe4160d909e9f97001ff5184;p=collectd.git diff --git a/src/pinba.c b/src/pinba.c index f5eb69dc..a6fd06fe 100644 --- a/src/pinba.c +++ b/src/pinba.c @@ -2,6 +2,7 @@ * collectd - src/pinba.c (based on code from pinba_engine 0.0.5) * Copyright (c) 2007-2009 Antony Dovgal * Copyright (C) 2010 Phoenix Kayo + * Copyright (C) 2010 Florian Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -19,6 +20,7 @@ * Authors: * Antony Dovgal * Phoenix Kayo + * Florian Forster **/ #include "collectd.h" @@ -33,111 +35,116 @@ #include "pinba.pb-c.h" -typedef uint8_t u_char; - -#include - /* - * Service declaration section + * Defines */ #ifndef PINBA_UDP_BUFFER_SIZE # define PINBA_UDP_BUFFER_SIZE 65536 #endif -#ifndef PINBA_DEFAULT_ADDRESS -# define PINBA_DEFAULT_ADDRESS "127.0.0.1" /* FIXME */ +#ifndef PINBA_DEFAULT_NODE +# define PINBA_DEFAULT_NODE "::0" #endif -#ifndef PINBA_DEFAULT_PORT -# define PINBA_DEFAULT_PORT 12345 /* FIXME */ +#ifndef PINBA_DEFAULT_SERVICE +# define PINBA_DEFAULT_SERVICE "30002" #endif #ifndef PINBA_MAX_SOCKETS # define PINBA_MAX_SOCKETS 16 #endif -#ifndef NI_MAXSERV -# define NI_MAXSERV 32 -#endif - /* * Private data structures */ -typedef struct _pinba_statres_ pinba_statres; -struct _pinba_statres_ { - const char *name; - double req_per_sec; - double req_time; - double ru_utime; - double ru_stime; - double doc_size; - double mem_peak; -}; - -struct pinba_socket_s { - int listen_sock; - struct event *accept_event; - +/* {{{ */ +struct pinba_socket_s +{ struct pollfd fd[PINBA_MAX_SOCKETS]; nfds_t fd_num; }; typedef struct pinba_socket_s pinba_socket_t; -typedef double pinba_time_t; -typedef uint32_t pinba_size_t; - -static struct event_base *temp_base = NULL; - -static pinba_socket_t *temp_sock = NULL; - -static pthread_t temp_thrd; +/* Fixed point counter value. n is the decimal part multiplied by 10^9. */ +struct float_counter_s +{ + uint64_t i; + uint64_t n; /* nanos */ +}; +typedef struct float_counter_s float_counter_t; -typedef struct pinba_statnode_s pinba_statnode_t; struct pinba_statnode_s { - /* collector name */ + /* collector name, used as plugin instance */ char *name; + /* query data */ char *host; char *server; char *script; - /* collected data */ - pinba_time_t last_coll; - pinba_size_t req_count; - pinba_time_t req_time; - pinba_time_t ru_utime; - pinba_time_t ru_stime; - pinba_size_t doc_size; - pinba_size_t mem_peak; + + derive_t req_count; + + float_counter_t req_time; + float_counter_t ru_utime; + float_counter_t ru_stime; + + derive_t doc_size; + gauge_t mem_peak; }; +typedef struct pinba_statnode_s pinba_statnode_t; +/* }}} */ +/* + * Module global variables + */ +/* {{{ */ static pinba_statnode_t *stat_nodes = NULL; static unsigned int stat_nodes_num = 0; static pthread_mutex_t stat_nodes_lock; -char service_status=0; -char *service_address = PINBA_DEFAULT_ADDRESS; -unsigned int service_port=PINBA_DEFAULT_PORT; +static char *conf_node = NULL; +static char *conf_service = NULL; -static pinba_time_t now (void) /* {{{ */ +static _Bool collector_thread_running = 0; +static _Bool collector_thread_do_shutdown = 0; +static pthread_t collector_thread_id; +/* }}} */ + +/* + * Functions + */ +static void float_counter_add (float_counter_t *fc, float val) /* {{{ */ { - static struct timeval tv; - - gettimeofday (&tv, /* tz = */ NULL); - - return (double)tv.tv_sec+((double)tv.tv_usec/(double)1000000); -} /* }}} */ + uint64_t tmp; + + if (val < 0.0) + return; + + tmp = (uint64_t) val; + val -= (double) tmp; + + fc->i += tmp; + fc->n += (uint64_t) ((val * 1000000000.0) + .5); -static void service_statnode_reset (pinba_statnode_t *node) /* {{{ */ + if (fc->n >= 1000000000) + { + fc->i += 1; + fc->n -= 1000000000; + assert (fc->n < 1000000000); + } +} /* }}} void float_counter_add */ + +static derive_t float_counter_get (const float_counter_t *fc, /* {{{ */ + uint64_t factor) { - node->last_coll=now(); - node->req_count=0; - node->req_time=0.0; - node->ru_utime=0.0; - node->ru_stime=0.0; - node->doc_size=0; - node->mem_peak=0; -} /* }}} void service_statnode_reset */ + derive_t ret; + + ret = (derive_t) (fc->i * factor); + ret += (derive_t) (fc->n / (1000000000 / factor)); + + return (ret); +} /* }}} derive_t float_counter_get */ static void strset (char **str, const char *new) /* {{{ */ { @@ -160,83 +167,44 @@ static void service_statnode_add(const char *name, /* {{{ */ const char *script) { pinba_statnode_t *node; - DEBUG("adding node `%s' to collector { %s, %s, %s }", name, host?host:"", server?server:"", script?script:""); - stat_nodes=realloc(stat_nodes, sizeof(pinba_statnode_t)*(stat_nodes_num+1)); - if(!stat_nodes){ - ERROR("Realloc failed!"); - exit(-1); + node = realloc (stat_nodes, + sizeof (*stat_nodes) * (stat_nodes_num + 1)); + if (node == NULL) + { + ERROR ("pinba plugin: realloc failed"); + return; } - - node=&stat_nodes[stat_nodes_num]; - - /* reset stat data */ - service_statnode_reset(node); + stat_nodes = node; + + node = stat_nodes + stat_nodes_num; + memset (node, 0, sizeof (*node)); /* reset strings */ - node->name=NULL; - node->host=NULL; - node->server=NULL; - node->script=NULL; + node->name = NULL; + node->host = NULL; + node->server = NULL; + node->script = NULL; + + node->mem_peak = NAN; /* fill query data */ - strset(&node->name, name); - strset(&node->host, host); - strset(&node->server, server); - strset(&node->script, script); + strset (&node->name, name); + strset (&node->host, host); + strset (&node->server, server); + strset (&node->script, script); /* increment counter */ stat_nodes_num++; } /* }}} void service_statnode_add */ -static void service_statnode_free (void) -{ - unsigned int i; - - if(stat_nodes_num < 1) - return; - - for (i = 0; i < stat_nodes_num; i++) - { - sfree (stat_nodes[i].name); - sfree (stat_nodes[i].host); - sfree (stat_nodes[i].server); - sfree (stat_nodes[i].script); - } - - sfree (stat_nodes); - stat_nodes_num = 0; - - pthread_mutex_destroy (&stat_nodes_lock); -} - -static void service_statnode_init (void) -{ - /* only total info collect by default */ - service_statnode_free(); - - DEBUG("initializing collector.."); - pthread_mutex_init(&stat_nodes_lock, 0); -} - -static void service_statnode_begin (void) -{ - service_statnode_init(); - pthread_mutex_lock(&stat_nodes_lock); - - service_statnode_add("total", NULL, NULL, NULL); -} - -static void service_statnode_end (void) -{ - pthread_mutex_unlock(&stat_nodes_lock); -} - -static unsigned int service_statnode_collect (pinba_statres *res, /* {{{ */ +/* Copy the data from the global "stat_nodes" list into the buffer pointed to + * by "res", doing the derivation in the process. Returns the next index or + * zero if the end of the list has been reached. */ +static unsigned int service_statnode_collect (pinba_statnode_t *res, /* {{{ */ unsigned int index) { - pinba_statnode_t* node; - pinba_time_t delta; + pinba_statnode_t *node; if (stat_nodes_num == 0) return 0; @@ -251,39 +219,34 @@ static unsigned int service_statnode_collect (pinba_statres *res, /* {{{ */ pthread_mutex_unlock (&stat_nodes_lock); return 0; } - + node = stat_nodes + index; - delta = now() - node->last_coll; - - res->name = node->name; - res->req_per_sec = node->req_count / delta; - - if (node->req_count == 0) - node->req_count = 1; - - res->req_time = node->req_time / node->req_count; - res->ru_utime = node->ru_utime / node->req_count; - res->ru_stime = node->ru_stime / node->req_count; - res->ru_stime = node->ru_stime / node->req_count; - res->doc_size = node->doc_size / node->req_count; - res->mem_peak = node->mem_peak / node->req_count; + memcpy (res, node, sizeof (*res)); + + /* reset node */ + node->mem_peak = NAN; - service_statnode_reset (node); return (index + 1); } /* }}} unsigned int service_statnode_collect */ -static void service_statnode_process (pinba_statnode_t *node, +static void service_statnode_process (pinba_statnode_t *node, /* {{{ */ Pinba__Request* request) { node->req_count++; - node->req_time+=request->request_time; - node->ru_utime+=request->ru_utime; - node->ru_stime+=request->ru_stime; - node->doc_size+=request->document_size; - node->mem_peak+=request->memory_peak; -} - -static void service_process_request (Pinba__Request *request) + + float_counter_add (&node->req_time, request->request_time); + float_counter_add (&node->ru_utime, request->ru_utime); + float_counter_add (&node->ru_stime, request->ru_stime); + + node->doc_size += request->document_size; + + if (isnan (node->mem_peak) + || (node->mem_peak < ((gauge_t) request->memory_peak))) + node->mem_peak = (gauge_t) request->memory_peak; + +} /* }}} void service_statnode_process */ + +static void service_process_request (Pinba__Request *request) /* {{{ */ { unsigned int i; @@ -291,118 +254,43 @@ static void service_process_request (Pinba__Request *request) for (i = 0; i < stat_nodes_num; i++) { - if(stat_nodes[i].host && strcmp(request->hostname, stat_nodes[i].host)) + if ((stat_nodes[i].host != NULL) + && (strcmp (request->hostname, stat_nodes[i].host) != 0)) continue; - if(stat_nodes[i].server && strcmp(request->server_name, stat_nodes[i].server)) + + if ((stat_nodes[i].server != NULL) + && (strcmp (request->server_name, stat_nodes[i].server) != 0)) continue; - if(stat_nodes[i].script && strcmp(request->script_name, stat_nodes[i].script)) + + if ((stat_nodes[i].script != NULL) + && (strcmp (request->script_name, stat_nodes[i].script) != 0)) continue; service_statnode_process(&stat_nodes[i], request); } pthread_mutex_unlock(&stat_nodes_lock); -} +} /* }}} void service_process_request */ -static void *pinba_main (void *arg) +static int pb_del_socket (pinba_socket_t *s, /* {{{ */ + nfds_t index) { - DEBUG("entering listen-loop.."); - - service_status=1; - event_base_dispatch(temp_base); - - /* unreachable */ - return NULL; -} + if (index >= s->fd_num) + return (EINVAL); -static void pinba_socket_free (pinba_socket_t *socket) /* {{{ */ -{ - if (!socket) - return; - - if (socket->listen_sock >= 0) - { - close(socket->listen_sock); - socket->listen_sock = -1; - } - - if (socket->accept_event) + close (s->fd[index].fd); + s->fd[index].fd = -1; + + /* When deleting the last element in the list, no memmove is necessary. */ + if (index < (s->fd_num - 1)) { - event_del(socket->accept_event); - free(socket->accept_event); - socket->accept_event = NULL; + memmove (&s->fd[index], &s->fd[index + 1], + sizeof (s->fd[0]) * (s->fd_num - (index + 1))); } - - free(socket); -} /* }}} void pinba_socket_free */ -static int pinba_process_stats_packet (const uint8_t *buffer, /* {{{ */ - size_t buffer_size) -{ - Pinba__Request *request; - - request = pinba__request__unpack (NULL, buffer_size, buffer); - - if (!request) - return (-1); - - service_process_request(request); - pinba__request__free_unpacked (request, NULL); - + s->fd_num--; return (0); -} /* }}} int pinba_process_stats_packet */ - -static void pinba_udp_read_callback_fn (int sock, short event, void *arg) /* {{{ */ -{ - uint8_t buffer[PINBA_UDP_BUFFER_SIZE]; - size_t buffer_size; - int status; - - if ((event & EV_READ) == 0) - return; - - while (42) - { - buffer_size = sizeof (buffer); - status = recvfrom (sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0); - if (status < 0) - { - char errbuf[1024]; - - if ((errno == EINTR) -#ifdef EWOULDBLOCK - || (errno == EWOULDBLOCK) -#endif - || (errno == EAGAIN)) - { - continue; - } - - WARNING("pinba plugin: recvfrom(2) failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return; - } - else if (status == 0) - { - DEBUG ("pinba plugin: recvfrom(2) returned unexpected status zero."); - return; - } - else /* if (status > 0) */ - { - assert (((size_t) status) < buffer_size); - buffer_size = (size_t) status; - buffer[buffer_size] = 0; - - status = pinba_process_stats_packet (buffer, buffer_size); - if (status != 0) - DEBUG("pinba plugin: Parsing packet failed."); - return; - } - - /* not reached */ - assert (23 == 42); - } /* while (42) */ -} /* }}} void pinba_udp_read_callback_fn */ +} /* }}} int pb_del_socket */ static int pb_add_socket (pinba_socket_t *s, /* {{{ */ const struct addrinfo *ai) @@ -455,7 +343,7 @@ static int pb_add_socket (pinba_socket_t *s, /* {{{ */ } /* }}} int pb_add_socket */ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ - int listen_port) + const char *service) { pinba_socket_t *s; struct addrinfo *ai_list; @@ -463,9 +351,6 @@ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ struct addrinfo ai_hints; int status; - char service[NI_MAXSERV]; /* FIXME */ - snprintf (service, sizeof (service), "%i", listen_port); - memset (&ai_hints, 0, sizeof (ai_hints)); ai_hints.ai_flags = AI_PASSIVE; ai_hints.ai_family = AF_UNSPEC; @@ -474,6 +359,12 @@ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ ai_hints.ai_canonname = NULL; ai_hints.ai_next = NULL; + if (node == NULL) + node = PINBA_DEFAULT_NODE; + + if (service == NULL) + service = PINBA_DEFAULT_SERVICE; + ai_list = NULL; status = getaddrinfo (node, service, &ai_hints, &ai_list); @@ -486,7 +377,7 @@ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ assert (ai_list != NULL); s = malloc (sizeof (*s)); - if (s != NULL) + if (s == NULL) { freeaddrinfo (ai_list); ERROR ("pinba plugin: malloc failed."); @@ -513,232 +404,347 @@ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ return (s); } /* }}} pinba_socket_open */ -static int service_cleanup (void) +static void pinba_socket_free (pinba_socket_t *socket) /* {{{ */ { - DEBUG("closing socket.."); - if(temp_sock){ - pthread_mutex_lock(&stat_nodes_lock); - pinba_socket_free(temp_sock); - pthread_mutex_unlock(&stat_nodes_lock); - } - - DEBUG("shutdowning event.."); - event_base_free(temp_base); - - DEBUG("shutting down.."); + nfds_t i; - return (0); -} - -static int service_start(void) -{ - DEBUG("starting up.."); - - DEBUG("initializing event.."); - temp_base = event_base_new(); - - DEBUG("opening socket.."); - - temp_sock = pinba_socket_open(service_address, service_port); - - if (!temp_sock) { - service_cleanup(); - return 1; - } + if (!socket) + return; - if (pthread_create(&temp_thrd, NULL, pinba_main, NULL)) { - service_cleanup(); - return 1; + for (i = 0; i < socket->fd_num; i++) + { + if (socket->fd[i].fd < 0) + continue; + close (socket->fd[i].fd); + socket->fd[i].fd = -1; } - return 0; -} + sfree(socket); +} /* }}} void pinba_socket_free */ -static int service_stop (void) +static int pinba_process_stats_packet (const uint8_t *buffer, /* {{{ */ + size_t buffer_size) { - pthread_cancel(temp_thrd); - pthread_join(temp_thrd, NULL); - service_status=0; - DEBUG("terminating listen-loop.."); + Pinba__Request *request; - service_cleanup(); + request = pinba__request__unpack (NULL, buffer_size, buffer); - return 0; -} + if (!request) + return (-1); -static void service_config (const char *address, unsigned int port) /* {{{ */ + service_process_request(request); + pinba__request__free_unpacked (request, NULL); + + return (0); +} /* }}} int pinba_process_stats_packet */ + +static int pinba_udp_read_callback_fn (int sock) /* {{{ */ { - int need_restart = 0; + uint8_t buffer[PINBA_UDP_BUFFER_SIZE]; + size_t buffer_size; + int status; - if (address && service_address && (strcmp(service_address, address) != 0)) + while (42) { - strset (&service_address, address); - need_restart++; - } + buffer_size = sizeof (buffer); + status = recvfrom (sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0); + if (status < 0) + { + char errbuf[1024]; + + if ((errno == EINTR) +#ifdef EWOULDBLOCK + || (errno == EWOULDBLOCK) +#endif + || (errno == EAGAIN)) + { + continue; + } + + WARNING("pinba plugin: recvfrom(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); + } + else if (status == 0) + { + DEBUG ("pinba plugin: recvfrom(2) returned unexpected status zero."); + return (-1); + } + else /* if (status > 0) */ + { + assert (((size_t) status) < buffer_size); + buffer_size = (size_t) status; + buffer[buffer_size] = 0; + + status = pinba_process_stats_packet (buffer, buffer_size); + if (status != 0) + DEBUG("pinba plugin: Parsing packet failed."); + return (status); + } + } /* while (42) */ + + /* not reached */ + assert (23 == 42); + return (-1); +} /* }}} void pinba_udp_read_callback_fn */ + +static int receive_loop (void) /* {{{ */ +{ + pinba_socket_t *s; - if ((port > 0) && (port < 65536) && (service_port != port)) + s = pinba_socket_open (conf_node, conf_service); + if (s == NULL) { - service_port=port; - need_restart++; + ERROR ("pinba plugin: Collector thread is exiting prematurely."); + return (-1); } - if(service_status && need_restart) + while (!collector_thread_do_shutdown) { - service_stop(); - service_start(); - } -} /* }}} void service_config */ + int status; + nfds_t i; + + if (s->fd_num < 1) + break; + + status = poll (s->fd, s->fd_num, /* timeout = */ 1000); + if (status == 0) /* timeout */ + { + continue; + } + else if (status < 0) + { + char errbuf[1024]; + + if ((errno == EINTR) || (errno == EAGAIN)) + continue; + + ERROR ("pinba plugin: poll(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + pinba_socket_free (s); + return (-1); + } + + for (i = 0; i < s->fd_num; i++) + { + if (s->fd[i].revents & (POLLERR | POLLHUP | POLLNVAL)) + { + pb_del_socket (s, i); + i--; + } + else if (s->fd[i].revents & (POLLIN | POLLPRI)) + { + pinba_udp_read_callback_fn (s->fd[i].fd); + } + } /* for (s->fd) */ + } /* while (!collector_thread_do_shutdown) */ + + pinba_socket_free (s); + s = NULL; + + return (0); +} /* }}} int receive_loop */ + +static void *collector_thread (void *arg) /* {{{ */ +{ + receive_loop (); + + memset (&collector_thread_id, 0, sizeof (collector_thread_id)); + collector_thread_running = 0; + pthread_exit (NULL); + return (NULL); +} /* }}} void *collector_thread */ /* * Plugin declaration section */ - -static int config_set (char **var, const char *value) +static int pinba_config_view (const oconfig_item_t *ci) /* {{{ */ { - /* code from nginx plugin for collectd */ - if (*var != NULL) { - free (*var); - *var = NULL; + char *name = NULL; + char *host = NULL; + char *server = NULL; + char *script = NULL; + int status; + int i; + + status = cf_util_get_string (ci, &name); + if (status != 0) + return (status); + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Host", child->key) == 0) + status = cf_util_get_string (child, &host); + else if (strcasecmp ("Server", child->key) == 0) + status = cf_util_get_string (child, &server); + else if (strcasecmp ("Script", child->key) == 0) + status = cf_util_get_string (child, &script); + else + { + WARNING ("pinba plugin: Unknown config option: %s", child->key); + status = -1; + } + + if (status != 0) + break; } - - if ((*var = strdup (value)) == NULL) return (1); - else return (0); -} -static int plugin_config (oconfig_item_t *ci) + if (status == 0) + service_statnode_add (name, host, server, script); + + sfree (name); + sfree (host); + sfree (server); + sfree (script); + + return (status); +} /* }}} int pinba_config_view */ + +static int plugin_config (oconfig_item_t *ci) /* {{{ */ { - unsigned int i, o; - int pinba_port = 0; - char *pinba_address = NULL; - - INFO("Pinba Configure.."); + int i; - service_statnode_begin(); - - /* Set default values */ - config_set(&pinba_address, PINBA_DEFAULT_ADDRESS); - pinba_port = PINBA_DEFAULT_PORT; - - for (i = 0; i < ci->children_num; i++) { + /* The lock should not be necessary in the config callback, but let's be + * sure.. */ + pthread_mutex_lock (&stat_nodes_lock); + + for (i = 0; i < ci->children_num; i++) + { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Address", child->key) == 0) { - if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING)){ - WARNING ("pinba plugin: `Address' needs exactly one string argument."); - return (-1); - } - config_set(&pinba_address, child->values[0].value.string); - } else if (strcasecmp ("Port", child->key) == 0) { - if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_NUMBER)){ - WARNING ("pinba plugin: `Port' needs exactly one number argument."); - return (-1); - } - pinba_port=child->values[0].value.number; - } else if (strcasecmp ("View", child->key) == 0) { - const char *name=NULL, *host=NULL, *server=NULL, *script=NULL; - if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING) || strlen(child->values[0].value.string)==0){ - WARNING ("pinba plugin: `View' needs exactly one non-empty string argument."); - return (-1); - } - name = child->values[0].value.string; - for(o=0; ochildren_num; o++){ - oconfig_item_t *node = child->children + o; - if (strcasecmp ("Host", node->key) == 0) { - if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){ - WARNING ("pinba plugin: `View->Host' needs exactly one non-empty string argument."); - return (-1); - } - host = node->values[0].value.string; - } else if (strcasecmp ("Server", node->key) == 0) { - if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){ - WARNING ("pinba plugin: `View->Server' needs exactly one non-empty string argument."); - return (-1); - } - server = node->values[0].value.string; - } else if (strcasecmp ("Script", node->key) == 0) { - if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){ - WARNING ("pinba plugin: `View->Script' needs exactly one non-empty string argument."); - return (-1); - } - script = node->values[0].value.string; - } else { - WARNING ("pinba plugin: In `' context allowed only `Host', `Server' and `Script' options but not the `%s'.", node->key); - return (-1); - } - } - /* add new statnode */ - service_statnode_add(name, host, server, script); - } else { - WARNING ("pinba plugin: In `' context allowed only `Address', `Port' and `Observe' options but not the `%s'.", child->key); - return (-1); - } + + if (strcasecmp ("Address", child->key) == 0) + cf_util_get_string (child, &conf_node); + else if (strcasecmp ("Port", child->key) == 0) + cf_util_get_string (child, &conf_service); + else if (strcasecmp ("View", child->key) == 0) + pinba_config_view (child); + else + WARNING ("pinba plugin: Unknown config option: %s", child->key); } + + pthread_mutex_unlock(&stat_nodes_lock); - service_statnode_end(); - - service_config(pinba_address, pinba_port); return (0); -} /* int pinba_config */ +} /* }}} int pinba_config */ -static int plugin_init (void) +static int plugin_init (void) /* {{{ */ { - INFO("Pinba Starting.."); - service_start(); - return 0; -} + int status; + + if (stat_nodes == NULL) + { + /* Collect the "total" data by default. */ + service_statnode_add ("total", + /* host = */ NULL, + /* server = */ NULL, + /* script = */ NULL); + } -static int plugin_shutdown (void) + if (collector_thread_running) + return (0); + + status = pthread_create (&collector_thread_id, + /* attrs = */ NULL, + collector_thread, + /* args = */ NULL); + if (status != 0) + { + char errbuf[1024]; + ERROR ("pinba plugin: pthread_create(3) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); + } + collector_thread_running = 1; + + return (0); +} /* }}} */ + +static int plugin_shutdown (void) /* {{{ */ { - INFO("Pinba Stopping.."); - service_stop(); - service_statnode_free(); - return 0; -} + if (collector_thread_running) + { + int status; -static int plugin_submit (const char *plugin_instance, - const char *type, - const pinba_statres *res) { - value_t values[6]; + DEBUG ("pinba plugin: Shutting down collector thread."); + collector_thread_do_shutdown = 1; + + status = pthread_join (collector_thread_id, /* retval = */ NULL); + if (status != 0) + { + char errbuf[1024]; + ERROR ("pinba plugin: pthread_join(3) failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + } + + collector_thread_running = 0; + collector_thread_do_shutdown = 0; + } /* if (collector_thread_running) */ + + return (0); +} /* }}} int plugin_shutdown */ + +static int plugin_submit (const pinba_statnode_t *res) /* {{{ */ +{ + value_t value; value_list_t vl = VALUE_LIST_INIT; - values[0].gauge = res->req_per_sec; - values[1].gauge = res->req_time; - values[2].gauge = res->ru_utime; - values[3].gauge = res->ru_stime; - values[4].gauge = res->doc_size; - values[5].gauge = res->mem_peak; - - vl.values = values; - vl.values_len = 6; + vl.values = &value; + vl.values_len = 1; sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "pinba", sizeof (vl.plugin)); - sstrncpy (vl.plugin_instance, plugin_instance, - sizeof(vl.plugin_instance)); - sstrncpy (vl.type, type, sizeof (vl.type)); - INFO("Pinba Dispatch"); + sstrncpy (vl.plugin_instance, res->name, sizeof (vl.plugin_instance)); + + value.derive = res->req_count; + sstrncpy (vl.type, "total_requests", sizeof (vl.type)); + plugin_dispatch_values (&vl); + + value.derive = float_counter_get (&res->req_time, /* factor = */ 1000); + sstrncpy (vl.type, "total_time_in_ms", sizeof (vl.type)); + plugin_dispatch_values (&vl); + + value.derive = res->doc_size; + sstrncpy (vl.type, "total_bytes", sizeof (vl.type)); + plugin_dispatch_values (&vl); + + value.derive = float_counter_get (&res->ru_utime, /* factor = */ 100); + sstrncpy (vl.type, "cpu", sizeof (vl.type)); + sstrncpy (vl.type_instance, "user", sizeof (vl.type_instance)); + plugin_dispatch_values (&vl); + + value.derive = float_counter_get (&res->ru_stime, /* factor = */ 100); + sstrncpy (vl.type, "cpu", sizeof (vl.type)); + sstrncpy (vl.type_instance, "system", sizeof (vl.type_instance)); + plugin_dispatch_values (&vl); + + value.gauge = res->mem_peak; + sstrncpy (vl.type, "memory", sizeof (vl.type)); + sstrncpy (vl.type_instance, "peak", sizeof (vl.type_instance)); plugin_dispatch_values (&vl); return (0); -} +} /* }}} int plugin_submit */ -static int plugin_read (void) +static int plugin_read (void) /* {{{ */ { unsigned int i=0; - static pinba_statres res; + pinba_statnode_t data; - while ((i = service_statnode_collect (&res, i)) != 0) + while ((i = service_statnode_collect (&data, i)) != 0) { - plugin_submit(res.name, "pinba_view", &res); + plugin_submit (&data); } return 0; -} +} /* }}} int plugin_read */ -void module_register (void) +void module_register (void) /* {{{ */ { plugin_register_complex_config ("pinba", plugin_config); plugin_register_init ("pinba", plugin_init); plugin_register_read ("pinba", plugin_read); plugin_register_shutdown ("pinba", plugin_shutdown); -} /* void module_register */ +} /* }}} void module_register */ /* vim: set sw=2 sts=2 et fdm=marker : */