Fix compile time issues
[collectd.git] / src / tail_csv.c
1 /**
2  * collectd - src/tail_csv.c
3  * Copyright (C) 2013 Kris Nielander
4  * Copyright (C) 2013 Florian Forster
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Kris Nielander <nielander at fox-it.com>
21  *   Florian Forster <octo at collectd.org>
22  **/
23
24 #include "collectd.h"
25
26 #include "plugin.h"              /* plugin_register_*, plugin_dispatch_values */
27 #include "utils/common/common.h" /* auxiliary functions */
28 #include "utils/tail/tail.h"
29
30 #include <fcntl.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <sys/mman.h>
34 #include <sys/stat.h>
35
36 struct metric_definition_s {
37   char *name;
38   char *type;
39   char *instance;
40   int data_source_type;
41   ssize_t value_from;
42   struct metric_definition_s *next;
43 };
44 typedef struct metric_definition_s metric_definition_t;
45
46 struct instance_definition_s {
47   char *plugin_name;
48   char *instance;
49   char *path;
50   cu_tail_t *tail;
51   metric_definition_t **metric_list;
52   size_t metric_list_len;
53   ssize_t time_from;
54   struct instance_definition_s *next;
55 };
56 typedef struct instance_definition_s instance_definition_t;
57
58 /* Private */
59 static metric_definition_t *metric_head;
60
61 static int tcsv_submit(instance_definition_t *id, metric_definition_t *md,
62                        value_t v, cdtime_t t) {
63   /* Registration variables */
64   value_list_t vl = VALUE_LIST_INIT;
65
66   /* Register */
67   vl.values_len = 1;
68   vl.values = &v;
69
70   sstrncpy(vl.plugin, (id->plugin_name != NULL) ? id->plugin_name : "tail_csv",
71            sizeof(vl.plugin));
72   if (id->instance != NULL)
73     sstrncpy(vl.plugin_instance, id->instance, sizeof(vl.plugin_instance));
74   sstrncpy(vl.type, md->type, sizeof(vl.type));
75   if (md->instance != NULL)
76     sstrncpy(vl.type_instance, md->instance, sizeof(vl.type_instance));
77
78   vl.time = t;
79
80   return plugin_dispatch_values(&vl);
81 }
82
83 static cdtime_t parse_time(char const *tbuf) {
84   double t;
85   char *endptr = NULL;
86
87   errno = 0;
88   t = strtod(tbuf, &endptr);
89   if ((errno != 0) || (endptr == NULL) || (endptr[0] != 0))
90     return cdtime();
91
92   return DOUBLE_TO_CDTIME_T(t);
93 }
94
95 static int tcsv_read_metric(instance_definition_t *id, metric_definition_t *md,
96                             char **fields, size_t fields_num) {
97   value_t v;
98   cdtime_t t = 0;
99   int status;
100
101   if (md->data_source_type == -1)
102     return EINVAL;
103
104   assert(md->value_from >= 0);
105   if (((size_t)md->value_from) >= fields_num)
106     return EINVAL;
107
108   status = parse_value(fields[md->value_from], &v, md->data_source_type);
109   if (status != 0)
110     return status;
111
112   if (id->time_from >= 0) {
113     if (((size_t)id->time_from) >= fields_num)
114       return EINVAL;
115     t = parse_time(fields[id->time_from]);
116   }
117
118   return tcsv_submit(id, md, v, t);
119 }
120
121 static bool tcsv_check_index(ssize_t index, size_t fields_num,
122                              char const *name) {
123   if (index < 0)
124     return true;
125   else if (((size_t)index) < fields_num)
126     return true;
127
128   ERROR("tail_csv plugin: Metric \"%s\": Request for index %zd when "
129         "only %" PRIsz " fields are available.",
130         name, index, fields_num);
131   return false;
132 }
133
134 static int tcsv_read_buffer(instance_definition_t *id, char *buffer,
135                             size_t buffer_size) {
136   char **metrics;
137   size_t metrics_num;
138
139   char *ptr;
140   size_t i;
141
142   /* Remove newlines at the end of line. */
143   while (buffer_size > 0) {
144     if ((buffer[buffer_size - 1] == '\n') ||
145         (buffer[buffer_size - 1] == '\r')) {
146       buffer[buffer_size - 1] = '\0';
147       buffer_size--;
148     } else {
149       break;
150     }
151   }
152
153   /* Ignore empty lines. */
154   if ((buffer_size == 0) || (buffer[0] == '#'))
155     return 0;
156
157   /* Count the number of fields. */
158   metrics_num = 1;
159   for (i = 0; i < buffer_size; i++) {
160     if (buffer[i] == ',')
161       metrics_num++;
162   }
163
164   if (metrics_num == 1) {
165     ERROR("tail_csv plugin: last line of `%s' does not contain "
166           "enough values.",
167           id->path);
168     return -1;
169   }
170
171   /* Create a list of all values */
172   metrics = calloc(metrics_num, sizeof(*metrics));
173   if (metrics == NULL) {
174     ERROR("tail_csv plugin: calloc failed.");
175     return ENOMEM;
176   }
177
178   ptr = buffer;
179   metrics[0] = ptr;
180   i = 1;
181   for (ptr = buffer; *ptr != 0; ptr++) {
182     if (*ptr != ',')
183       continue;
184
185     *ptr = 0;
186     metrics[i] = ptr + 1;
187     i++;
188   }
189   assert(i == metrics_num);
190
191   /* Register values */
192   for (i = 0; i < id->metric_list_len; ++i) {
193     metric_definition_t *md = id->metric_list[i];
194
195     if (!tcsv_check_index(md->value_from, metrics_num, md->name) ||
196         !tcsv_check_index(id->time_from, metrics_num, md->name))
197       continue;
198
199     tcsv_read_metric(id, md, metrics, metrics_num);
200   }
201
202   /* Free up resources */
203   sfree(metrics);
204   return 0;
205 }
206
207 static int tcsv_read(user_data_t *ud) {
208   instance_definition_t *id;
209   id = ud->data;
210
211   if (id->tail == NULL) {
212     id->tail = cu_tail_create(id->path);
213     if (id->tail == NULL) {
214       ERROR("tail_csv plugin: cu_tail_create (\"%s\") failed.", id->path);
215       return -1;
216     }
217   }
218
219   while (42) {
220     char buffer[1024];
221     size_t buffer_len;
222     int status;
223
224     status = cu_tail_readline(id->tail, buffer, (int)sizeof(buffer));
225     if (status != 0) {
226       ERROR("tail_csv plugin: File \"%s\": cu_tail_readline failed "
227             "with status %i.",
228             id->path, status);
229       return -1;
230     }
231
232     buffer_len = strlen(buffer);
233     if (buffer_len == 0)
234       break;
235
236     tcsv_read_buffer(id, buffer, buffer_len);
237   }
238
239   return 0;
240 }
241
242 static void tcsv_metric_definition_destroy(void *arg) {
243   metric_definition_t *md;
244   metric_definition_t *next;
245
246   md = arg;
247   if (md == NULL)
248     return;
249
250   next = md->next;
251   md->next = NULL;
252
253   sfree(md->name);
254   sfree(md->type);
255   sfree(md->instance);
256   sfree(md);
257
258   tcsv_metric_definition_destroy(next);
259 }
260
261 static int tcsv_config_get_index(oconfig_item_t *ci, ssize_t *ret_index) {
262   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER)) {
263     WARNING("tail_csv plugin: The \"%s\" config option needs exactly one "
264             "integer argument.",
265             ci->key);
266     return -1;
267   }
268
269   if (ci->values[0].value.number < 0) {
270     WARNING("tail_csv plugin: The \"%s\" config option must be positive "
271             "(or zero).",
272             ci->key);
273     return -1;
274   }
275
276   *ret_index = (ssize_t)ci->values[0].value.number;
277   return 0;
278 }
279
280 /* Parse metric  */
281 static int tcsv_config_add_metric(oconfig_item_t *ci) {
282   metric_definition_t *md;
283   int status;
284
285   md = calloc(1, sizeof(*md));
286   if (md == NULL)
287     return -1;
288   md->name = NULL;
289   md->type = NULL;
290   md->instance = NULL;
291   md->data_source_type = -1;
292   md->value_from = -1;
293   md->next = NULL;
294
295   status = cf_util_get_string(ci, &md->name);
296   if (status != 0) {
297     sfree(md);
298     return -1;
299   }
300
301   for (int i = 0; i < ci->children_num; ++i) {
302     oconfig_item_t *option = ci->children + i;
303
304     if (strcasecmp("Type", option->key) == 0)
305       status = cf_util_get_string(option, &md->type);
306     else if (strcasecmp("Instance", option->key) == 0)
307       status = cf_util_get_string(option, &md->instance);
308     else if (strcasecmp("ValueFrom", option->key) == 0)
309       status = tcsv_config_get_index(option, &md->value_from);
310     else {
311       WARNING("tail_csv plugin: Option `%s' not allowed here.", option->key);
312       status = -1;
313     }
314
315     if (status != 0)
316       break;
317   }
318
319   if (status != 0) {
320     tcsv_metric_definition_destroy(md);
321     return -1;
322   }
323
324   /* Verify all necessary options have been set. */
325   if (md->type == NULL) {
326     WARNING("tail_csv plugin: Option `Type' must be set.");
327     status = -1;
328   } else if (md->value_from < 0) {
329     WARNING("tail_csv plugin: Option `ValueFrom' must be set.");
330     status = -1;
331   }
332   if (status != 0) {
333     tcsv_metric_definition_destroy(md);
334     return status;
335   }
336
337   if (metric_head == NULL)
338     metric_head = md;
339   else {
340     metric_definition_t *last;
341     last = metric_head;
342     while (last->next != NULL)
343       last = last->next;
344     last->next = md;
345   }
346
347   return 0;
348 }
349
350 static void tcsv_instance_definition_destroy(void *arg) {
351   instance_definition_t *id;
352
353   id = arg;
354   if (id == NULL)
355     return;
356
357   if (id->tail != NULL)
358     cu_tail_destroy(id->tail);
359   id->tail = NULL;
360
361   sfree(id->plugin_name);
362   sfree(id->instance);
363   sfree(id->path);
364   sfree(id->metric_list);
365   sfree(id);
366 }
367
368 static int tcsv_config_add_instance_collect(instance_definition_t *id,
369                                             oconfig_item_t *ci) {
370   metric_definition_t *metric;
371   metric_definition_t **metric_list;
372   size_t metric_list_size;
373
374   if (ci->values_num < 1) {
375     WARNING("tail_csv plugin: The `Collect' config option needs at least one "
376             "argument.");
377     return -1;
378   }
379
380   metric_list_size = id->metric_list_len + (size_t)ci->values_num;
381   metric_list =
382       realloc(id->metric_list, sizeof(*id->metric_list) * metric_list_size);
383   if (metric_list == NULL)
384     return -1;
385   id->metric_list = metric_list;
386
387   for (int i = 0; i < ci->values_num; i++) {
388     char *metric_name;
389
390     if (ci->values[i].type != OCONFIG_TYPE_STRING) {
391       WARNING("tail_csv plugin: All arguments to `Collect' must be strings.");
392       continue;
393     }
394     metric_name = ci->values[i].value.string;
395
396     for (metric = metric_head; metric != NULL; metric = metric->next)
397       if (strcasecmp(metric_name, metric->name) == 0)
398         break;
399
400     if (metric == NULL) {
401       WARNING("tail_csv plugin: `Collect' argument not found `%s'.",
402               metric_name);
403       continue;
404     }
405
406     id->metric_list[id->metric_list_len] = metric;
407     id->metric_list_len++;
408   }
409
410   return 0;
411 }
412
413 /* <File /> block */
414 static int tcsv_config_add_file(oconfig_item_t *ci) {
415   instance_definition_t *id;
416   int status = 0;
417
418   /* Registration variables */
419   cdtime_t interval = 0;
420   char cb_name[DATA_MAX_NAME_LEN];
421
422   id = calloc(1, sizeof(*id));
423   if (id == NULL)
424     return -1;
425   id->plugin_name = NULL;
426   id->instance = NULL;
427   id->path = NULL;
428   id->metric_list = NULL;
429   id->time_from = -1;
430   id->next = NULL;
431
432   status = cf_util_get_string(ci, &id->path);
433   if (status != 0) {
434     sfree(id);
435     return status;
436   }
437
438   for (int i = 0; i < ci->children_num; ++i) {
439     oconfig_item_t *option = ci->children + i;
440     status = 0;
441
442     if (strcasecmp("Instance", option->key) == 0)
443       status = cf_util_get_string(option, &id->instance);
444     else if (strcasecmp("Collect", option->key) == 0)
445       status = tcsv_config_add_instance_collect(id, option);
446     else if (strcasecmp("Interval", option->key) == 0)
447       cf_util_get_cdtime(option, &interval);
448     else if (strcasecmp("TimeFrom", option->key) == 0)
449       status = tcsv_config_get_index(option, &id->time_from);
450     else if (strcasecmp("Plugin", option->key) == 0)
451       status = cf_util_get_string(option, &id->plugin_name);
452     else {
453       WARNING("tail_csv plugin: Option `%s' not allowed here.", option->key);
454       status = -1;
455     }
456
457     if (status != 0)
458       break;
459   }
460
461   if (status != 0) {
462     tcsv_instance_definition_destroy(id);
463     return -1;
464   }
465
466   /* Verify all necessary options have been set. */
467   if (id->path == NULL) {
468     WARNING("tail_csv plugin: Option `Path' must be set.");
469     status = -1;
470   } else if (id->metric_list == NULL) {
471     WARNING("tail_csv plugin: Option `Collect' must be set.");
472     status = -1;
473   }
474
475   if (status != 0) {
476     tcsv_instance_definition_destroy(id);
477     return -1;
478   }
479
480   snprintf(cb_name, sizeof(cb_name), "tail_csv/%s", id->path);
481
482   status = plugin_register_complex_read(
483       NULL, cb_name, tcsv_read, interval,
484       &(user_data_t){
485           .data = id,
486           .free_func = tcsv_instance_definition_destroy,
487       });
488   if (status != 0) {
489     ERROR("tail_csv plugin: Registering complex read function failed.");
490     return -1;
491   }
492
493   return 0;
494 }
495
496 /* Parse blocks */
497 static int tcsv_config(oconfig_item_t *ci) {
498   for (int i = 0; i < ci->children_num; ++i) {
499     oconfig_item_t *child = ci->children + i;
500     if (strcasecmp("Metric", child->key) == 0)
501       tcsv_config_add_metric(child);
502     else if (strcasecmp("File", child->key) == 0)
503       tcsv_config_add_file(child);
504     else
505       WARNING("tail_csv plugin: Ignore unknown config option `%s'.",
506               child->key);
507   }
508
509   return 0;
510 } /* int tcsv_config */
511
512 static int tcsv_init(void) { /* {{{ */
513   static bool have_init;
514   metric_definition_t *md;
515
516   if (have_init)
517     return 0;
518
519   for (md = metric_head; md != NULL; md = md->next) {
520     data_set_t const *ds;
521
522     /* Retrieve the data source type from the types db. */
523     ds = plugin_get_ds(md->type);
524     if (ds == NULL) {
525       ERROR("tail_csv plugin: Failed to look up type \"%s\" for "
526             "metric \"%s\". It may not be defined in the types.db "
527             "file. Please read the types.db(5) manual page for more "
528             "details.",
529             md->type, md->name);
530       continue;
531     } else if (ds->ds_num != 1) {
532       ERROR("tail_csv plugin: The type \"%s\" has %" PRIsz " data sources. "
533             "Only types with a single data source are supported.",
534             ds->type, ds->ds_num);
535       continue;
536     }
537
538     md->data_source_type = ds->ds->type;
539   }
540
541   return 0;
542 } /* }}} int tcsv_init */
543
544 static int tcsv_shutdown(void) {
545   tcsv_metric_definition_destroy(metric_head);
546   metric_head = NULL;
547
548   return 0;
549 }
550
551 void module_register(void) {
552   plugin_register_complex_config("tail_csv", tcsv_config);
553   plugin_register_init("tail_csv", tcsv_init);
554   plugin_register_shutdown("tail_csv", tcsv_shutdown);
555 }