memcached: Fix CPU usage reporting
[collectd.git] / src / memcached.c
1 /**
2  * collectd - src/memcached.c, based on src/hddtemp.c
3  * Copyright (C) 2007       Antony Dovgal
4  * Copyright (C) 2007-2012  Florian Forster
5  * Copyright (C) 2009       Doug MacEachern
6  * Copyright (C) 2009       Franck Lombardi
7  * Copyright (C) 2012       Nicolas Szalay
8  *
9  * This program is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; either version 2 of the License, or (at your
12  * option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write to the Free Software Foundation, Inc.,
21  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
22  *
23  * Authors:
24  *   Antony Dovgal <tony at daylessday dot org>
25  *   Florian octo Forster <octo at collectd.org>
26  *   Doug MacEachern <dougm at hyperic.com>
27  *   Franck Lombardi
28  *   Nicolas Szalay
29  **/
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "plugin.h"
35
36 #include <netdb.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #include <sys/un.h>
40
41 #define MEMCACHED_DEF_HOST "127.0.0.1"
42 #define MEMCACHED_DEF_PORT "11211"
43
44 struct memcached_s {
45   char *name;
46   char *socket;
47   char *host;
48   char *port;
49 };
50 typedef struct memcached_s memcached_t;
51
52 static _Bool memcached_have_instances = 0;
53
54 static void memcached_free(void *arg) {
55   memcached_t *st = arg;
56   if (st == NULL)
57     return;
58
59   sfree(st->name);
60   sfree(st->socket);
61   sfree(st->host);
62   sfree(st->port);
63   sfree(st);
64 }
65
66 static int memcached_connect_unix(memcached_t *st) {
67   struct sockaddr_un serv_addr = {0};
68   int fd;
69
70   serv_addr.sun_family = AF_UNIX;
71   sstrncpy(serv_addr.sun_path, st->socket, sizeof(serv_addr.sun_path));
72
73   /* create our socket descriptor */
74   fd = socket(AF_UNIX, SOCK_STREAM, 0);
75   if (fd < 0) {
76     char errbuf[1024];
77     ERROR("memcached plugin: memcached_connect_unix: socket(2) failed: %s",
78           sstrerror(errno, errbuf, sizeof(errbuf)));
79     return (-1);
80   }
81
82   /* connect to the memcached daemon */
83   int status = connect(fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
84   if (status != 0) {
85     shutdown(fd, SHUT_RDWR);
86     close(fd);
87     fd = -1;
88   }
89
90   return (fd);
91 } /* int memcached_connect_unix */
92
93 static int memcached_connect_inet(memcached_t *st) {
94   const char *host;
95   const char *port;
96
97   struct addrinfo *ai_list;
98   int status;
99   int fd = -1;
100
101   host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
102   port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
103
104   struct addrinfo ai_hints = {.ai_family = AF_UNSPEC,
105                               .ai_flags = AI_ADDRCONFIG,
106                               .ai_socktype = SOCK_STREAM};
107
108   status = getaddrinfo(host, port, &ai_hints, &ai_list);
109   if (status != 0) {
110     char errbuf[1024];
111     ERROR("memcached plugin: memcached_connect_inet: "
112           "getaddrinfo(%s,%s) failed: %s",
113           host, port,
114           (status == EAI_SYSTEM) ? sstrerror(errno, errbuf, sizeof(errbuf))
115                                  : gai_strerror(status));
116     return (-1);
117   }
118
119   for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL;
120        ai_ptr = ai_ptr->ai_next) {
121     /* create our socket descriptor */
122     fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
123     if (fd < 0) {
124       char errbuf[1024];
125       WARNING("memcached plugin: memcached_connect_inet: "
126               "socket(2) failed: %s",
127               sstrerror(errno, errbuf, sizeof(errbuf)));
128       continue;
129     }
130
131     /* connect to the memcached daemon */
132     status = (int)connect(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
133     if (status != 0) {
134       shutdown(fd, SHUT_RDWR);
135       close(fd);
136       fd = -1;
137       continue;
138     }
139
140     /* A socket could be opened and connecting succeeded. We're done. */
141     break;
142   }
143
144   freeaddrinfo(ai_list);
145   return (fd);
146 } /* int memcached_connect_inet */
147
148 static int memcached_connect(memcached_t *st) {
149   if (st->socket != NULL)
150     return (memcached_connect_unix(st));
151   else
152     return (memcached_connect_inet(st));
153 }
154
155 static int memcached_query_daemon(char *buffer, size_t buffer_size,
156                                   memcached_t *st) {
157   int fd, status;
158   size_t buffer_fill;
159
160   fd = memcached_connect(st);
161   if (fd < 0) {
162     ERROR("memcached plugin: Instance \"%s\" could not connect to daemon.",
163           st->name);
164     return -1;
165   }
166
167   status = (int)swrite(fd, "stats\r\n", strlen("stats\r\n"));
168   if (status != 0) {
169     char errbuf[1024];
170     ERROR("memcached plugin: write(2) failed: %s",
171           sstrerror(errno, errbuf, sizeof(errbuf)));
172     shutdown(fd, SHUT_RDWR);
173     close(fd);
174     return (-1);
175   }
176
177   /* receive data from the memcached daemon */
178   memset(buffer, 0, buffer_size);
179
180   buffer_fill = 0;
181   while ((status = (int)recv(fd, buffer + buffer_fill,
182                              buffer_size - buffer_fill, /* flags = */ 0)) !=
183          0) {
184     char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
185     if (status < 0) {
186       char errbuf[1024];
187
188       if ((errno == EAGAIN) || (errno == EINTR))
189         continue;
190
191       ERROR("memcached: Error reading from socket: %s",
192             sstrerror(errno, errbuf, sizeof(errbuf)));
193       shutdown(fd, SHUT_RDWR);
194       close(fd);
195       return (-1);
196     }
197
198     buffer_fill += (size_t)status;
199     if (buffer_fill > buffer_size) {
200       buffer_fill = buffer_size;
201       WARNING("memcached plugin: Message was truncated.");
202       break;
203     }
204
205     /* If buffer ends in end_token, we have all the data. */
206     if (memcmp(buffer + buffer_fill - sizeof(end_token), end_token,
207                sizeof(end_token)) == 0)
208       break;
209   } /* while (recv) */
210
211   status = 0;
212   if (buffer_fill == 0) {
213     WARNING("memcached plugin: No data returned by memcached.");
214     status = -1;
215   }
216
217   shutdown(fd, SHUT_RDWR);
218   close(fd);
219   return (status);
220 } /* int memcached_query_daemon */
221
222 static void memcached_init_vl(value_list_t *vl, memcached_t const *st) {
223   char const *host = st->host;
224
225   /* Set vl->host to hostname_g, if:
226    * - Legacy mode is used.
227    * - "Socket" option is given (doc: "Host option is ignored").
228    * - "Host" option is not provided.
229    * - "Host" option is set to "localhost" or "127.0.0.1". */
230   if ((strcmp(st->name, "__legacy__") == 0) || (st->socket != NULL) ||
231       (st->host == NULL) || (strcmp("127.0.0.1", st->host) == 0) ||
232       (strcmp("localhost", st->host) == 0))
233     host = hostname_g;
234
235   sstrncpy(vl->plugin, "memcached", sizeof(vl->plugin));
236   sstrncpy(vl->host, host, sizeof(vl->host));
237   if (strcmp(st->name, "__legacy__") != 0)
238     sstrncpy(vl->plugin_instance, st->name, sizeof(vl->plugin_instance));
239 }
240
241 static void submit_derive(const char *type, const char *type_inst,
242                           derive_t value, memcached_t *st) {
243   value_t values[1];
244   value_list_t vl = VALUE_LIST_INIT;
245   memcached_init_vl(&vl, st);
246
247   values[0].derive = value;
248
249   vl.values = values;
250   vl.values_len = 1;
251   sstrncpy(vl.type, type, sizeof(vl.type));
252   if (type_inst != NULL)
253     sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
254
255   plugin_dispatch_values(&vl);
256 }
257
258 static void submit_derive2(const char *type, const char *type_inst,
259                            derive_t value0, derive_t value1, memcached_t *st) {
260   value_t values[2];
261   value_list_t vl = VALUE_LIST_INIT;
262   memcached_init_vl(&vl, st);
263
264   values[0].derive = value0;
265   values[1].derive = value1;
266
267   vl.values = values;
268   vl.values_len = 2;
269   sstrncpy(vl.type, type, sizeof(vl.type));
270   if (type_inst != NULL)
271     sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
272
273   plugin_dispatch_values(&vl);
274 }
275
276 static void submit_gauge(const char *type, const char *type_inst, gauge_t value,
277                          memcached_t *st) {
278   value_t values[1];
279   value_list_t vl = VALUE_LIST_INIT;
280   memcached_init_vl(&vl, st);
281
282   values[0].gauge = value;
283
284   vl.values = values;
285   vl.values_len = 1;
286   sstrncpy(vl.type, type, sizeof(vl.type));
287   if (type_inst != NULL)
288     sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
289
290   plugin_dispatch_values(&vl);
291 }
292
293 static void submit_gauge2(const char *type, const char *type_inst,
294                           gauge_t value0, gauge_t value1, memcached_t *st) {
295   value_t values[2];
296   value_list_t vl = VALUE_LIST_INIT;
297   memcached_init_vl(&vl, st);
298
299   values[0].gauge = value0;
300   values[1].gauge = value1;
301
302   vl.values = values;
303   vl.values_len = 2;
304   sstrncpy(vl.type, type, sizeof(vl.type));
305   if (type_inst != NULL)
306     sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
307
308   plugin_dispatch_values(&vl);
309 }
310
311 static int memcached_read(user_data_t *user_data) {
312   char buf[4096];
313   char *fields[3];
314   char *ptr;
315   char *line;
316   char *saveptr;
317   int fields_num;
318
319   gauge_t bytes_used = NAN;
320   gauge_t bytes_total = NAN;
321   gauge_t hits = NAN;
322   gauge_t gets = NAN;
323   gauge_t incr_hits = NAN;
324   derive_t incr = 0;
325   gauge_t decr_hits = NAN;
326   derive_t decr = 0;
327   derive_t rusage_user = 0;
328   derive_t rusage_syst = 0;
329   derive_t octets_rx = 0;
330   derive_t octets_tx = 0;
331
332   memcached_t *st;
333   st = user_data->data;
334
335   /* get data from daemon */
336   if (memcached_query_daemon(buf, sizeof(buf), st) < 0) {
337     return -1;
338   }
339
340 #define FIELD_IS(cnst)                                                         \
341   (((sizeof(cnst) - 1) == name_len) && (strcmp(cnst, fields[1]) == 0))
342
343   ptr = buf;
344   saveptr = NULL;
345   while ((line = strtok_r(ptr, "\n\r", &saveptr)) != NULL) {
346     int name_len;
347
348     ptr = NULL;
349
350     fields_num = strsplit(line, fields, 3);
351     if (fields_num != 3)
352       continue;
353
354     name_len = strlen(fields[1]);
355     if (name_len == 0)
356       continue;
357
358     /*
359      * For an explanation on these fields please refer to
360      * <https://github.com/memcached/memcached/blob/master/doc/protocol.txt>
361      */
362
363     /*
364      * CPU time consumed by the memcached process
365      */
366     if (FIELD_IS("rusage_user")) {
367       /* Convert to useconds */
368       rusage_user = atof(fields[2]) * 1000000;
369     } else if (FIELD_IS("rusage_system")) {
370       rusage_syst = atof(fields[2]) * 1000000;
371     }
372
373     /*
374      * Number of threads of this instance
375      */
376     else if (FIELD_IS("threads")) {
377       submit_gauge2("ps_count", NULL, NAN, atof(fields[2]), st);
378     }
379
380     /*
381      * Number of items stored
382      */
383     else if (FIELD_IS("curr_items")) {
384       submit_gauge("memcached_items", "current", atof(fields[2]), st);
385     }
386
387     /*
388      * Number of bytes used and available (total - used)
389      */
390     else if (FIELD_IS("bytes")) {
391       bytes_used = atof(fields[2]);
392     } else if (FIELD_IS("limit_maxbytes")) {
393       bytes_total = atof(fields[2]);
394     }
395
396     /*
397      * Connections
398      */
399     else if (FIELD_IS("curr_connections")) {
400       submit_gauge("memcached_connections", "current", atof(fields[2]), st);
401     } else if (FIELD_IS("listen_disabled_num")) {
402       submit_derive("connections", "listen_disabled", atof(fields[2]), st);
403     }
404
405     /*
406      * Commands
407      */
408     else if ((name_len > 4) && (strncmp(fields[1], "cmd_", 4) == 0)) {
409       const char *name = fields[1] + 4;
410       submit_derive("memcached_command", name, atoll(fields[2]), st);
411       if (strcmp(name, "get") == 0)
412         gets = atof(fields[2]);
413     }
414
415     /*
416      * Increment/Decrement
417      */
418     else if (FIELD_IS("incr_misses")) {
419       derive_t incr_count = atoll(fields[2]);
420       submit_derive("memcached_ops", "incr_misses", incr_count, st);
421       incr += incr_count;
422     } else if (FIELD_IS("incr_hits")) {
423       derive_t incr_count = atoll(fields[2]);
424       submit_derive("memcached_ops", "incr_hits", incr_count, st);
425       incr_hits = atof(fields[2]);
426       incr += incr_count;
427     } else if (FIELD_IS("decr_misses")) {
428       derive_t decr_count = atoll(fields[2]);
429       submit_derive("memcached_ops", "decr_misses", decr_count, st);
430       decr += decr_count;
431     } else if (FIELD_IS("decr_hits")) {
432       derive_t decr_count = atoll(fields[2]);
433       submit_derive("memcached_ops", "decr_hits", decr_count, st);
434       decr_hits = atof(fields[2]);
435       decr += decr_count;
436     }
437
438     /*
439      * Operations on the cache, i. e. cache hits, cache misses and evictions of
440      * items
441      */
442     else if (FIELD_IS("get_hits")) {
443       submit_derive("memcached_ops", "hits", atoll(fields[2]), st);
444       hits = atof(fields[2]);
445     } else if (FIELD_IS("get_misses")) {
446       submit_derive("memcached_ops", "misses", atoll(fields[2]), st);
447     } else if (FIELD_IS("evictions")) {
448       submit_derive("memcached_ops", "evictions", atoll(fields[2]), st);
449     }
450
451     /*
452      * Network traffic
453      */
454     else if (FIELD_IS("bytes_read")) {
455       octets_rx = atoll(fields[2]);
456     } else if (FIELD_IS("bytes_written")) {
457       octets_tx = atoll(fields[2]);
458     }
459   } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
460
461   if (!isnan(bytes_used) && !isnan(bytes_total) && (bytes_used <= bytes_total))
462     submit_gauge2("df", "cache", bytes_used, bytes_total - bytes_used, st);
463
464   if ((rusage_user != 0) || (rusage_syst != 0))
465     submit_derive2("ps_cputime", NULL, rusage_user, rusage_syst, st);
466
467   if ((octets_rx != 0) || (octets_tx != 0))
468     submit_derive2("memcached_octets", NULL, octets_rx, octets_tx, st);
469
470   if (!isnan(gets) && !isnan(hits)) {
471     gauge_t rate = NAN;
472
473     if (gets != 0.0)
474       rate = 100.0 * hits / gets;
475
476     submit_gauge("percent", "hitratio", rate, st);
477   }
478
479   if (!isnan(incr_hits) && incr != 0) {
480     gauge_t incr_rate = 100.0 * incr_hits / incr;
481     submit_gauge("percent", "incr_hitratio", incr_rate, st);
482     submit_derive("memcached_ops", "incr", incr, st);
483   }
484
485   if (!isnan(decr_hits) && decr != 0) {
486     gauge_t decr_rate = 100.0 * decr_hits / decr;
487     submit_gauge("percent", "decr_hitratio", decr_rate, st);
488     submit_derive("memcached_ops", "decr", decr, st);
489   }
490
491   return 0;
492 } /* int memcached_read */
493
494 static int memcached_add_read_callback(memcached_t *st) {
495   char callback_name[3 * DATA_MAX_NAME_LEN];
496   int status;
497
498   assert(st->name != NULL);
499   ssnprintf(callback_name, sizeof(callback_name), "memcached/%s", st->name);
500
501   user_data_t ud = {.data = st, .free_func = memcached_free};
502
503   status = plugin_register_complex_read(/* group = */ "memcached",
504                                         /* name      = */ callback_name,
505                                         /* callback  = */ memcached_read,
506                                         /* interval  = */ 0,
507                                         /* user_data = */ &ud);
508   return (status);
509 } /* int memcached_add_read_callback */
510
511 /* Configuration handling functiions
512  * <Plugin memcached>
513  *   <Instance "instance_name">
514  *     Host foo.zomg.com
515  *     Port "1234"
516  *   </Instance>
517  * </Plugin>
518  */
519 static int config_add_instance(oconfig_item_t *ci) {
520   memcached_t *st;
521   int status = 0;
522
523   /* Disable automatic generation of default instance in the init callback. */
524   memcached_have_instances = 1;
525
526   st = calloc(1, sizeof(*st));
527   if (st == NULL) {
528     ERROR("memcached plugin: calloc failed.");
529     return (-1);
530   }
531
532   st->name = NULL;
533   st->socket = NULL;
534   st->host = NULL;
535   st->port = NULL;
536
537   if (strcasecmp(ci->key, "Plugin") == 0) /* default instance */
538     st->name = sstrdup("__legacy__");
539   else /* <Instance /> block */
540     status = cf_util_get_string(ci, &st->name);
541   if (status != 0) {
542     sfree(st);
543     return (status);
544   }
545   assert(st->name != NULL);
546
547   for (int i = 0; i < ci->children_num; i++) {
548     oconfig_item_t *child = ci->children + i;
549
550     if (strcasecmp("Socket", child->key) == 0)
551       status = cf_util_get_string(child, &st->socket);
552     else if (strcasecmp("Host", child->key) == 0)
553       status = cf_util_get_string(child, &st->host);
554     else if (strcasecmp("Port", child->key) == 0)
555       status = cf_util_get_service(child, &st->port);
556     else {
557       WARNING("memcached plugin: Option `%s' not allowed here.", child->key);
558       status = -1;
559     }
560
561     if (status != 0)
562       break;
563   }
564
565   if (status == 0)
566     status = memcached_add_read_callback(st);
567
568   if (status != 0) {
569     memcached_free(st);
570     return (-1);
571   }
572
573   return (0);
574 }
575
576 static int memcached_config(oconfig_item_t *ci) {
577   int status = 0;
578   _Bool have_instance_block = 0;
579
580   for (int i = 0; i < ci->children_num; i++) {
581     oconfig_item_t *child = ci->children + i;
582
583     if (strcasecmp("Instance", child->key) == 0) {
584       config_add_instance(child);
585       have_instance_block = 1;
586     } else if (!have_instance_block) {
587       /* Non-instance option: Assume legacy configuration (without <Instance />
588        * blocks) and call config_add_instance() with the <Plugin /> block. */
589       return (config_add_instance(ci));
590     } else
591       WARNING("memcached plugin: The configuration option "
592               "\"%s\" is not allowed here. Did you "
593               "forget to add an <Instance /> block "
594               "around the configuration?",
595               child->key);
596   } /* for (ci->children) */
597
598   return (status);
599 }
600
601 static int memcached_init(void) {
602   memcached_t *st;
603   int status;
604
605   if (memcached_have_instances)
606     return (0);
607
608   /* No instances were configured, lets start a default instance. */
609   st = calloc(1, sizeof(*st));
610   if (st == NULL)
611     return (ENOMEM);
612   st->name = sstrdup("__legacy__");
613   st->socket = NULL;
614   st->host = NULL;
615   st->port = NULL;
616
617   status = memcached_add_read_callback(st);
618   if (status == 0)
619     memcached_have_instances = 1;
620   else
621     memcached_free(st);
622
623   return (status);
624 } /* int memcached_init */
625
626 void module_register(void) {
627   plugin_register_complex_config("memcached", memcached_config);
628   plugin_register_init("memcached", memcached_init);
629 }