Merge branch 'collectd-5.6' into collectd-5.7
[collectd.git] / src / tail_csv.c
1 /**
2  * collectd - src/tail_csv.c
3  * Copyright (C) 2013 Kris Nielander
4  * Copyright (C) 2013 Florian Forster
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Kris Nielander <nielander at fox-it.com>
21  *   Florian Forster <octo at collectd.org>
22  **/
23
24 #include "collectd.h"
25
26 #include "common.h" /* auxiliary functions */
27 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
28 #include "utils_tail.h"
29
30 #include <fcntl.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <sys/mman.h>
34 #include <sys/stat.h>
35
36 struct metric_definition_s {
37   char *name;
38   char *type;
39   char *instance;
40   int data_source_type;
41   ssize_t value_from;
42   struct metric_definition_s *next;
43 };
44 typedef struct metric_definition_s metric_definition_t;
45
46 struct instance_definition_s {
47   char *instance;
48   char *path;
49   cu_tail_t *tail;
50   metric_definition_t **metric_list;
51   size_t metric_list_len;
52   cdtime_t interval;
53   ssize_t time_from;
54   struct instance_definition_s *next;
55 };
56 typedef struct instance_definition_s instance_definition_t;
57
58 /* Private */
59 static metric_definition_t *metric_head = NULL;
60
61 static int tcsv_submit(instance_definition_t *id, metric_definition_t *md,
62                        value_t v, cdtime_t t) {
63   /* Registration variables */
64   value_list_t vl = VALUE_LIST_INIT;
65
66   /* Register */
67   vl.values_len = 1;
68   vl.values = &v;
69
70   sstrncpy(vl.plugin, "tail_csv", sizeof(vl.plugin));
71   if (id->instance != NULL)
72     sstrncpy(vl.plugin_instance, id->instance, sizeof(vl.plugin_instance));
73   sstrncpy(vl.type, md->type, sizeof(vl.type));
74   if (md->instance != NULL)
75     sstrncpy(vl.type_instance, md->instance, sizeof(vl.type_instance));
76
77   vl.time = t;
78   vl.interval = id->interval;
79
80   return (plugin_dispatch_values(&vl));
81 }
82
83 static cdtime_t parse_time(char const *tbuf) {
84   double t;
85   char *endptr = NULL;
86
87   errno = 0;
88   t = strtod(tbuf, &endptr);
89   if ((errno != 0) || (endptr == NULL) || (endptr[0] != 0))
90     return (cdtime());
91
92   return (DOUBLE_TO_CDTIME_T(t));
93 }
94
95 static int tcsv_read_metric(instance_definition_t *id, metric_definition_t *md,
96                             char **fields, size_t fields_num) {
97   value_t v;
98   cdtime_t t = 0;
99   int status;
100
101   if (md->data_source_type == -1)
102     return (EINVAL);
103
104   assert(md->value_from >= 0);
105   if (((size_t)md->value_from) >= fields_num)
106     return (EINVAL);
107
108   status = parse_value(fields[md->value_from], &v, md->data_source_type);
109   if (status != 0)
110     return (status);
111
112   if (id->time_from >= 0) {
113     if (((size_t)id->time_from) >= fields_num)
114       return (EINVAL);
115     t = parse_time(fields[id->time_from]);
116   }
117
118   return (tcsv_submit(id, md, v, t));
119 }
120
121 static _Bool tcsv_check_index(ssize_t index, size_t fields_num,
122                               char const *name) {
123   if (index < 0)
124     return 1;
125   else if (((size_t)index) < fields_num)
126     return 1;
127
128   ERROR("tail_csv plugin: Metric \"%s\": Request for index %zd when "
129         "only %zu fields are available.",
130         name, index, fields_num);
131   return (0);
132 }
133
134 static int tcsv_read_buffer(instance_definition_t *id, char *buffer,
135                             size_t buffer_size) {
136   char **metrics;
137   size_t metrics_num;
138
139   char *ptr;
140   size_t i;
141
142   /* Remove newlines at the end of line. */
143   while (buffer_size > 0) {
144     if ((buffer[buffer_size - 1] == '\n') ||
145         (buffer[buffer_size - 1] == '\r')) {
146       buffer[buffer_size - 1] = 0;
147       buffer_size--;
148     } else {
149       break;
150     }
151   }
152
153   /* Ignore empty lines. */
154   if ((buffer_size == 0) || (buffer[0] == '#'))
155     return (0);
156
157   /* Count the number of fields. */
158   metrics_num = 1;
159   for (i = 0; i < buffer_size; i++) {
160     if (buffer[i] == ',')
161       metrics_num++;
162   }
163
164   if (metrics_num == 1) {
165     ERROR("tail_csv plugin: last line of `%s' does not contain "
166           "enough values.",
167           id->path);
168     return (-1);
169   }
170
171   /* Create a list of all values */
172   metrics = calloc(metrics_num, sizeof(*metrics));
173   if (metrics == NULL) {
174     ERROR("tail_csv plugin: calloc failed.");
175     return (ENOMEM);
176   }
177
178   ptr = buffer;
179   metrics[0] = ptr;
180   i = 1;
181   for (ptr = buffer; *ptr != 0; ptr++) {
182     if (*ptr != ',')
183       continue;
184
185     *ptr = 0;
186     metrics[i] = ptr + 1;
187     i++;
188   }
189   assert(i == metrics_num);
190
191   /* Register values */
192   for (i = 0; i < id->metric_list_len; ++i) {
193     metric_definition_t *md = id->metric_list[i];
194
195     if (!tcsv_check_index(md->value_from, metrics_num, md->name) ||
196         !tcsv_check_index(id->time_from, metrics_num, md->name))
197       continue;
198
199     tcsv_read_metric(id, md, metrics, metrics_num);
200   }
201
202   /* Free up resources */
203   sfree(metrics);
204   return (0);
205 }
206
207 static int tcsv_read(user_data_t *ud) {
208   instance_definition_t *id;
209   id = ud->data;
210
211   if (id->tail == NULL) {
212     id->tail = cu_tail_create(id->path);
213     if (id->tail == NULL) {
214       ERROR("tail_csv plugin: cu_tail_create (\"%s\") failed.", id->path);
215       return (-1);
216     }
217   }
218
219   while (42) {
220     char buffer[1024];
221     size_t buffer_len;
222     int status;
223
224     status = cu_tail_readline(id->tail, buffer, (int)sizeof(buffer));
225     if (status != 0) {
226       ERROR("tail_csv plugin: File \"%s\": cu_tail_readline failed "
227             "with status %i.",
228             id->path, status);
229       return (-1);
230     }
231
232     buffer_len = strlen(buffer);
233     if (buffer_len == 0)
234       break;
235
236     tcsv_read_buffer(id, buffer, buffer_len);
237   }
238
239   return (0);
240 }
241
242 static void tcsv_metric_definition_destroy(void *arg) {
243   metric_definition_t *md;
244   metric_definition_t *next;
245
246   md = arg;
247   if (md == NULL)
248     return;
249
250   next = md->next;
251   md->next = NULL;
252
253   sfree(md->name);
254   sfree(md->type);
255   sfree(md->instance);
256   sfree(md);
257
258   tcsv_metric_definition_destroy(next);
259 }
260
261 static int tcsv_config_get_index(oconfig_item_t *ci, ssize_t *ret_index) {
262   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER)) {
263     WARNING("tail_csv plugin: The \"%s\" config option needs exactly one "
264             "integer argument.",
265             ci->key);
266     return (-1);
267   }
268
269   if (ci->values[0].value.number < 0) {
270     WARNING("tail_csv plugin: The \"%s\" config option must be positive "
271             "(or zero).",
272             ci->key);
273     return (-1);
274   }
275
276   *ret_index = (ssize_t)ci->values[0].value.number;
277   return (0);
278 }
279
280 /* Parse metric  */
281 static int tcsv_config_add_metric(oconfig_item_t *ci) {
282   metric_definition_t *md;
283   int status;
284
285   md = calloc(1, sizeof(*md));
286   if (md == NULL)
287     return (-1);
288   md->name = NULL;
289   md->type = NULL;
290   md->instance = NULL;
291   md->data_source_type = -1;
292   md->value_from = -1;
293   md->next = NULL;
294
295   status = cf_util_get_string(ci, &md->name);
296   if (status != 0) {
297     sfree(md);
298     return (-1);
299   }
300
301   for (int i = 0; i < ci->children_num; ++i) {
302     oconfig_item_t *option = ci->children + i;
303
304     if (strcasecmp("Type", option->key) == 0)
305       status = cf_util_get_string(option, &md->type);
306     else if (strcasecmp("Instance", option->key) == 0)
307       status = cf_util_get_string(option, &md->instance);
308     else if (strcasecmp("ValueFrom", option->key) == 0)
309       status = tcsv_config_get_index(option, &md->value_from);
310     else {
311       WARNING("tail_csv plugin: Option `%s' not allowed here.", option->key);
312       status = -1;
313     }
314
315     if (status != 0)
316       break;
317   }
318
319   if (status != 0) {
320     tcsv_metric_definition_destroy(md);
321     return (-1);
322   }
323
324   /* Verify all necessary options have been set. */
325   if (md->type == NULL) {
326     WARNING("tail_csv plugin: Option `Type' must be set.");
327     status = -1;
328   } else if (md->value_from < 0) {
329     WARNING("tail_csv plugin: Option `ValueFrom' must be set.");
330     status = -1;
331   }
332   if (status != 0) {
333     tcsv_metric_definition_destroy(md);
334     return (status);
335   }
336
337   if (metric_head == NULL)
338     metric_head = md;
339   else {
340     metric_definition_t *last;
341     last = metric_head;
342     while (last->next != NULL)
343       last = last->next;
344     last->next = md;
345   }
346
347   return (0);
348 }
349
350 static void tcsv_instance_definition_destroy(void *arg) {
351   instance_definition_t *id;
352
353   id = arg;
354   if (id == NULL)
355     return;
356
357   if (id->tail != NULL)
358     cu_tail_destroy(id->tail);
359   id->tail = NULL;
360
361   sfree(id->instance);
362   sfree(id->path);
363   sfree(id->metric_list);
364   sfree(id);
365 }
366
367 static int tcsv_config_add_instance_collect(instance_definition_t *id,
368                                             oconfig_item_t *ci) {
369   metric_definition_t *metric;
370   metric_definition_t **metric_list;
371   size_t metric_list_size;
372
373   if (ci->values_num < 1) {
374     WARNING("tail_csv plugin: The `Collect' config option needs at least one "
375             "argument.");
376     return (-1);
377   }
378
379   metric_list_size = id->metric_list_len + (size_t)ci->values_num;
380   metric_list =
381       realloc(id->metric_list, sizeof(*id->metric_list) * metric_list_size);
382   if (metric_list == NULL)
383     return (-1);
384   id->metric_list = metric_list;
385
386   for (int i = 0; i < ci->values_num; i++) {
387     char *metric_name;
388
389     if (ci->values[i].type != OCONFIG_TYPE_STRING) {
390       WARNING("tail_csv plugin: All arguments to `Collect' must be strings.");
391       continue;
392     }
393     metric_name = ci->values[i].value.string;
394
395     for (metric = metric_head; metric != NULL; metric = metric->next)
396       if (strcasecmp(metric_name, metric->name) == 0)
397         break;
398
399     if (metric == NULL) {
400       WARNING("tail_csv plugin: `Collect' argument not found `%s'.",
401               metric_name);
402       continue;
403     }
404
405     id->metric_list[id->metric_list_len] = metric;
406     id->metric_list_len++;
407   }
408
409   return (0);
410 }
411
412 /* <File /> block */
413 static int tcsv_config_add_file(oconfig_item_t *ci) {
414   instance_definition_t *id;
415   int status = 0;
416
417   /* Registration variables */
418   char cb_name[DATA_MAX_NAME_LEN];
419
420   id = calloc(1, sizeof(*id));
421   if (id == NULL)
422     return (-1);
423   id->instance = NULL;
424   id->path = NULL;
425   id->metric_list = NULL;
426   id->time_from = -1;
427   id->next = NULL;
428
429   status = cf_util_get_string(ci, &id->path);
430   if (status != 0) {
431     sfree(id);
432     return (status);
433   }
434
435   /* Use default interval. */
436   id->interval = plugin_get_interval();
437
438   for (int i = 0; i < ci->children_num; ++i) {
439     oconfig_item_t *option = ci->children + i;
440     status = 0;
441
442     if (strcasecmp("Instance", option->key) == 0)
443       status = cf_util_get_string(option, &id->instance);
444     else if (strcasecmp("Collect", option->key) == 0)
445       status = tcsv_config_add_instance_collect(id, option);
446     else if (strcasecmp("Interval", option->key) == 0)
447       cf_util_get_cdtime(option, &id->interval);
448     else if (strcasecmp("TimeFrom", option->key) == 0)
449       status = tcsv_config_get_index(option, &id->time_from);
450     else {
451       WARNING("tail_csv plugin: Option `%s' not allowed here.", option->key);
452       status = -1;
453     }
454
455     if (status != 0)
456       break;
457   }
458
459   if (status != 0) {
460     tcsv_instance_definition_destroy(id);
461     return (-1);
462   }
463
464   /* Verify all necessary options have been set. */
465   if (id->path == NULL) {
466     WARNING("tail_csv plugin: Option `Path' must be set.");
467     status = -1;
468   } else if (id->metric_list == NULL) {
469     WARNING("tail_csv plugin: Option `Collect' must be set.");
470     status = -1;
471   }
472
473   if (status != 0) {
474     tcsv_instance_definition_destroy(id);
475     return (-1);
476   }
477
478   ssnprintf(cb_name, sizeof(cb_name), "tail_csv/%s", id->path);
479
480   status = plugin_register_complex_read(
481       NULL, cb_name, tcsv_read, id->interval,
482       &(user_data_t){
483           .data = id, .free_func = tcsv_instance_definition_destroy,
484       });
485   if (status != 0) {
486     ERROR("tail_csv plugin: Registering complex read function failed.");
487     tcsv_instance_definition_destroy(id);
488     return (-1);
489   }
490
491   return (0);
492 }
493
494 /* Parse blocks */
495 static int tcsv_config(oconfig_item_t *ci) {
496   for (int i = 0; i < ci->children_num; ++i) {
497     oconfig_item_t *child = ci->children + i;
498     if (strcasecmp("Metric", child->key) == 0)
499       tcsv_config_add_metric(child);
500     else if (strcasecmp("File", child->key) == 0)
501       tcsv_config_add_file(child);
502     else
503       WARNING("tail_csv plugin: Ignore unknown config option `%s'.",
504               child->key);
505   }
506
507   return (0);
508 } /* int tcsv_config */
509
510 static int tcsv_init(void) { /* {{{ */
511   static _Bool have_init = 0;
512   metric_definition_t *md;
513
514   if (have_init)
515     return (0);
516
517   for (md = metric_head; md != NULL; md = md->next) {
518     data_set_t const *ds;
519
520     /* Retrieve the data source type from the types db. */
521     ds = plugin_get_ds(md->type);
522     if (ds == NULL) {
523       ERROR("tail_csv plugin: Failed to look up type \"%s\" for "
524             "metric \"%s\". It may not be defined in the types.db "
525             "file. Please read the types.db(5) manual page for more "
526             "details.",
527             md->type, md->name);
528       continue;
529     } else if (ds->ds_num != 1) {
530       ERROR("tail_csv plugin: The type \"%s\" has %zu data sources. "
531             "Only types with a single data source are supported.",
532             ds->type, ds->ds_num);
533       continue;
534     }
535
536     md->data_source_type = ds->ds->type;
537   }
538
539   return (0);
540 } /* }}} int tcsv_init */
541
542 static int tcsv_shutdown(void) {
543   tcsv_metric_definition_destroy(metric_head);
544   metric_head = NULL;
545
546   return (0);
547 }
548
549 void module_register(void) {
550   plugin_register_complex_config("tail_csv", tcsv_config);
551   plugin_register_init("tail_csv", tcsv_init);
552   plugin_register_shutdown("tail_csv", tcsv_shutdown);
553 }
554
555 /* vim: set sw=4 sts=4 et : */