665e2c87abedb4f1797d6efaa4fa8a2ba24b0259
[collectd.git] / src / rrdcached.c
1 /**
2  * collectd - src/rrdcached.c
3  * Copyright (C) 2008-2013  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/common/common.h"
31 #include "utils/rrdcreate/rrdcreate.h"
32
33 #undef HAVE_CONFIG_H
34 #include <rrd.h>
35 #include <rrd_client.h>
36
37 /*
38  * Private variables
39  */
40 static char *datadir;
41 static char *daemon_address;
42 static bool config_create_files = true;
43 static bool config_collect_stats = true;
44 static rrdcreate_config_t rrdcreate_config = {.stepsize = 0,
45                                               .heartbeat = 0,
46                                               .rrarows = 1200,
47                                               .xff = 0.1,
48                                               .timespans = NULL,
49                                               .timespans_num = 0,
50                                               .consolidation_functions = NULL,
51                                               .consolidation_functions_num = 0,
52                                               .async = 0};
53
54 /*
55  * Prototypes.
56  */
57 static int rc_write(const data_set_t *ds, const value_list_t *vl,
58                     __attribute__((unused)) user_data_t *ud);
59 static int rc_flush(__attribute__((unused)) cdtime_t timeout,
60                     const char *identifier,
61                     __attribute__((unused)) user_data_t *ud);
62
63 static int value_list_to_string(char *buffer, int buffer_len,
64                                 const data_set_t *ds, const value_list_t *vl) {
65   int offset;
66   int status;
67   time_t t;
68
69   assert(0 == strcmp(ds->type, vl->type));
70
71   memset(buffer, '\0', buffer_len);
72
73   t = CDTIME_T_TO_TIME_T(vl->time);
74   status = snprintf(buffer, buffer_len, "%lu", (unsigned long)t);
75   if ((status < 1) || (status >= buffer_len))
76     return -1;
77   offset = status;
78
79   for (size_t i = 0; i < ds->ds_num; i++) {
80     if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
81         (ds->ds[i].type != DS_TYPE_GAUGE) &&
82         (ds->ds[i].type != DS_TYPE_DERIVE) &&
83         (ds->ds[i].type != DS_TYPE_ABSOLUTE))
84       return -1;
85
86     if (ds->ds[i].type == DS_TYPE_COUNTER) {
87       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
88                         (uint64_t)vl->values[i].counter);
89     } else if (ds->ds[i].type == DS_TYPE_GAUGE) {
90       status = snprintf(buffer + offset, buffer_len - offset, ":%f",
91                         vl->values[i].gauge);
92     } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
93       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
94                         vl->values[i].derive);
95     } else /* if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */ {
96       status = snprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
97                         vl->values[i].absolute);
98     }
99
100     if ((status < 1) || (status >= (buffer_len - offset)))
101       return -1;
102
103     offset += status;
104   } /* for ds->ds_num */
105
106   return 0;
107 } /* int value_list_to_string */
108
109 static int value_list_to_filename(char *buffer, size_t buffer_size,
110                                   value_list_t const *vl) {
111   char const suffix[] = ".rrd";
112   int status;
113   size_t len;
114
115   if (datadir != NULL) {
116     size_t datadir_len = strlen(datadir) + 1;
117
118     if (datadir_len >= buffer_size)
119       return ENOMEM;
120
121     sstrncpy(buffer, datadir, buffer_size);
122     buffer[datadir_len - 1] = '/';
123     buffer[datadir_len] = 0;
124
125     buffer += datadir_len;
126     buffer_size -= datadir_len;
127   }
128
129   status = FORMAT_VL(buffer, buffer_size, vl);
130   if (status != 0)
131     return status;
132
133   len = strlen(buffer);
134   assert(len < buffer_size);
135   buffer += len;
136   buffer_size -= len;
137
138   if (buffer_size <= sizeof(suffix))
139     return ENOMEM;
140
141   memcpy(buffer, suffix, sizeof(suffix));
142   return 0;
143 } /* int value_list_to_filename */
144
145 static int rc_config_get_int_positive(oconfig_item_t const *ci, int *ret) {
146   int status;
147   int tmp = 0;
148
149   status = cf_util_get_int(ci, &tmp);
150   if (status != 0)
151     return status;
152   if (tmp < 0)
153     return EINVAL;
154
155   *ret = tmp;
156   return 0;
157 } /* int rc_config_get_int_positive */
158
159 static int rc_config_get_xff(oconfig_item_t const *ci, double *ret) {
160   double value;
161
162   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER)) {
163     ERROR("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
164           "in the range [0.0, 1.0)",
165           ci->key);
166     return EINVAL;
167   }
168
169   value = ci->values[0].value.number;
170   if ((value >= 0.0) && (value < 1.0)) {
171     *ret = value;
172     return 0;
173   }
174
175   ERROR("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
176         "in the range [0.0, 1.0)",
177         ci->key);
178   return EINVAL;
179 } /* int rc_config_get_xff */
180
181 static int rc_config_add_timespan(int timespan) {
182   int *tmp;
183
184   if (timespan <= 0)
185     return EINVAL;
186
187   tmp = realloc(rrdcreate_config.timespans,
188                 sizeof(*rrdcreate_config.timespans) *
189                     (rrdcreate_config.timespans_num + 1));
190   if (tmp == NULL)
191     return ENOMEM;
192   rrdcreate_config.timespans = tmp;
193
194   rrdcreate_config.timespans[rrdcreate_config.timespans_num] = timespan;
195   rrdcreate_config.timespans_num++;
196
197   return 0;
198 } /* int rc_config_add_timespan */
199
200 static int rc_config(oconfig_item_t *ci) {
201   for (int i = 0; i < ci->children_num; i++) {
202     oconfig_item_t const *child = ci->children + i;
203     const char *key = child->key;
204     int status = 0;
205
206     if (strcasecmp("DataDir", key) == 0) {
207       status = cf_util_get_string(child, &datadir);
208       if (status == 0) {
209         int len = strlen(datadir);
210
211         while ((len > 0) && (datadir[len - 1] == '/')) {
212           len--;
213           datadir[len] = 0;
214         }
215
216         if (len <= 0)
217           sfree(datadir);
218       }
219     } else if (strcasecmp("DaemonAddress", key) == 0)
220       status = cf_util_get_string(child, &daemon_address);
221     else if (strcasecmp("CreateFiles", key) == 0)
222       status = cf_util_get_boolean(child, &config_create_files);
223     else if (strcasecmp("CreateFilesAsync", key) == 0)
224       status = cf_util_get_boolean(child, &rrdcreate_config.async);
225     else if (strcasecmp("CollectStatistics", key) == 0)
226       status = cf_util_get_boolean(child, &config_collect_stats);
227     else if (strcasecmp("StepSize", key) == 0) {
228       int tmp = -1;
229
230       status = rc_config_get_int_positive(child, &tmp);
231       if (status == 0)
232         rrdcreate_config.stepsize = (unsigned long)tmp;
233     } else if (strcasecmp("HeartBeat", key) == 0)
234       status = rc_config_get_int_positive(child, &rrdcreate_config.heartbeat);
235     else if (strcasecmp("RRARows", key) == 0)
236       status = rc_config_get_int_positive(child, &rrdcreate_config.rrarows);
237     else if (strcasecmp("RRATimespan", key) == 0) {
238       int tmp = -1;
239       status = rc_config_get_int_positive(child, &tmp);
240       if (status == 0)
241         status = rc_config_add_timespan(tmp);
242     } else if (strcasecmp("XFF", key) == 0)
243       status = rc_config_get_xff(child, &rrdcreate_config.xff);
244     else {
245       WARNING("rrdcached plugin: Ignoring invalid option %s.", key);
246       continue;
247     }
248
249     if (status != 0)
250       WARNING("rrdcached plugin: Handling the \"%s\" option failed.", key);
251   }
252
253   if (daemon_address != NULL) {
254     plugin_register_write("rrdcached", rc_write, /* user_data = */ NULL);
255     plugin_register_flush("rrdcached", rc_flush, /* user_data = */ NULL);
256   }
257   return 0;
258 } /* int rc_config */
259
260 static int try_reconnect(void) {
261   int status;
262
263   rrdc_disconnect();
264
265   rrd_clear_error();
266   status = rrdc_connect(daemon_address);
267   if (status != 0) {
268     ERROR("rrdcached plugin: Failed to reconnect to RRDCacheD "
269           "at %s: %s (status=%d)",
270           daemon_address, rrd_get_error(), status);
271     return -1;
272   }
273
274   INFO("rrdcached plugin: Successfully reconnected to RRDCacheD "
275        "at %s",
276        daemon_address);
277   return 0;
278 } /* int try_reconnect */
279
280 static int rc_read(void) {
281   int status;
282   rrdc_stats_t *head;
283   bool retried = false;
284
285   value_list_t vl = VALUE_LIST_INIT;
286   vl.values = &(value_t){.gauge = NAN};
287   vl.values_len = 1;
288
289   if (daemon_address == NULL)
290     return -1;
291
292   if (!config_collect_stats)
293     return -1;
294
295   if ((strncmp("unix:", daemon_address, strlen("unix:")) != 0) &&
296       (daemon_address[0] != '/'))
297     sstrncpy(vl.host, daemon_address, sizeof(vl.host));
298   sstrncpy(vl.plugin, "rrdcached", sizeof(vl.plugin));
299
300   rrd_clear_error();
301   status = rrdc_connect(daemon_address);
302   if (status != 0) {
303     ERROR("rrdcached plugin: Failed to connect to RRDCacheD "
304           "at %s: %s (status=%d)",
305           daemon_address, rrd_get_error(), status);
306     return -1;
307   }
308
309   while (42) {
310     /* The RRD client lib does not provide any means for checking a
311      * connection, hence we'll have to retry upon failed operations. */
312     head = NULL;
313     rrd_clear_error();
314     status = rrdc_stats_get(&head);
315     if (status == 0)
316       break;
317
318     if (!retried) {
319       retried = true;
320       if (try_reconnect() == 0)
321         continue;
322       /* else: report the error and fail */
323     }
324
325     ERROR("rrdcached plugin: rrdc_stats_get failed: %s (status=%i).",
326           rrd_get_error(), status);
327     return -1;
328   }
329
330   for (rrdc_stats_t *ptr = head; ptr != NULL; ptr = ptr->next) {
331     if (ptr->type == RRDC_STATS_TYPE_GAUGE)
332       vl.values[0].gauge = (gauge_t)ptr->value.gauge;
333     else if (ptr->type == RRDC_STATS_TYPE_COUNTER)
334       vl.values[0].counter = (counter_t)ptr->value.counter;
335     else
336       continue;
337
338     if (strcasecmp("QueueLength", ptr->name) == 0) {
339       sstrncpy(vl.type, "queue_length", sizeof(vl.type));
340       sstrncpy(vl.type_instance, "", sizeof(vl.type_instance));
341     } else if (strcasecmp("UpdatesWritten", ptr->name) == 0) {
342       sstrncpy(vl.type, "operations", sizeof(vl.type));
343       sstrncpy(vl.type_instance, "write-updates", sizeof(vl.type_instance));
344     } else if (strcasecmp("DataSetsWritten", ptr->name) == 0) {
345       sstrncpy(vl.type, "operations", sizeof(vl.type));
346       sstrncpy(vl.type_instance, "write-data_sets", sizeof(vl.type_instance));
347     } else if (strcasecmp("TreeNodesNumber", ptr->name) == 0) {
348       sstrncpy(vl.type, "gauge", sizeof(vl.type));
349       sstrncpy(vl.type_instance, "tree_nodes", sizeof(vl.type_instance));
350     } else if (strcasecmp("TreeDepth", ptr->name) == 0) {
351       sstrncpy(vl.type, "gauge", sizeof(vl.type));
352       sstrncpy(vl.type_instance, "tree_depth", sizeof(vl.type_instance));
353     } else if (strcasecmp("FlushesReceived", ptr->name) == 0) {
354       sstrncpy(vl.type, "operations", sizeof(vl.type));
355       sstrncpy(vl.type_instance, "receive-flush", sizeof(vl.type_instance));
356     } else if (strcasecmp("JournalBytes", ptr->name) == 0) {
357       sstrncpy(vl.type, "counter", sizeof(vl.type));
358       sstrncpy(vl.type_instance, "journal-bytes", sizeof(vl.type_instance));
359     } else if (strcasecmp("JournalRotate", ptr->name) == 0) {
360       sstrncpy(vl.type, "counter", sizeof(vl.type));
361       sstrncpy(vl.type_instance, "journal-rotates", sizeof(vl.type_instance));
362     } else if (strcasecmp("UpdatesReceived", ptr->name) == 0) {
363       sstrncpy(vl.type, "operations", sizeof(vl.type));
364       sstrncpy(vl.type_instance, "receive-update", sizeof(vl.type_instance));
365     } else {
366       DEBUG("rrdcached plugin: rc_read: Unknown statistic `%s'.", ptr->name);
367       continue;
368     }
369
370     plugin_dispatch_values(&vl);
371   } /* for (ptr = head; ptr != NULL; ptr = ptr->next) */
372
373   rrdc_stats_free(head);
374
375   return 0;
376 } /* int rc_read */
377
378 static int rc_init(void) {
379   if (config_collect_stats)
380     plugin_register_read("rrdcached", rc_read);
381
382   return 0;
383 } /* int rc_init */
384
385 static int rc_write(const data_set_t *ds, const value_list_t *vl,
386                     user_data_t __attribute__((unused)) * user_data) {
387   char filename[PATH_MAX];
388   char values[512];
389   char *values_array[2];
390   int status;
391   bool retried = false;
392
393   if (daemon_address == NULL) {
394     ERROR("rrdcached plugin: daemon_address == NULL.");
395     plugin_unregister_write("rrdcached");
396     return -1;
397   }
398
399   if (strcmp(ds->type, vl->type) != 0) {
400     ERROR("rrdcached plugin: DS type does not match value list type");
401     return -1;
402   }
403
404   if (value_list_to_filename(filename, sizeof(filename), vl) != 0) {
405     ERROR("rrdcached plugin: value_list_to_filename failed.");
406     return -1;
407   }
408
409   if (value_list_to_string(values, sizeof(values), ds, vl) != 0) {
410     ERROR("rrdcached plugin: value_list_to_string failed.");
411     return -1;
412   }
413
414   values_array[0] = values;
415   values_array[1] = NULL;
416
417   if (config_create_files) {
418     struct stat statbuf;
419
420     status = stat(filename, &statbuf);
421     if (status != 0) {
422       if (errno != ENOENT) {
423         ERROR("rrdcached plugin: stat (%s) failed: %s", filename, STRERRNO);
424         return -1;
425       }
426
427       status = cu_rrd_create_file(filename, ds, vl, &rrdcreate_config);
428       if (status != 0) {
429         ERROR("rrdcached plugin: cu_rrd_create_file (%s) failed.", filename);
430         return -1;
431       } else if (rrdcreate_config.async)
432         return 0;
433     }
434   }
435
436   rrd_clear_error();
437   status = rrdc_connect(daemon_address);
438   if (status != 0) {
439     ERROR("rrdcached plugin: Failed to connect to RRDCacheD "
440           "at %s: %s (status=%d)",
441           daemon_address, rrd_get_error(), status);
442     return -1;
443   }
444
445   while (42) {
446     /* The RRD client lib does not provide any means for checking a
447      * connection, hence we'll have to retry upon failed operations. */
448     rrd_clear_error();
449     status = rrdc_update(filename, /* values_num = */ 1, (void *)values_array);
450     if (status == 0)
451       break;
452
453     if (!retried) {
454       retried = true;
455       if (try_reconnect() == 0)
456         continue;
457       /* else: report the error and fail */
458     }
459
460     ERROR("rrdcached plugin: rrdc_update (%s, [%s], 1) failed: %s (status=%i)",
461           filename, values_array[0], rrd_get_error(), status);
462     return -1;
463   }
464
465   return 0;
466 } /* int rc_write */
467
468 static int rc_flush(__attribute__((unused)) cdtime_t timeout, /* {{{ */
469                     const char *identifier,
470                     __attribute__((unused)) user_data_t *ud) {
471   char filename[PATH_MAX + 1];
472   int status;
473   bool retried = false;
474
475   if (identifier == NULL)
476     return EINVAL;
477
478   if (datadir != NULL)
479     snprintf(filename, sizeof(filename), "%s/%s.rrd", datadir, identifier);
480   else
481     snprintf(filename, sizeof(filename), "%s.rrd", identifier);
482
483   rrd_clear_error();
484   status = rrdc_connect(daemon_address);
485   if (status != 0) {
486     ERROR("rrdcached plugin: Failed to connect to RRDCacheD "
487           "at %s: %s (status=%d)",
488           daemon_address, rrd_get_error(), status);
489     return -1;
490   }
491
492   while (42) {
493     /* The RRD client lib does not provide any means for checking a
494      * connection, hence we'll have to retry upon failed operations. */
495     rrd_clear_error();
496     status = rrdc_flush(filename);
497     if (status == 0)
498       break;
499
500     if (!retried) {
501       retried = true;
502       if (try_reconnect() == 0)
503         continue;
504       /* else: report the error and fail */
505     }
506
507     ERROR("rrdcached plugin: rrdc_flush (%s) failed: %s (status=%i).", filename,
508           rrd_get_error(), status);
509     return -1;
510   }
511   DEBUG("rrdcached plugin: rrdc_flush (%s): Success.", filename);
512
513   return 0;
514 } /* }}} int rc_flush */
515
516 static int rc_shutdown(void) {
517   rrdc_disconnect();
518   return 0;
519 } /* int rc_shutdown */
520
521 void module_register(void) {
522   plugin_register_complex_config("rrdcached", rc_config);
523   plugin_register_init("rrdcached", rc_init);
524   plugin_register_shutdown("rrdcached", rc_shutdown);
525 } /* void module_register */