Merge branch 'collectd-5.8'
[collectd.git] / src / tail_csv.c
1 /**
2  * collectd - src/tail_csv.c
3  * Copyright (C) 2013 Kris Nielander
4  * Copyright (C) 2013 Florian Forster
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Kris Nielander <nielander at fox-it.com>
21  *   Florian Forster <octo at collectd.org>
22  **/
23
24 #include "collectd.h"
25
26 #include "common.h" /* auxiliary functions */
27 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
28 #include "utils_tail.h"
29
30 #include <fcntl.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <sys/mman.h>
34 #include <sys/stat.h>
35
36 struct metric_definition_s {
37   char *name;
38   char *type;
39   char *instance;
40   int data_source_type;
41   ssize_t value_from;
42   struct metric_definition_s *next;
43 };
44 typedef struct metric_definition_s metric_definition_t;
45
46 struct instance_definition_s {
47   char *plugin_name;
48   char *instance;
49   char *path;
50   cu_tail_t *tail;
51   metric_definition_t **metric_list;
52   size_t metric_list_len;
53   cdtime_t interval;
54   ssize_t time_from;
55   struct instance_definition_s *next;
56 };
57 typedef struct instance_definition_s instance_definition_t;
58
59 /* Private */
60 static metric_definition_t *metric_head;
61
62 static int tcsv_submit(instance_definition_t *id, metric_definition_t *md,
63                        value_t v, cdtime_t t) {
64   /* Registration variables */
65   value_list_t vl = VALUE_LIST_INIT;
66
67   /* Register */
68   vl.values_len = 1;
69   vl.values = &v;
70
71   sstrncpy(vl.plugin, (id->plugin_name != NULL) ? id->plugin_name : "tail_csv",
72            sizeof(vl.plugin));
73   if (id->instance != NULL)
74     sstrncpy(vl.plugin_instance, id->instance, sizeof(vl.plugin_instance));
75   sstrncpy(vl.type, md->type, sizeof(vl.type));
76   if (md->instance != NULL)
77     sstrncpy(vl.type_instance, md->instance, sizeof(vl.type_instance));
78
79   vl.time = t;
80   vl.interval = id->interval;
81
82   return plugin_dispatch_values(&vl);
83 }
84
85 static cdtime_t parse_time(char const *tbuf) {
86   double t;
87   char *endptr = NULL;
88
89   errno = 0;
90   t = strtod(tbuf, &endptr);
91   if ((errno != 0) || (endptr == NULL) || (endptr[0] != 0))
92     return cdtime();
93
94   return DOUBLE_TO_CDTIME_T(t);
95 }
96
97 static int tcsv_read_metric(instance_definition_t *id, metric_definition_t *md,
98                             char **fields, size_t fields_num) {
99   value_t v;
100   cdtime_t t = 0;
101   int status;
102
103   if (md->data_source_type == -1)
104     return EINVAL;
105
106   assert(md->value_from >= 0);
107   if (((size_t)md->value_from) >= fields_num)
108     return EINVAL;
109
110   status = parse_value(fields[md->value_from], &v, md->data_source_type);
111   if (status != 0)
112     return status;
113
114   if (id->time_from >= 0) {
115     if (((size_t)id->time_from) >= fields_num)
116       return EINVAL;
117     t = parse_time(fields[id->time_from]);
118   }
119
120   return tcsv_submit(id, md, v, t);
121 }
122
123 static bool tcsv_check_index(ssize_t index, size_t fields_num,
124                              char const *name) {
125   if (index < 0)
126     return true;
127   else if (((size_t)index) < fields_num)
128     return true;
129
130   ERROR("tail_csv plugin: Metric \"%s\": Request for index %zd when "
131         "only %" PRIsz " fields are available.",
132         name, index, fields_num);
133   return false;
134 }
135
136 static int tcsv_read_buffer(instance_definition_t *id, char *buffer,
137                             size_t buffer_size) {
138   char **metrics;
139   size_t metrics_num;
140
141   char *ptr;
142   size_t i;
143
144   /* Remove newlines at the end of line. */
145   while (buffer_size > 0) {
146     if ((buffer[buffer_size - 1] == '\n') ||
147         (buffer[buffer_size - 1] == '\r')) {
148       buffer[buffer_size - 1] = 0;
149       buffer_size--;
150     } else {
151       break;
152     }
153   }
154
155   /* Ignore empty lines. */
156   if ((buffer_size == 0) || (buffer[0] == '#'))
157     return 0;
158
159   /* Count the number of fields. */
160   metrics_num = 1;
161   for (i = 0; i < buffer_size; i++) {
162     if (buffer[i] == ',')
163       metrics_num++;
164   }
165
166   if (metrics_num == 1) {
167     ERROR("tail_csv plugin: last line of `%s' does not contain "
168           "enough values.",
169           id->path);
170     return -1;
171   }
172
173   /* Create a list of all values */
174   metrics = calloc(metrics_num, sizeof(*metrics));
175   if (metrics == NULL) {
176     ERROR("tail_csv plugin: calloc failed.");
177     return ENOMEM;
178   }
179
180   ptr = buffer;
181   metrics[0] = ptr;
182   i = 1;
183   for (ptr = buffer; *ptr != 0; ptr++) {
184     if (*ptr != ',')
185       continue;
186
187     *ptr = 0;
188     metrics[i] = ptr + 1;
189     i++;
190   }
191   assert(i == metrics_num);
192
193   /* Register values */
194   for (i = 0; i < id->metric_list_len; ++i) {
195     metric_definition_t *md = id->metric_list[i];
196
197     if (!tcsv_check_index(md->value_from, metrics_num, md->name) ||
198         !tcsv_check_index(id->time_from, metrics_num, md->name))
199       continue;
200
201     tcsv_read_metric(id, md, metrics, metrics_num);
202   }
203
204   /* Free up resources */
205   sfree(metrics);
206   return 0;
207 }
208
209 static int tcsv_read(user_data_t *ud) {
210   instance_definition_t *id;
211   id = ud->data;
212
213   if (id->tail == NULL) {
214     id->tail = cu_tail_create(id->path);
215     if (id->tail == NULL) {
216       ERROR("tail_csv plugin: cu_tail_create (\"%s\") failed.", id->path);
217       return -1;
218     }
219   }
220
221   while (42) {
222     char buffer[1024];
223     size_t buffer_len;
224     int status;
225
226     status = cu_tail_readline(id->tail, buffer, (int)sizeof(buffer));
227     if (status != 0) {
228       ERROR("tail_csv plugin: File \"%s\": cu_tail_readline failed "
229             "with status %i.",
230             id->path, status);
231       return -1;
232     }
233
234     buffer_len = strlen(buffer);
235     if (buffer_len == 0)
236       break;
237
238     tcsv_read_buffer(id, buffer, buffer_len);
239   }
240
241   return 0;
242 }
243
244 static void tcsv_metric_definition_destroy(void *arg) {
245   metric_definition_t *md;
246   metric_definition_t *next;
247
248   md = arg;
249   if (md == NULL)
250     return;
251
252   next = md->next;
253   md->next = NULL;
254
255   sfree(md->name);
256   sfree(md->type);
257   sfree(md->instance);
258   sfree(md);
259
260   tcsv_metric_definition_destroy(next);
261 }
262
263 static int tcsv_config_get_index(oconfig_item_t *ci, ssize_t *ret_index) {
264   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER)) {
265     WARNING("tail_csv plugin: The \"%s\" config option needs exactly one "
266             "integer argument.",
267             ci->key);
268     return -1;
269   }
270
271   if (ci->values[0].value.number < 0) {
272     WARNING("tail_csv plugin: The \"%s\" config option must be positive "
273             "(or zero).",
274             ci->key);
275     return -1;
276   }
277
278   *ret_index = (ssize_t)ci->values[0].value.number;
279   return 0;
280 }
281
282 /* Parse metric  */
283 static int tcsv_config_add_metric(oconfig_item_t *ci) {
284   metric_definition_t *md;
285   int status;
286
287   md = calloc(1, sizeof(*md));
288   if (md == NULL)
289     return -1;
290   md->name = NULL;
291   md->type = NULL;
292   md->instance = NULL;
293   md->data_source_type = -1;
294   md->value_from = -1;
295   md->next = NULL;
296
297   status = cf_util_get_string(ci, &md->name);
298   if (status != 0) {
299     sfree(md);
300     return -1;
301   }
302
303   for (int i = 0; i < ci->children_num; ++i) {
304     oconfig_item_t *option = ci->children + i;
305
306     if (strcasecmp("Type", option->key) == 0)
307       status = cf_util_get_string(option, &md->type);
308     else if (strcasecmp("Instance", option->key) == 0)
309       status = cf_util_get_string(option, &md->instance);
310     else if (strcasecmp("ValueFrom", option->key) == 0)
311       status = tcsv_config_get_index(option, &md->value_from);
312     else {
313       WARNING("tail_csv plugin: Option `%s' not allowed here.", option->key);
314       status = -1;
315     }
316
317     if (status != 0)
318       break;
319   }
320
321   if (status != 0) {
322     tcsv_metric_definition_destroy(md);
323     return -1;
324   }
325
326   /* Verify all necessary options have been set. */
327   if (md->type == NULL) {
328     WARNING("tail_csv plugin: Option `Type' must be set.");
329     status = -1;
330   } else if (md->value_from < 0) {
331     WARNING("tail_csv plugin: Option `ValueFrom' must be set.");
332     status = -1;
333   }
334   if (status != 0) {
335     tcsv_metric_definition_destroy(md);
336     return status;
337   }
338
339   if (metric_head == NULL)
340     metric_head = md;
341   else {
342     metric_definition_t *last;
343     last = metric_head;
344     while (last->next != NULL)
345       last = last->next;
346     last->next = md;
347   }
348
349   return 0;
350 }
351
352 static void tcsv_instance_definition_destroy(void *arg) {
353   instance_definition_t *id;
354
355   id = arg;
356   if (id == NULL)
357     return;
358
359   if (id->tail != NULL)
360     cu_tail_destroy(id->tail);
361   id->tail = NULL;
362
363   sfree(id->plugin_name);
364   sfree(id->instance);
365   sfree(id->path);
366   sfree(id->metric_list);
367   sfree(id);
368 }
369
370 static int tcsv_config_add_instance_collect(instance_definition_t *id,
371                                             oconfig_item_t *ci) {
372   metric_definition_t *metric;
373   metric_definition_t **metric_list;
374   size_t metric_list_size;
375
376   if (ci->values_num < 1) {
377     WARNING("tail_csv plugin: The `Collect' config option needs at least one "
378             "argument.");
379     return -1;
380   }
381
382   metric_list_size = id->metric_list_len + (size_t)ci->values_num;
383   metric_list =
384       realloc(id->metric_list, sizeof(*id->metric_list) * metric_list_size);
385   if (metric_list == NULL)
386     return -1;
387   id->metric_list = metric_list;
388
389   for (int i = 0; i < ci->values_num; i++) {
390     char *metric_name;
391
392     if (ci->values[i].type != OCONFIG_TYPE_STRING) {
393       WARNING("tail_csv plugin: All arguments to `Collect' must be strings.");
394       continue;
395     }
396     metric_name = ci->values[i].value.string;
397
398     for (metric = metric_head; metric != NULL; metric = metric->next)
399       if (strcasecmp(metric_name, metric->name) == 0)
400         break;
401
402     if (metric == NULL) {
403       WARNING("tail_csv plugin: `Collect' argument not found `%s'.",
404               metric_name);
405       continue;
406     }
407
408     id->metric_list[id->metric_list_len] = metric;
409     id->metric_list_len++;
410   }
411
412   return 0;
413 }
414
415 /* <File /> block */
416 static int tcsv_config_add_file(oconfig_item_t *ci) {
417   instance_definition_t *id;
418   int status = 0;
419
420   /* Registration variables */
421   char cb_name[DATA_MAX_NAME_LEN];
422
423   id = calloc(1, sizeof(*id));
424   if (id == NULL)
425     return -1;
426   id->plugin_name = NULL;
427   id->instance = NULL;
428   id->path = NULL;
429   id->metric_list = NULL;
430   id->time_from = -1;
431   id->next = NULL;
432
433   status = cf_util_get_string(ci, &id->path);
434   if (status != 0) {
435     sfree(id);
436     return status;
437   }
438
439   /* Use default interval. */
440   id->interval = plugin_get_interval();
441
442   for (int i = 0; i < ci->children_num; ++i) {
443     oconfig_item_t *option = ci->children + i;
444     status = 0;
445
446     if (strcasecmp("Instance", option->key) == 0)
447       status = cf_util_get_string(option, &id->instance);
448     else if (strcasecmp("Collect", option->key) == 0)
449       status = tcsv_config_add_instance_collect(id, option);
450     else if (strcasecmp("Interval", option->key) == 0)
451       cf_util_get_cdtime(option, &id->interval);
452     else if (strcasecmp("TimeFrom", option->key) == 0)
453       status = tcsv_config_get_index(option, &id->time_from);
454     else if (strcasecmp("Plugin", option->key) == 0)
455       status = cf_util_get_string(option, &id->plugin_name);
456     else {
457       WARNING("tail_csv plugin: Option `%s' not allowed here.", option->key);
458       status = -1;
459     }
460
461     if (status != 0)
462       break;
463   }
464
465   if (status != 0) {
466     tcsv_instance_definition_destroy(id);
467     return -1;
468   }
469
470   /* Verify all necessary options have been set. */
471   if (id->path == NULL) {
472     WARNING("tail_csv plugin: Option `Path' must be set.");
473     status = -1;
474   } else if (id->metric_list == NULL) {
475     WARNING("tail_csv plugin: Option `Collect' must be set.");
476     status = -1;
477   }
478
479   if (status != 0) {
480     tcsv_instance_definition_destroy(id);
481     return -1;
482   }
483
484   snprintf(cb_name, sizeof(cb_name), "tail_csv/%s", id->path);
485
486   status = plugin_register_complex_read(
487       NULL, cb_name, tcsv_read, id->interval,
488       &(user_data_t){
489           .data = id, .free_func = tcsv_instance_definition_destroy,
490       });
491   if (status != 0) {
492     ERROR("tail_csv plugin: Registering complex read function failed.");
493     return -1;
494   }
495
496   return 0;
497 }
498
499 /* Parse blocks */
500 static int tcsv_config(oconfig_item_t *ci) {
501   for (int i = 0; i < ci->children_num; ++i) {
502     oconfig_item_t *child = ci->children + i;
503     if (strcasecmp("Metric", child->key) == 0)
504       tcsv_config_add_metric(child);
505     else if (strcasecmp("File", child->key) == 0)
506       tcsv_config_add_file(child);
507     else
508       WARNING("tail_csv plugin: Ignore unknown config option `%s'.",
509               child->key);
510   }
511
512   return 0;
513 } /* int tcsv_config */
514
515 static int tcsv_init(void) { /* {{{ */
516   static bool have_init;
517   metric_definition_t *md;
518
519   if (have_init)
520     return 0;
521
522   for (md = metric_head; md != NULL; md = md->next) {
523     data_set_t const *ds;
524
525     /* Retrieve the data source type from the types db. */
526     ds = plugin_get_ds(md->type);
527     if (ds == NULL) {
528       ERROR("tail_csv plugin: Failed to look up type \"%s\" for "
529             "metric \"%s\". It may not be defined in the types.db "
530             "file. Please read the types.db(5) manual page for more "
531             "details.",
532             md->type, md->name);
533       continue;
534     } else if (ds->ds_num != 1) {
535       ERROR("tail_csv plugin: The type \"%s\" has %" PRIsz " data sources. "
536             "Only types with a single data source are supported.",
537             ds->type, ds->ds_num);
538       continue;
539     }
540
541     md->data_source_type = ds->ds->type;
542   }
543
544   return 0;
545 } /* }}} int tcsv_init */
546
547 static int tcsv_shutdown(void) {
548   tcsv_metric_definition_destroy(metric_head);
549   metric_head = NULL;
550
551   return 0;
552 }
553
554 void module_register(void) {
555   plugin_register_complex_config("tail_csv", tcsv_config);
556   plugin_register_init("tail_csv", tcsv_init);
557   plugin_register_shutdown("tail_csv", tcsv_shutdown);
558 }