treewide: add blank line below collectd.h
[collectd.git] / src / rrdcached.c
1 /**
2  * collectd - src/rrdcached.c
3  * Copyright (C) 2008-2013  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "common.h"
31 #include "utils_rrdcreate.h"
32
33 #undef HAVE_CONFIG_H
34 #include <rrd.h>
35 #include <rrd_client.h>
36
37 /*
38  * Private variables
39  */
40 static char *datadir = NULL;
41 static char *daemon_address = NULL;
42 static _Bool config_create_files = 1;
43 static _Bool config_collect_stats = 1;
44 static rrdcreate_config_t rrdcreate_config =
45 {
46         /* stepsize = */ 0,
47         /* heartbeat = */ 0,
48         /* rrarows = */ 1200,
49         /* xff = */ 0.1,
50
51         /* timespans = */ NULL,
52         /* timespans_num = */ 0,
53
54         /* consolidation_functions = */ NULL,
55         /* consolidation_functions_num = */ 0,
56
57         /* async = */ 0
58 };
59
60 /*
61  * Prototypes.
62  */
63 static int rc_write (const data_set_t *ds, const value_list_t *vl,
64     user_data_t __attribute__((unused)) *user_data);
65 static int rc_flush (__attribute__((unused)) cdtime_t timeout,
66     const char *identifier, __attribute__((unused)) user_data_t *ud);
67
68 static int value_list_to_string (char *buffer, int buffer_len,
69     const data_set_t *ds, const value_list_t *vl)
70 {
71   int offset;
72   int status;
73   size_t i;
74   time_t t;
75
76   assert (0 == strcmp (ds->type, vl->type));
77
78   memset (buffer, '\0', buffer_len);
79
80   t = CDTIME_T_TO_TIME_T (vl->time);
81   status = ssnprintf (buffer, buffer_len, "%lu", (unsigned long) t);
82   if ((status < 1) || (status >= buffer_len))
83     return (-1);
84   offset = status;
85
86   for (i = 0; i < ds->ds_num; i++)
87   {
88     if ((ds->ds[i].type != DS_TYPE_COUNTER)
89         && (ds->ds[i].type != DS_TYPE_GAUGE)
90         && (ds->ds[i].type != DS_TYPE_DERIVE)
91         && (ds->ds[i].type != DS_TYPE_ABSOLUTE))
92       return (-1);
93
94     if (ds->ds[i].type == DS_TYPE_COUNTER)
95     {
96       status = ssnprintf (buffer + offset, buffer_len - offset,
97           ":%llu", vl->values[i].counter);
98     }
99     else if (ds->ds[i].type == DS_TYPE_GAUGE)
100     {
101       status = ssnprintf (buffer + offset, buffer_len - offset,
102           ":%f", vl->values[i].gauge);
103     }
104     else if (ds->ds[i].type == DS_TYPE_DERIVE) {
105       status = ssnprintf (buffer + offset, buffer_len - offset,
106           ":%"PRIi64, vl->values[i].derive);
107     }
108     else /* if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */ {
109       status = ssnprintf (buffer + offset, buffer_len - offset,
110           ":%"PRIu64, vl->values[i].absolute);
111
112     }
113
114     if ((status < 1) || (status >= (buffer_len - offset)))
115       return (-1);
116
117     offset += status;
118   } /* for ds->ds_num */
119
120   return (0);
121 } /* int value_list_to_string */
122
123 static int value_list_to_filename (char *buffer, size_t buffer_size,
124     value_list_t const *vl)
125 {
126   char const suffix[] = ".rrd";
127   int status;
128   size_t len;
129
130   if (datadir != NULL)
131   {
132     size_t datadir_len = strlen (datadir) + 1;
133
134     if (datadir_len >= buffer_size)
135       return (ENOMEM);
136
137     sstrncpy (buffer, datadir, buffer_size);
138     buffer[datadir_len - 1] = '/';
139     buffer[datadir_len] = 0;
140
141     buffer += datadir_len;
142     buffer_size -= datadir_len;
143   }
144
145   status = FORMAT_VL (buffer, buffer_size, vl);
146   if (status != 0)
147     return (status);
148
149   len = strlen (buffer);
150   assert (len < buffer_size);
151   buffer += len;
152   buffer_size -= len;
153
154   if (buffer_size <= sizeof (suffix))
155     return (ENOMEM);
156
157   memcpy (buffer, suffix, sizeof (suffix));
158   return (0);
159 } /* int value_list_to_filename */
160
161 static int rc_config_get_int_positive (oconfig_item_t const *ci, int *ret)
162 {
163   int status;
164   int tmp = 0;
165
166   status = cf_util_get_int (ci, &tmp);
167   if (status != 0)
168     return (status);
169   if (tmp < 0)
170     return (EINVAL);
171
172   *ret = tmp;
173   return (0);
174 } /* int rc_config_get_int_positive */
175
176 static int rc_config_get_xff (oconfig_item_t const *ci, double *ret)
177 {
178   double value;
179
180   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
181   {
182     ERROR ("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
183         "in the range [0.0, 1.0)", ci->key);
184     return (EINVAL);
185   }
186
187   value = ci->values[0].value.number;
188   if ((value >= 0.0) && (value < 1.0))
189   {
190     *ret = value;
191     return (0);
192   }
193
194   ERROR ("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
195       "in the range [0.0, 1.0)", ci->key);
196   return (EINVAL);
197 } /* int rc_config_get_xff */
198
199 static int rc_config_add_timespan (int timespan)
200 {
201   int *tmp;
202
203   if (timespan <= 0)
204     return (EINVAL);
205
206   tmp = realloc (rrdcreate_config.timespans,
207       sizeof (*rrdcreate_config.timespans)
208       * (rrdcreate_config.timespans_num + 1));
209   if (tmp == NULL)
210     return (ENOMEM);
211   rrdcreate_config.timespans = tmp;
212
213   rrdcreate_config.timespans[rrdcreate_config.timespans_num] = timespan;
214   rrdcreate_config.timespans_num++;
215
216   return (0);
217 } /* int rc_config_add_timespan */
218
219 static int rc_config (oconfig_item_t *ci)
220 {
221   int i;
222
223   for (i = 0; i < ci->children_num; i++)
224   {
225     oconfig_item_t const *child = ci->children + i;
226     const char *key = child->key;
227     int status = 0;
228
229     if (strcasecmp ("DataDir", key) == 0)
230     {
231       status = cf_util_get_string (child, &datadir);
232       if (status == 0)
233       {
234         int len = strlen (datadir);
235
236         while ((len > 0) && (datadir[len - 1] == '/'))
237         {
238           len--;
239           datadir[len] = 0;
240         }
241
242         if (len <= 0)
243           sfree (datadir);
244       }
245     }
246     else if (strcasecmp ("DaemonAddress", key) == 0)
247       status = cf_util_get_string (child, &daemon_address);
248     else if (strcasecmp ("CreateFiles", key) == 0)
249       status = cf_util_get_boolean (child, &config_create_files);
250     else if (strcasecmp ("CreateFilesAsync", key) == 0)
251       status = cf_util_get_boolean (child, &rrdcreate_config.async);
252     else if (strcasecmp ("CollectStatistics", key) == 0)
253       status = cf_util_get_boolean (child, &config_collect_stats);
254     else if (strcasecmp ("StepSize", key) == 0)
255     {
256       int tmp = -1;
257
258       status = rc_config_get_int_positive (child, &tmp);
259       if (status == 0)
260         rrdcreate_config.stepsize = (unsigned long) tmp;
261     }
262     else if (strcasecmp ("HeartBeat", key) == 0)
263       status = rc_config_get_int_positive (child, &rrdcreate_config.heartbeat);
264     else if (strcasecmp ("RRARows", key) == 0)
265       status = rc_config_get_int_positive (child, &rrdcreate_config.rrarows);
266     else if (strcasecmp ("RRATimespan", key) == 0)
267     {
268       int tmp = -1;
269       status = rc_config_get_int_positive (child, &tmp);
270       if (status == 0)
271         status = rc_config_add_timespan (tmp);
272     }
273     else if (strcasecmp ("XFF", key) == 0)
274       status = rc_config_get_xff (child, &rrdcreate_config.xff);
275     else
276     {
277       WARNING ("rrdcached plugin: Ignoring invalid option %s.", key);
278       continue;
279     }
280
281     if (status != 0)
282       WARNING ("rrdcached plugin: Handling the \"%s\" option failed.", key);
283   }
284
285   if (daemon_address != NULL)
286   {
287     plugin_register_write ("rrdcached", rc_write, /* user_data = */ NULL);
288     plugin_register_flush ("rrdcached", rc_flush, /* user_data = */ NULL);
289   }
290   return (0);
291 } /* int rc_config */
292
293 static int rc_read (void)
294 {
295   int status;
296   rrdc_stats_t *head;
297   rrdc_stats_t *ptr;
298
299   value_t values[1];
300   value_list_t vl = VALUE_LIST_INIT;
301
302   if (daemon_address == NULL)
303     return (-1);
304
305   if (!config_collect_stats)
306     return (-1);
307
308   vl.values = values;
309   vl.values_len = 1;
310
311   if ((strncmp ("unix:", daemon_address, strlen ("unix:")) == 0)
312       || (daemon_address[0] == '/'))
313     sstrncpy (vl.host, hostname_g, sizeof (vl.host));
314   else
315     sstrncpy (vl.host, daemon_address, sizeof (vl.host));
316   sstrncpy (vl.plugin, "rrdcached", sizeof (vl.plugin));
317
318   status = rrdc_connect (daemon_address);
319   if (status != 0)
320   {
321     ERROR ("rrdcached plugin: rrdc_connect (%s) failed with status %i.",
322         daemon_address, status);
323     return (-1);
324   }
325
326   head = NULL;
327   status = rrdc_stats_get (&head);
328   if (status != 0)
329   {
330     ERROR ("rrdcached plugin: rrdc_stats_get failed with status %i.", status);
331     return (-1);
332   }
333
334   for (ptr = head; ptr != NULL; ptr = ptr->next)
335   {
336     if (ptr->type == RRDC_STATS_TYPE_GAUGE)
337       values[0].gauge = (gauge_t) ptr->value.gauge;
338     else if (ptr->type == RRDC_STATS_TYPE_COUNTER)
339       values[0].counter = (counter_t) ptr->value.counter;
340     else
341       continue;
342
343     if (strcasecmp ("QueueLength", ptr->name) == 0)
344     {
345       sstrncpy (vl.type, "queue_length", sizeof (vl.type));
346       sstrncpy (vl.type_instance, "", sizeof (vl.type_instance));
347     }
348     else if (strcasecmp ("UpdatesWritten", ptr->name) == 0)
349     {
350       sstrncpy (vl.type, "operations", sizeof (vl.type));
351       sstrncpy (vl.type_instance, "write-updates", sizeof (vl.type_instance));
352     }
353     else if (strcasecmp ("DataSetsWritten", ptr->name) == 0)
354     {
355       sstrncpy (vl.type, "operations", sizeof (vl.type));
356       sstrncpy (vl.type_instance, "write-data_sets",
357           sizeof (vl.type_instance));
358     }
359     else if (strcasecmp ("TreeNodesNumber", ptr->name) == 0)
360     {
361       sstrncpy (vl.type, "gauge", sizeof (vl.type));
362       sstrncpy (vl.type_instance, "tree_nodes", sizeof (vl.type_instance));
363     }
364     else if (strcasecmp ("TreeDepth", ptr->name) == 0)
365     {
366       sstrncpy (vl.type, "gauge", sizeof (vl.type));
367       sstrncpy (vl.type_instance, "tree_depth", sizeof (vl.type_instance));
368     }
369     else if (strcasecmp ("FlushesReceived", ptr->name) == 0)
370     {
371       sstrncpy (vl.type, "operations", sizeof (vl.type));
372       sstrncpy (vl.type_instance, "receive-flush", sizeof (vl.type_instance));
373     }
374     else if (strcasecmp ("JournalBytes", ptr->name) == 0)
375     {
376       sstrncpy (vl.type, "counter", sizeof (vl.type));
377       sstrncpy (vl.type_instance, "journal-bytes", sizeof (vl.type_instance));
378     }
379     else if (strcasecmp ("JournalRotate", ptr->name) == 0)
380     {
381       sstrncpy (vl.type, "counter", sizeof (vl.type));
382       sstrncpy (vl.type_instance, "journal-rotates", sizeof (vl.type_instance));
383     }
384     else if (strcasecmp ("UpdatesReceived", ptr->name) == 0)
385     {
386       sstrncpy (vl.type, "operations", sizeof (vl.type));
387       sstrncpy (vl.type_instance, "receive-update", sizeof (vl.type_instance));
388     }
389     else
390     {
391       DEBUG ("rrdcached plugin: rc_read: Unknown statistic `%s'.", ptr->name);
392       continue;
393     }
394
395     plugin_dispatch_values (&vl);
396   } /* for (ptr = head; ptr != NULL; ptr = ptr->next) */
397
398   rrdc_stats_free (head);
399
400   return (0);
401 } /* int rc_read */
402
403 static int rc_init (void)
404 {
405   if (config_collect_stats)
406     plugin_register_read ("rrdcached", rc_read);
407
408   return (0);
409 } /* int rc_init */
410
411 static int rc_write (const data_set_t *ds, const value_list_t *vl,
412     user_data_t __attribute__((unused)) *user_data)
413 {
414   char filename[PATH_MAX];
415   char values[512];
416   char *values_array[2];
417   int status;
418
419   if (daemon_address == NULL)
420   {
421     ERROR ("rrdcached plugin: daemon_address == NULL.");
422     plugin_unregister_write ("rrdcached");
423     return (-1);
424   }
425
426   if (strcmp (ds->type, vl->type) != 0)
427   {
428     ERROR ("rrdcached plugin: DS type does not match value list type");
429     return (-1);
430   }
431
432   if (value_list_to_filename (filename, sizeof (filename), vl) != 0)
433   {
434     ERROR ("rrdcached plugin: value_list_to_filename failed.");
435     return (-1);
436   }
437
438   if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
439   {
440     ERROR ("rrdcached plugin: value_list_to_string failed.");
441     return (-1);
442   }
443
444   values_array[0] = values;
445   values_array[1] = NULL;
446
447   if (config_create_files)
448   {
449     struct stat statbuf;
450
451     status = stat (filename, &statbuf);
452     if (status != 0)
453     {
454       if (errno != ENOENT)
455       {
456         char errbuf[1024];
457         ERROR ("rrdcached plugin: stat (%s) failed: %s",
458             filename, sstrerror (errno, errbuf, sizeof (errbuf)));
459         return (-1);
460       }
461
462       status = cu_rrd_create_file (filename, ds, vl, &rrdcreate_config);
463       if (status != 0)
464       {
465         ERROR ("rrdcached plugin: cu_rrd_create_file (%s) failed.",
466             filename);
467         return (-1);
468       }
469       else if (rrdcreate_config.async)
470         return (0);
471     }
472   }
473
474   status = rrdc_connect (daemon_address);
475   if (status != 0)
476   {
477     ERROR ("rrdcached plugin: rrdc_connect (%s) failed with status %i.",
478         daemon_address, status);
479     return (-1);
480   }
481
482   status = rrdc_update (filename, /* values_num = */ 1, (void *) values_array);
483   if (status != 0)
484   {
485     ERROR ("rrdcached plugin: rrdc_update (%s, [%s], 1) failed with "
486         "status %i.",
487         filename, values_array[0], status);
488     return (-1);
489   }
490
491   return (0);
492 } /* int rc_write */
493
494 static int rc_flush (__attribute__((unused)) cdtime_t timeout, /* {{{ */
495     const char *identifier,
496     __attribute__((unused)) user_data_t *ud)
497 {
498   char filename[PATH_MAX + 1];
499   int status;
500
501   if (identifier == NULL)
502     return (EINVAL);
503
504   if (datadir != NULL)
505     ssnprintf (filename, sizeof (filename), "%s/%s.rrd", datadir, identifier);
506   else
507     ssnprintf (filename, sizeof (filename), "%s.rrd", identifier);
508
509   status = rrdc_connect (daemon_address);
510   if (status != 0)
511   {
512     ERROR ("rrdcached plugin: rrdc_connect (%s) failed with status %i.",
513         daemon_address, status);
514     return (-1);
515   }
516
517   status = rrdc_flush (filename);
518   if (status != 0)
519   {
520     ERROR ("rrdcached plugin: rrdc_flush (%s) failed with status %i.",
521         filename, status);
522     return (-1);
523   }
524   DEBUG ("rrdcached plugin: rrdc_flush (%s): Success.", filename);
525
526   return (0);
527 } /* }}} int rc_flush */
528
529 static int rc_shutdown (void)
530 {
531   rrdc_disconnect ();
532   return (0);
533 } /* int rc_shutdown */
534
535 void module_register (void)
536 {
537   plugin_register_complex_config ("rrdcached", rc_config);
538   plugin_register_init ("rrdcached", rc_init);
539   plugin_register_shutdown ("rrdcached", rc_shutdown);
540 } /* void module_register */
541
542 /*
543  * vim: set sw=2 sts=2 et :
544  */