memcached: Added Address option
[collectd.git] / src / memcached.c
1 /**
2  * collectd - src/memcached.c, based on src/hddtemp.c
3  * Copyright (C) 2007       Antony Dovgal
4  * Copyright (C) 2007-2012  Florian Forster
5  * Copyright (C) 2009       Doug MacEachern
6  * Copyright (C) 2009       Franck Lombardi
7  * Copyright (C) 2012       Nicolas Szalay
8  *
9  * This program is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; either version 2 of the License, or (at your
12  * option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write to the Free Software Foundation, Inc.,
21  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
22  *
23  * Authors:
24  *   Antony Dovgal <tony at daylessday dot org>
25  *   Florian octo Forster <octo at collectd.org>
26  *   Doug MacEachern <dougm at hyperic.com>
27  *   Franck Lombardi
28  *   Nicolas Szalay
29  **/
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "plugin.h"
35
36 #include <netdb.h>
37 #include <sys/un.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40
41 #define MEMCACHED_DEF_HOST "127.0.0.1"
42 #define MEMCACHED_DEF_PORT "11211"
43
44 struct memcached_s
45 {
46   char *name;
47   char *host;
48   char *socket;
49   char *connhost;
50   char *connport;
51 };
52 typedef struct memcached_s memcached_t;
53
54 static _Bool memcached_have_instances = 0;
55
56 static void memcached_free (void *arg)
57 {
58   memcached_t *st = arg;
59   if (st == NULL)
60     return;
61
62   sfree (st->name);
63   sfree (st->host);
64   sfree (st->socket);
65   sfree (st->connhost);
66   sfree (st->connport);
67   sfree (st);
68 }
69
70 static int memcached_connect_unix (memcached_t *st)
71 {
72   struct sockaddr_un serv_addr = { 0 };
73   int fd;
74
75   serv_addr.sun_family = AF_UNIX;
76   sstrncpy (serv_addr.sun_path, st->socket,
77       sizeof (serv_addr.sun_path));
78
79   /* create our socket descriptor */
80   fd = socket (AF_UNIX, SOCK_STREAM, 0);
81   if (fd < 0)
82   {
83     char errbuf[1024];
84     ERROR ("memcached plugin: memcached_connect_unix: socket(2) failed: %s",
85         sstrerror (errno, errbuf, sizeof (errbuf)));
86     return (-1);
87   }
88
89   /* connect to the memcached daemon */
90   int status = connect (fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
91   if (status != 0)
92   {
93       shutdown (fd, SHUT_RDWR);
94       close (fd);
95       fd = -1;
96   }
97
98   return (fd);
99 } /* int memcached_connect_unix */
100
101 static int memcached_connect_inet (memcached_t *st)
102 {
103   struct addrinfo *ai_list;
104   int status;
105   int fd = -1;
106
107   struct addrinfo ai_hints = {
108     .ai_family = AF_UNSPEC,
109     .ai_flags = AI_ADDRCONFIG,
110     .ai_socktype = SOCK_STREAM
111   };
112
113   status = getaddrinfo (st->connhost, st->connport, &ai_hints, &ai_list);
114   if (status != 0)
115   {
116     char errbuf[1024];
117     ERROR ("memcached plugin: memcached_connect_inet: "
118         "getaddrinfo(%s,%s) failed: %s",
119         st->connhost, st->connport,
120         (status == EAI_SYSTEM)
121         ? sstrerror (errno, errbuf, sizeof (errbuf))
122         : gai_strerror (status));
123     return (-1);
124   }
125
126   for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
127   {
128     /* create our socket descriptor */
129     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
130     if (fd < 0)
131     {
132       char errbuf[1024];
133       WARNING ("memcached plugin: memcached_connect_inet: "
134           "socket(2) failed: %s",
135           sstrerror (errno, errbuf, sizeof (errbuf)));
136       continue;
137     }
138
139     /* connect to the memcached daemon */
140     status = (int) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
141     if (status != 0)
142     {
143       shutdown (fd, SHUT_RDWR);
144       close (fd);
145       fd = -1;
146       continue;
147     }
148
149     /* A socket could be opened and connecting succeeded. We're done. */
150     break;
151   }
152
153   freeaddrinfo (ai_list);
154   return (fd);
155 } /* int memcached_connect_inet */
156
157 static int memcached_connect (memcached_t *st)
158 {
159   if (st->socket != NULL)
160     return (memcached_connect_unix (st));
161   else
162     return (memcached_connect_inet (st));
163 }
164
165 static int memcached_query_daemon (char *buffer, size_t buffer_size, memcached_t *st)
166 {
167   int fd, status;
168   size_t buffer_fill;
169
170   fd = memcached_connect (st);
171   if (fd < 0) {
172     ERROR ("memcached plugin: Instance \"%s\" could not connect to daemon.",
173         st->name);
174     return -1;
175   }
176
177   status = (int) swrite (fd, "stats\r\n", strlen ("stats\r\n"));
178   if (status != 0)
179   {
180     char errbuf[1024];
181     ERROR ("memcached plugin: write(2) failed: %s",
182         sstrerror (errno, errbuf, sizeof (errbuf)));
183     shutdown(fd, SHUT_RDWR);
184     close (fd);
185     return (-1);
186   }
187
188   /* receive data from the memcached daemon */
189   memset (buffer, 0, buffer_size);
190
191   buffer_fill = 0;
192   while ((status = (int) recv (fd, buffer + buffer_fill,
193           buffer_size - buffer_fill, /* flags = */ 0)) != 0)
194   {
195     char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
196     if (status < 0)
197     {
198       char errbuf[1024];
199
200       if ((errno == EAGAIN) || (errno == EINTR))
201           continue;
202
203       ERROR ("memcached: Error reading from socket: %s",
204           sstrerror (errno, errbuf, sizeof (errbuf)));
205       shutdown(fd, SHUT_RDWR);
206       close (fd);
207       return (-1);
208     }
209
210     buffer_fill += (size_t) status;
211     if (buffer_fill > buffer_size)
212     {
213       buffer_fill = buffer_size;
214       WARNING ("memcached plugin: Message was truncated.");
215       break;
216     }
217
218     /* If buffer ends in end_token, we have all the data. */
219     if (memcmp (buffer + buffer_fill - sizeof (end_token),
220           end_token, sizeof (end_token)) == 0)
221       break;
222   } /* while (recv) */
223
224   status = 0;
225   if (buffer_fill == 0)
226   {
227     WARNING ("memcached plugin: No data returned by memcached.");
228     status = -1;
229   }
230
231   shutdown(fd, SHUT_RDWR);
232   close(fd);
233   return (status);
234 } /* int memcached_query_daemon */
235
236 static void memcached_init_vl (value_list_t *vl, memcached_t const *st)
237 {
238   sstrncpy (vl->plugin, "memcached", sizeof (vl->plugin));
239   if (st->host != NULL)
240     sstrncpy (vl->host, st->host, sizeof (vl->host));
241   if (st->name != NULL)
242     sstrncpy (vl->plugin_instance, st->name, sizeof (vl->plugin_instance));
243 }
244
245 static void submit_derive (const char *type, const char *type_inst,
246     derive_t value, memcached_t *st)
247 {
248   value_list_t vl = VALUE_LIST_INIT;
249
250   memcached_init_vl (&vl, st);
251   vl.values = &(value_t) { .derive = value };
252   vl.values_len = 1;
253   sstrncpy (vl.type, type, sizeof (vl.type));
254   if (type_inst != NULL)
255     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
256
257   plugin_dispatch_values (&vl);
258 }
259
260 static void submit_derive2 (const char *type, const char *type_inst,
261     derive_t value0, derive_t value1, memcached_t *st)
262 {
263   value_list_t vl = VALUE_LIST_INIT;
264   value_t values[] = {
265     { .derive = value0 },
266     { .derive = value1 },
267   };
268
269   memcached_init_vl (&vl, st);
270   vl.values = values;
271   vl.values_len = STATIC_ARRAY_SIZE (values);
272   sstrncpy (vl.type, type, sizeof (vl.type));
273   if (type_inst != NULL)
274     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
275
276   plugin_dispatch_values (&vl);
277 }
278
279 static void submit_gauge (const char *type, const char *type_inst,
280     gauge_t value, memcached_t *st)
281 {
282   value_list_t vl = VALUE_LIST_INIT;
283
284   memcached_init_vl (&vl, st);
285   vl.values = &(value_t) { .gauge = value };
286   vl.values_len = 1;
287   sstrncpy (vl.type, type, sizeof (vl.type));
288   if (type_inst != NULL)
289     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
290
291   plugin_dispatch_values (&vl);
292 }
293
294 static void submit_gauge2 (const char *type, const char *type_inst,
295     gauge_t value0, gauge_t value1, memcached_t *st)
296 {
297   value_list_t vl = VALUE_LIST_INIT;
298   value_t values[] = {
299     { .gauge = value0 },
300     { .gauge = value1 },
301   };
302
303   memcached_init_vl (&vl, st);
304   vl.values = values;
305   vl.values_len = STATIC_ARRAY_SIZE (values);
306   sstrncpy (vl.type, type, sizeof (vl.type));
307   if (type_inst != NULL)
308     sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
309
310   plugin_dispatch_values (&vl);
311 }
312
313 static int memcached_read (user_data_t *user_data)
314 {
315   char buf[4096];
316   char *fields[3];
317   char *ptr;
318   char *line;
319   char *saveptr;
320   int fields_num;
321
322   gauge_t bytes_used = NAN;
323   gauge_t bytes_total = NAN;
324   gauge_t hits = NAN;
325   gauge_t gets = NAN;
326   gauge_t incr_hits = NAN;
327   derive_t incr = 0;
328   gauge_t decr_hits = NAN;
329   derive_t decr = 0;
330   derive_t rusage_user = 0;
331   derive_t rusage_syst = 0;
332   derive_t octets_rx = 0;
333   derive_t octets_tx = 0;
334
335   memcached_t *st;
336   st = user_data->data;
337
338   /* get data from daemon */
339   if (memcached_query_daemon (buf, sizeof (buf), st) < 0) {
340     return -1;
341   }
342
343 #define FIELD_IS(cnst) \
344   (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
345
346   ptr = buf;
347   saveptr = NULL;
348   while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
349   {
350     int name_len;
351
352     ptr = NULL;
353
354     fields_num = strsplit(line, fields, 3);
355     if (fields_num != 3)
356       continue;
357
358     name_len = strlen(fields[1]);
359     if (name_len == 0)
360       continue;
361
362     /*
363      * For an explanation on these fields please refer to
364      * <https://github.com/memcached/memcached/blob/master/doc/protocol.txt>
365      */
366
367     /*
368      * CPU time consumed by the memcached process
369      */
370     if (FIELD_IS ("rusage_user"))
371     {
372       rusage_user = atoll (fields[2]);
373     }
374     else if (FIELD_IS ("rusage_system"))
375     {
376       rusage_syst = atoll(fields[2]);
377     }
378
379     /*
380      * Number of threads of this instance
381      */
382     else if (FIELD_IS ("threads"))
383     {
384       submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]), st);
385     }
386
387     /*
388      * Number of items stored
389      */
390     else if (FIELD_IS ("curr_items"))
391     {
392       submit_gauge ("memcached_items", "current", atof (fields[2]), st);
393     }
394
395     /*
396      * Number of bytes used and available (total - used)
397      */
398     else if (FIELD_IS ("bytes"))
399     {
400       bytes_used = atof (fields[2]);
401     }
402     else if (FIELD_IS ("limit_maxbytes"))
403     {
404       bytes_total = atof(fields[2]);
405     }
406
407     /*
408      * Connections
409      */
410     else if (FIELD_IS ("curr_connections"))
411     {
412       submit_gauge ("memcached_connections", "current", atof (fields[2]), st);
413     }
414     else if (FIELD_IS ("listen_disabled_num"))
415     {
416       submit_derive ("connections", "listen_disabled", atof (fields[2]), st);
417     }
418
419     /*
420      * Commands
421      */
422     else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
423     {
424       const char *name = fields[1] + 4;
425       submit_derive ("memcached_command", name, atoll (fields[2]), st);
426       if (strcmp (name, "get") == 0)
427         gets = atof (fields[2]);
428     }
429
430     /*
431      * Increment/Decrement
432      */
433     else if (FIELD_IS("incr_misses"))
434     {
435       derive_t incr_count = atoll (fields[2]);
436       submit_derive ("memcached_ops", "incr_misses", incr_count, st);
437       incr += incr_count;
438     }
439     else if (FIELD_IS ("incr_hits"))
440     {
441       derive_t incr_count = atoll (fields[2]);
442       submit_derive ("memcached_ops", "incr_hits", incr_count, st);
443       incr_hits = atof (fields[2]);
444       incr += incr_count;
445     }
446     else if (FIELD_IS ("decr_misses"))
447     {
448       derive_t decr_count = atoll (fields[2]);
449       submit_derive ("memcached_ops", "decr_misses", decr_count, st);
450       decr += decr_count;
451     }
452     else if (FIELD_IS ("decr_hits"))
453     {
454       derive_t decr_count = atoll (fields[2]);
455       submit_derive ("memcached_ops", "decr_hits", decr_count, st);
456       decr_hits = atof (fields[2]);
457       decr += decr_count;
458     }
459
460     /*
461      * Operations on the cache, i. e. cache hits, cache misses and evictions of items
462      */
463     else if (FIELD_IS ("get_hits"))
464     {
465       submit_derive ("memcached_ops", "hits", atoll (fields[2]), st);
466       hits = atof (fields[2]);
467     }
468     else if (FIELD_IS ("get_misses"))
469     {
470       submit_derive ("memcached_ops", "misses", atoll (fields[2]), st);
471     }
472     else if (FIELD_IS ("evictions"))
473     {
474       submit_derive ("memcached_ops", "evictions", atoll (fields[2]), st);
475     }
476
477     /*
478      * Network traffic
479      */
480     else if (FIELD_IS ("bytes_read"))
481     {
482       octets_rx = atoll (fields[2]);
483     }
484     else if (FIELD_IS ("bytes_written"))
485     {
486       octets_tx = atoll (fields[2]);
487     }
488   } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
489
490   if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
491     submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used, st);
492
493   if ((rusage_user != 0) || (rusage_syst != 0))
494     submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst, st);
495
496   if ((octets_rx != 0) || (octets_tx != 0))
497     submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx, st);
498
499   if (!isnan (gets) && !isnan (hits))
500   {
501     gauge_t rate = NAN;
502
503     if (gets != 0.0)
504       rate = 100.0 * hits / gets;
505
506     submit_gauge ("percent", "hitratio", rate, st);
507   }
508
509   if (!isnan (incr_hits) && incr != 0)
510   {
511     gauge_t incr_rate = 100.0 * incr_hits / incr;
512     submit_gauge ("percent", "incr_hitratio", incr_rate, st);
513     submit_derive ("memcached_ops", "incr", incr, st);
514   }
515
516   if (!isnan (decr_hits) && decr != 0)
517   {
518     gauge_t decr_rate = 100.0 * decr_hits / decr;
519     submit_gauge ("percent", "decr_hitratio", decr_rate, st);
520     submit_derive ("memcached_ops", "decr", decr, st);
521   }
522
523   return 0;
524 } /* int memcached_read */
525
526 static int memcached_add_read_callback (memcached_t *st)
527 {
528   char callback_name[3*DATA_MAX_NAME_LEN];
529   int status;
530
531   ssnprintf (callback_name, sizeof (callback_name), "memcached/%s",
532       (st->name != NULL) ? st->name : "__legacy__");
533
534   /* If no <Address> used then:
535    * - connect to destination, specified by <Host>, if it presents.
536    * - Keep default hostname, if any:
537    *    - Legacy mode is used (no configuration options at all);
538    *    - "Host" option is not provided;
539    *    - "Host" option is set to "localhost" or "127.0.0.1".
540    *
541    * If <Address> used then host may be set to "localhost"
542    * or "127.0.0.1" explicitly.
543    */
544   if (st->connhost == NULL)
545   {
546     if (st->host)
547     {
548       st->connhost = strdup(st->host);
549       if (st->connhost == NULL)
550         return (ENOMEM);
551
552       if ((strcmp ("127.0.0.1", st->host) == 0)
553           || (strcmp ("localhost", st->host) == 0))
554         sfree(st->host);
555     }
556     else
557     {
558       st->connhost = strdup(MEMCACHED_DEF_HOST);
559       if (st->connhost == NULL)
560         return (ENOMEM);
561     }
562   }
563
564   if (st->connport == NULL)
565   {
566       st->connport = strdup(MEMCACHED_DEF_PORT);
567       if (st->connport == NULL)
568         return (ENOMEM);
569   }
570
571   assert (st->connhost != NULL);
572   assert (st->connport != NULL);
573
574   status = plugin_register_complex_read (/* group = */ "memcached",
575       /* name      = */ callback_name,
576       /* callback  = */ memcached_read,
577       /* interval  = */ 0,
578       &(user_data_t) {
579         .data = st,
580         .free_func = memcached_free,
581       });
582
583   return (status);
584 } /* int memcached_add_read_callback */
585
586 /* Configuration handling functiions
587  * <Plugin memcached>
588  *   <Instance "instance_name">
589  *     Host foo.zomg.com
590  *     Address 1.2.3.4
591  *     Port "1234"
592  *   </Instance>
593  * </Plugin>
594  */
595 static int config_add_instance(oconfig_item_t *ci)
596 {
597   memcached_t *st;
598   int status = 0;
599
600   /* Disable automatic generation of default instance in the init callback. */
601   memcached_have_instances = 1;
602
603   st = calloc (1, sizeof (*st));
604   if (st == NULL)
605   {
606     ERROR ("memcached plugin: calloc failed.");
607     return (ENOMEM);
608   }
609
610   st->name = NULL;
611   st->host = NULL;
612   st->socket = NULL;
613   st->connhost = NULL;
614   st->connport = NULL;
615
616   if (strcasecmp (ci->key, "Instance") == 0)
617     status = cf_util_get_string (ci, &st->name);
618
619   if (status != 0)
620   {
621     sfree (st);
622     return (status);
623   }
624
625   for (int i = 0; i < ci->children_num; i++)
626   {
627     oconfig_item_t *child = ci->children + i;
628
629     if (strcasecmp ("Socket", child->key) == 0)
630       status = cf_util_get_string (child, &st->socket);
631     else if (strcasecmp ("Host", child->key) == 0)
632       status = cf_util_get_string (child, &st->host);
633     else if (strcasecmp ("Address", child->key) == 0)
634       status = cf_util_get_string (child, &st->connhost);
635     else if (strcasecmp ("Port", child->key) == 0)
636       status = cf_util_get_service (child, &st->connport);
637     else
638     {
639       WARNING ("memcached plugin: Option `%s' not allowed here.",
640           child->key);
641       status = -1;
642     }
643
644     if (status != 0)
645       break;
646   }
647
648   if (status == 0)
649     status = memcached_add_read_callback (st);
650
651   if (status != 0)
652   {
653     memcached_free(st);
654     return (-1);
655   }
656
657   return (0);
658 }
659
660 static int memcached_config (oconfig_item_t *ci)
661 {
662   int status = 0;
663   _Bool have_instance_block = 0;
664
665   for (int i = 0; i < ci->children_num; i++)
666   {
667     oconfig_item_t *child = ci->children + i;
668
669     if (strcasecmp ("Instance", child->key) == 0)
670     {
671       config_add_instance (child);
672       have_instance_block = 1;
673     }
674     else if (!have_instance_block)
675     {
676       /* Non-instance option: Assume legacy configuration (without <Instance />
677        * blocks) and call config_add_instance() with the <Plugin /> block. */
678       return (config_add_instance (ci));
679     }
680     else
681       WARNING ("memcached plugin: The configuration option "
682           "\"%s\" is not allowed here. Did you "
683           "forget to add an <Instance /> block "
684           "around the configuration?",
685           child->key);
686   } /* for (ci->children) */
687
688   return (status);
689 }
690
691 static int memcached_init (void)
692 {
693   memcached_t *st;
694   int status;
695
696   if (memcached_have_instances)
697     return (0);
698
699   /* No instances were configured, lets start a default instance. */
700   st = calloc (1, sizeof (*st));
701   if (st == NULL)
702     return (ENOMEM);
703   st->name = NULL;
704   st->host = NULL;
705   st->socket = NULL;
706   st->connhost = NULL;
707   st->connport = NULL;
708
709   status = memcached_add_read_callback (st);
710   if (status == 0)
711     memcached_have_instances = 1;
712   else
713     memcached_free (st);
714
715   return (status);
716 } /* int memcached_init */
717
718 void module_register (void)
719 {
720   plugin_register_complex_config ("memcached", memcached_config);
721   plugin_register_init ("memcached", memcached_init);
722 }