Merge pull request #2618 from ajssmith/amqp1_dev1_branch
[collectd.git] / src / utils_rrdcreate.c
1 /**
2  * collectd - src/utils_rrdcreate.c
3  * Copyright (C) 2006-2013  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "utils_rrdcreate.h"
31
32 #include <pthread.h>
33 #include <rrd.h>
34
35 struct srrd_create_args_s {
36   char *filename;
37   unsigned long pdp_step;
38   time_t last_up;
39   int argc;
40   char **argv;
41 };
42 typedef struct srrd_create_args_s srrd_create_args_t;
43
44 struct async_create_file_s;
45 typedef struct async_create_file_s async_create_file_t;
46 struct async_create_file_s {
47   char *filename;
48   async_create_file_t *next;
49 };
50
51 /*
52  * Private variables
53  */
54 static int rra_timespans[] = {3600, 86400, 604800, 2678400, 31622400};
55 static int rra_timespans_num = STATIC_ARRAY_SIZE(rra_timespans);
56
57 static const char *const rra_types[] = {"AVERAGE", "MIN", "MAX"};
58 static int rra_types_num = STATIC_ARRAY_SIZE(rra_types);
59
60 #if !defined(HAVE_THREADSAFE_LIBRRD)
61 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
62 #endif
63
64 static async_create_file_t *async_creation_list;
65 static pthread_mutex_t async_creation_lock = PTHREAD_MUTEX_INITIALIZER;
66
67 /*
68  * Private functions
69  */
70 static void rra_free(int rra_num, char **rra_def) /* {{{ */
71 {
72   for (int i = 0; i < rra_num; i++) {
73     sfree(rra_def[i]);
74   }
75   sfree(rra_def);
76 } /* }}} void rra_free */
77
78 static void srrd_create_args_destroy(srrd_create_args_t *args) {
79   if (args == NULL)
80     return;
81
82   sfree(args->filename);
83   if (args->argv != NULL) {
84     for (int i = 0; i < args->argc; i++)
85       sfree(args->argv[i]);
86     sfree(args->argv);
87   }
88   sfree(args);
89 } /* void srrd_create_args_destroy */
90
91 static srrd_create_args_t *srrd_create_args_create(const char *filename,
92                                                    unsigned long pdp_step,
93                                                    time_t last_up, int argc,
94                                                    const char **argv) {
95   srrd_create_args_t *args;
96
97   args = calloc(1, sizeof(*args));
98   if (args == NULL) {
99     ERROR("srrd_create_args_create: calloc failed.");
100     return NULL;
101   }
102   args->filename = NULL;
103   args->pdp_step = pdp_step;
104   args->last_up = last_up;
105   args->argv = NULL;
106
107   args->filename = strdup(filename);
108   if (args->filename == NULL) {
109     ERROR("srrd_create_args_create: strdup failed.");
110     srrd_create_args_destroy(args);
111     return NULL;
112   }
113
114   args->argv = calloc((size_t)(argc + 1), sizeof(*args->argv));
115   if (args->argv == NULL) {
116     ERROR("srrd_create_args_create: calloc failed.");
117     srrd_create_args_destroy(args);
118     return NULL;
119   }
120
121   for (args->argc = 0; args->argc < argc; args->argc++) {
122     args->argv[args->argc] = strdup(argv[args->argc]);
123     if (args->argv[args->argc] == NULL) {
124       ERROR("srrd_create_args_create: strdup failed.");
125       srrd_create_args_destroy(args);
126       return NULL;
127     }
128   }
129   assert(args->argc == argc);
130   args->argv[args->argc] = NULL;
131
132   return args;
133 } /* srrd_create_args_t *srrd_create_args_create */
134
135 /* * * * * * * * * *
136  * WARNING:  Magic *
137  * * * * * * * * * */
138 static int rra_get(char ***ret, const value_list_t *vl, /* {{{ */
139                    const rrdcreate_config_t *cfg) {
140   char **rra_def;
141   int rra_num;
142
143   int *rts;
144   int rts_num;
145
146   int rra_max;
147
148   int cdp_num;
149   int cdp_len;
150
151   /* The stepsize we use here: If it is user-set, use it. If not, use the
152    * interval of the value-list. */
153   int ss;
154
155   if (cfg->rrarows <= 0) {
156     *ret = NULL;
157     return -1;
158   }
159
160   if ((cfg->xff < 0) || (cfg->xff >= 1.0)) {
161     *ret = NULL;
162     return -1;
163   }
164
165   if (cfg->stepsize > 0)
166     ss = cfg->stepsize;
167   else
168     ss = (int)CDTIME_T_TO_TIME_T(vl->interval);
169   if (ss <= 0) {
170     *ret = NULL;
171     return -1;
172   }
173
174   /* Use the configured timespans or fall back to the built-in defaults */
175   if (cfg->timespans_num != 0) {
176     rts = cfg->timespans;
177     rts_num = cfg->timespans_num;
178   } else {
179     rts = rra_timespans;
180     rts_num = rra_timespans_num;
181   }
182
183   rra_max = rts_num * rra_types_num;
184   assert(rra_max > 0);
185
186   if ((rra_def = calloc(rra_max + 1, sizeof(*rra_def))) == NULL)
187     return -1;
188   rra_num = 0;
189
190   cdp_len = 0;
191   for (int i = 0; i < rts_num; i++) {
192     int span = rts[i];
193
194     if ((span / ss) < cfg->rrarows)
195       span = ss * cfg->rrarows;
196
197     if (cdp_len == 0)
198       cdp_len = 1;
199     else
200       cdp_len = (int)floor(((double)span) / ((double)(cfg->rrarows * ss)));
201
202     cdp_num = (int)ceil(((double)span) / ((double)(cdp_len * ss)));
203
204     for (int j = 0; j < rra_types_num; j++) {
205       char buffer[128];
206       int status;
207
208       if (rra_num >= rra_max)
209         break;
210
211       status = snprintf(buffer, sizeof(buffer), "RRA:%s:%.10f:%u:%u",
212                         rra_types[j], cfg->xff, cdp_len, cdp_num);
213
214       if ((status < 0) || ((size_t)status >= sizeof(buffer))) {
215         ERROR("rra_get: Buffer would have been truncated.");
216         continue;
217       }
218
219       rra_def[rra_num++] = sstrdup(buffer);
220     }
221   }
222
223   if (rra_num <= 0) {
224     sfree(rra_def);
225     return 0;
226   }
227
228   *ret = rra_def;
229   return rra_num;
230 } /* }}} int rra_get */
231
232 static void ds_free(int ds_num, char **ds_def) /* {{{ */
233 {
234   for (int i = 0; i < ds_num; i++)
235     if (ds_def[i] != NULL)
236       free(ds_def[i]);
237   free(ds_def);
238 } /* }}} void ds_free */
239
240 static int ds_get(char ***ret, /* {{{ */
241                   const data_set_t *ds, const value_list_t *vl,
242                   const rrdcreate_config_t *cfg) {
243   char **ds_def;
244   size_t ds_num;
245
246   char min[32];
247   char max[32];
248   char buffer[128];
249
250   assert(ds->ds_num > 0);
251
252   ds_def = calloc(ds->ds_num, sizeof(*ds_def));
253   if (ds_def == NULL) {
254     ERROR("rrdtool plugin: calloc failed: %s", STRERRNO);
255     return -1;
256   }
257
258   for (ds_num = 0; ds_num < ds->ds_num; ds_num++) {
259     data_source_t *d = ds->ds + ds_num;
260     const char *type;
261     int status;
262
263     ds_def[ds_num] = NULL;
264
265     if (d->type == DS_TYPE_COUNTER)
266       type = "COUNTER";
267     else if (d->type == DS_TYPE_GAUGE)
268       type = "GAUGE";
269     else if (d->type == DS_TYPE_DERIVE)
270       type = "DERIVE";
271     else if (d->type == DS_TYPE_ABSOLUTE)
272       type = "ABSOLUTE";
273     else {
274       ERROR("rrdtool plugin: Unknown DS type: %i", d->type);
275       break;
276     }
277
278     if (isnan(d->min)) {
279       sstrncpy(min, "U", sizeof(min));
280     } else
281       snprintf(min, sizeof(min), "%f", d->min);
282
283     if (isnan(d->max)) {
284       sstrncpy(max, "U", sizeof(max));
285     } else
286       snprintf(max, sizeof(max), "%f", d->max);
287
288     status = snprintf(
289         buffer, sizeof(buffer), "DS:%s:%s:%i:%s:%s", d->name, type,
290         (cfg->heartbeat > 0) ? cfg->heartbeat
291                              : (int)CDTIME_T_TO_TIME_T(2 * vl->interval),
292         min, max);
293     if ((status < 1) || ((size_t)status >= sizeof(buffer)))
294       break;
295
296     ds_def[ds_num] = sstrdup(buffer);
297   } /* for ds_num = 0 .. ds->ds_num */
298
299   if (ds_num != ds->ds_num) {
300     ds_free(ds_num, ds_def);
301     return -1;
302   }
303
304   if (ds_num == 0) {
305     sfree(ds_def);
306     return 0;
307   }
308
309   *ret = ds_def;
310   return ds_num;
311 } /* }}} int ds_get */
312
313 #if HAVE_THREADSAFE_LIBRRD
314 static int srrd_create(const char *filename, /* {{{ */
315                        unsigned long pdp_step, time_t last_up, int argc,
316                        const char **argv) {
317   int status;
318   char *filename_copy;
319
320   if ((filename == NULL) || (argv == NULL))
321     return -EINVAL;
322
323   /* Some versions of librrd don't have the `const' qualifier for the first
324    * argument, so we have to copy the pointer here to avoid warnings. It sucks,
325    * but what else can we do? :(  -octo */
326   filename_copy = strdup(filename);
327   if (filename_copy == NULL) {
328     ERROR("srrd_create: strdup failed.");
329     return -ENOMEM;
330   }
331
332   optind = 0; /* bug in librrd? */
333   rrd_clear_error();
334
335   status = rrd_create_r(filename_copy, pdp_step, last_up, argc, (void *)argv);
336
337   if (status != 0) {
338     WARNING("rrdtool plugin: rrd_create_r (%s) failed: %s", filename,
339             rrd_get_error());
340   }
341
342   sfree(filename_copy);
343
344   return status;
345 } /* }}} int srrd_create */
346 /* #endif HAVE_THREADSAFE_LIBRRD */
347
348 #else  /* !HAVE_THREADSAFE_LIBRRD */
349 static int srrd_create(const char *filename, /* {{{ */
350                        unsigned long pdp_step, time_t last_up, int argc,
351                        const char **argv) {
352   int status;
353
354   int new_argc;
355   char **new_argv;
356
357   char pdp_step_str[16];
358   char last_up_str[16];
359
360   new_argc = 6 + argc;
361   new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
362   if (new_argv == NULL) {
363     ERROR("rrdtool plugin: malloc failed.");
364     return -1;
365   }
366
367   if (last_up == 0)
368     last_up = time(NULL) - 10;
369
370   snprintf(pdp_step_str, sizeof(pdp_step_str), "%lu", pdp_step);
371   snprintf(last_up_str, sizeof(last_up_str), "%lu", (unsigned long)last_up);
372
373   new_argv[0] = "create";
374   new_argv[1] = (void *)filename;
375   new_argv[2] = "-s";
376   new_argv[3] = pdp_step_str;
377   new_argv[4] = "-b";
378   new_argv[5] = last_up_str;
379
380   memcpy(new_argv + 6, argv, argc * sizeof(char *));
381   new_argv[new_argc] = NULL;
382
383   pthread_mutex_lock(&librrd_lock);
384   optind = 0; /* bug in librrd? */
385   rrd_clear_error();
386
387   status = rrd_create(new_argc, new_argv);
388   pthread_mutex_unlock(&librrd_lock);
389
390   if (status != 0) {
391     WARNING("rrdtool plugin: rrd_create (%s) failed: %s", filename,
392             rrd_get_error());
393   }
394
395   sfree(new_argv);
396
397   return status;
398 } /* }}} int srrd_create */
399 #endif /* !HAVE_THREADSAFE_LIBRRD */
400
401 static int lock_file(char const *filename) /* {{{ */
402 {
403   async_create_file_t *ptr;
404   struct stat sb;
405   int status;
406
407   pthread_mutex_lock(&async_creation_lock);
408
409   for (ptr = async_creation_list; ptr != NULL; ptr = ptr->next)
410     if (strcmp(filename, ptr->filename) == 0)
411       break;
412
413   if (ptr != NULL) {
414     pthread_mutex_unlock(&async_creation_lock);
415     return EEXIST;
416   }
417
418   status = stat(filename, &sb);
419   if ((status == 0) || (errno != ENOENT)) {
420     pthread_mutex_unlock(&async_creation_lock);
421     return EEXIST;
422   }
423
424   ptr = malloc(sizeof(*ptr));
425   if (ptr == NULL) {
426     pthread_mutex_unlock(&async_creation_lock);
427     return ENOMEM;
428   }
429
430   ptr->filename = strdup(filename);
431   if (ptr->filename == NULL) {
432     pthread_mutex_unlock(&async_creation_lock);
433     sfree(ptr);
434     return ENOMEM;
435   }
436
437   ptr->next = async_creation_list;
438   async_creation_list = ptr;
439
440   pthread_mutex_unlock(&async_creation_lock);
441
442   return 0;
443 } /* }}} int lock_file */
444
445 static int unlock_file(char const *filename) /* {{{ */
446 {
447   async_create_file_t *this;
448   async_create_file_t *prev;
449
450   pthread_mutex_lock(&async_creation_lock);
451
452   prev = NULL;
453   for (this = async_creation_list; this != NULL; this = this->next) {
454     if (strcmp(filename, this->filename) == 0)
455       break;
456     prev = this;
457   }
458
459   if (this == NULL) {
460     pthread_mutex_unlock(&async_creation_lock);
461     return ENOENT;
462   }
463
464   if (prev == NULL) {
465     assert(this == async_creation_list);
466     async_creation_list = this->next;
467   } else {
468     assert(this == prev->next);
469     prev->next = this->next;
470   }
471   this->next = NULL;
472
473   pthread_mutex_unlock(&async_creation_lock);
474
475   sfree(this->filename);
476   sfree(this);
477
478   return 0;
479 } /* }}} int unlock_file */
480
481 static void *srrd_create_thread(void *targs) /* {{{ */
482 {
483   srrd_create_args_t *args = targs;
484   char tmpfile[PATH_MAX];
485   int status;
486
487   status = lock_file(args->filename);
488   if (status != 0) {
489     if (status == EEXIST)
490       NOTICE("srrd_create_thread: File \"%s\" is already being created.",
491              args->filename);
492     else
493       ERROR("srrd_create_thread: Unable to lock file \"%s\".", args->filename);
494     srrd_create_args_destroy(args);
495     return 0;
496   }
497
498   snprintf(tmpfile, sizeof(tmpfile), "%s.async", args->filename);
499
500   status = srrd_create(tmpfile, args->pdp_step, args->last_up, args->argc,
501                        (void *)args->argv);
502   if (status != 0) {
503     WARNING("srrd_create_thread: srrd_create (%s) returned status %i.",
504             args->filename, status);
505     unlink(tmpfile);
506     unlock_file(args->filename);
507     srrd_create_args_destroy(args);
508     return 0;
509   }
510
511   status = rename(tmpfile, args->filename);
512   if (status != 0) {
513     ERROR("srrd_create_thread: rename (\"%s\", \"%s\") failed: %s", tmpfile,
514           args->filename, STRERRNO);
515     unlink(tmpfile);
516     unlock_file(args->filename);
517     srrd_create_args_destroy(args);
518     return 0;
519   }
520
521   DEBUG("srrd_create_thread: Successfully created RRD file \"%s\".",
522         args->filename);
523
524   unlock_file(args->filename);
525   srrd_create_args_destroy(args);
526
527   return 0;
528 } /* }}} void *srrd_create_thread */
529
530 static int srrd_create_async(const char *filename, /* {{{ */
531                              unsigned long pdp_step, time_t last_up, int argc,
532                              const char **argv) {
533   srrd_create_args_t *args;
534   pthread_t thread;
535   pthread_attr_t attr;
536   int status;
537
538   DEBUG("srrd_create_async: Creating \"%s\" in the background.", filename);
539
540   args = srrd_create_args_create(filename, pdp_step, last_up, argc, argv);
541   if (args == NULL)
542     return -1;
543
544   status = pthread_attr_init(&attr);
545   if (status != 0) {
546     srrd_create_args_destroy(args);
547     return -1;
548   }
549
550   status = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
551   if (status != 0) {
552     pthread_attr_destroy(&attr);
553     srrd_create_args_destroy(args);
554     return -1;
555   }
556
557   status = pthread_create(&thread, &attr, srrd_create_thread, args);
558   if (status != 0) {
559     ERROR("srrd_create_async: pthread_create failed: %s", STRERROR(status));
560     pthread_attr_destroy(&attr);
561     srrd_create_args_destroy(args);
562     return status;
563   }
564
565   pthread_attr_destroy(&attr);
566   /* args is freed in srrd_create_thread(). */
567   return 0;
568 } /* }}} int srrd_create_async */
569
570 /*
571  * Public functions
572  */
573 int cu_rrd_create_file(const char *filename, /* {{{ */
574                        const data_set_t *ds, const value_list_t *vl,
575                        const rrdcreate_config_t *cfg) {
576   char **argv;
577   int argc;
578   char **rra_def = NULL;
579   int rra_num;
580   char **ds_def = NULL;
581   int ds_num;
582   int status = 0;
583   time_t last_up;
584   unsigned long stepsize;
585
586   if (check_create_dir(filename))
587     return -1;
588
589   if ((rra_num = rra_get(&rra_def, vl, cfg)) < 1) {
590     ERROR("cu_rrd_create_file failed: Could not calculate RRAs");
591     return -1;
592   }
593
594   if ((ds_num = ds_get(&ds_def, ds, vl, cfg)) < 1) {
595     ERROR("cu_rrd_create_file failed: Could not calculate DSes");
596     rra_free(rra_num, rra_def);
597     return -1;
598   }
599
600   argc = ds_num + rra_num;
601
602   if ((argv = malloc(sizeof(*argv) * (argc + 1))) == NULL) {
603     ERROR("cu_rrd_create_file failed: %s", STRERRNO);
604     rra_free(rra_num, rra_def);
605     ds_free(ds_num, ds_def);
606     return -1;
607   }
608
609   memcpy(argv, ds_def, ds_num * sizeof(char *));
610   memcpy(argv + ds_num, rra_def, rra_num * sizeof(char *));
611   argv[ds_num + rra_num] = NULL;
612
613   last_up = CDTIME_T_TO_TIME_T(vl->time);
614   if (last_up <= 0)
615     last_up = time(NULL);
616   last_up -= 1;
617
618   if (cfg->stepsize > 0)
619     stepsize = cfg->stepsize;
620   else
621     stepsize = (unsigned long)CDTIME_T_TO_TIME_T(vl->interval);
622
623   if (cfg->async) {
624     status = srrd_create_async(filename, stepsize, last_up, argc,
625                                (const char **)argv);
626     if (status != 0)
627       WARNING("cu_rrd_create_file: srrd_create_async (%s) "
628               "returned status %i.",
629               filename, status);
630   } else /* synchronous */
631   {
632     status = lock_file(filename);
633     if (status != 0) {
634       if (status == EEXIST)
635         NOTICE("cu_rrd_create_file: File \"%s\" is already being created.",
636                filename);
637       else
638         ERROR("cu_rrd_create_file: Unable to lock file \"%s\".", filename);
639     } else {
640       status =
641           srrd_create(filename, stepsize, last_up, argc, (const char **)argv);
642
643       if (status != 0) {
644         WARNING("cu_rrd_create_file: srrd_create (%s) returned status %i.",
645                 filename, status);
646       } else {
647         DEBUG("cu_rrd_create_file: Successfully created RRD file \"%s\".",
648               filename);
649       }
650       unlock_file(filename);
651     }
652   }
653
654   free(argv);
655   ds_free(ds_num, ds_def);
656   rra_free(rra_num, rra_def);
657
658   return status;
659 } /* }}} int cu_rrd_create_file */