tail_csv plugin: Rename "Instance" blocks to "File".
[collectd.git] / src / tail_csv.c
1 /**
2  * collectd - src/tail_csv.c
3  * Copyright (C) 2013 Kris Nielander
4  * Copyright (C) 2013 Florian Forster
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Kris Nielander <nielander at fox-it.com>
21  *   Florian Forster <octo at collectd.org>
22  **/
23
24 #include "collectd.h"
25 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
26 #include "common.h" /* auxiliary functions */
27 #include "utils_tail.h"
28
29 #include <sys/mman.h>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <stdlib.h>
33 #include <string.h>
34
35 struct metric_definition_s {
36     char *name;
37     char *type;
38     char *instance;
39     int data_source_type;
40     int index;
41     struct metric_definition_s *next;
42 };
43 typedef struct metric_definition_s metric_definition_t;
44
45 struct instance_definition_s {
46     char *instance;
47     char *path;
48     cu_tail_t *tail;
49     metric_definition_t **metric_list;
50     size_t metric_list_len;
51     cdtime_t interval;
52     struct instance_definition_s *next;
53 };
54 typedef struct instance_definition_s instance_definition_t;
55
56 /* Private */
57 static metric_definition_t *metric_head = NULL;
58
59 static int tcsv_submit (instance_definition_t *id,
60         metric_definition_t *md,
61         value_t v, cdtime_t t)
62 {
63     /* Registration variables */
64     value_list_t vl = VALUE_LIST_INIT;
65
66     /* Register */
67     vl.values_len = 1;
68     vl.values = &v;
69
70     sstrncpy(vl.host, hostname_g, sizeof (vl.host));
71     sstrncpy(vl.plugin, "tail_csv", sizeof(vl.plugin));
72     sstrncpy(vl.plugin_instance, id->instance, sizeof(vl.plugin_instance));
73     sstrncpy(vl.type, md->type, sizeof(vl.type));
74     if (md->instance != NULL)
75         sstrncpy(vl.type_instance, md->instance, sizeof(vl.type_instance));
76
77     vl.time = t;
78     vl.interval = id->interval;
79
80     DEBUG("tail_csv plugin: -> plugin_dispatch_values (&vl);");
81     plugin_dispatch_values(&vl);
82
83     return (0);
84 }
85
86 static cdtime_t parse_time (char const *tbuf)
87 {
88     double t;
89     char *endptr = 0;
90
91     errno = 0;
92     t = strtod (tbuf, &endptr);
93     if ((errno != 0) || (endptr == NULL) || (endptr[0] != 0))
94         return (cdtime ());
95
96     return (DOUBLE_TO_CDTIME_T (t));
97 }
98
99 static int tcsv_read_metric (instance_definition_t *id,
100         metric_definition_t *md,
101         char **fields, size_t fields_num)
102 {
103     value_t v;
104     cdtime_t t;
105     int status;
106
107     if (md->index >= fields_num)
108         return (EINVAL);
109
110     t = parse_time (fields[0]);
111
112     status = parse_value (fields[md->index], &v, md->data_source_type);
113     if (status != 0)
114         return (status);
115
116     return (tcsv_submit (id, md, v, t));
117 }
118
119 static int tcsv_read_buffer (instance_definition_t *id,
120         char *buffer, size_t buffer_size)
121 {
122     char **metrics;
123     size_t metrics_num;
124
125     char *ptr;
126     size_t i;
127
128     /* Remove newlines at the end of line. */
129     while (buffer_size > 0) {
130         if ((buffer[buffer_size - 1] == '\n')
131                 || (buffer[buffer_size - 1] == '\r')) {
132             buffer[buffer_size - 1] = 0;
133             buffer_size--;
134         } else {
135             break;
136         }
137     }
138
139     /* Ignore empty lines. */
140     if ((buffer_size == 0) || (buffer[0] == '#'))
141         return (0);
142
143     /* Count the number of fields. */
144     metrics_num = 1;
145     for (i = 0; i < buffer_size; i++) {
146         if (buffer[i] == ',')
147             metrics_num++;
148     }
149
150     if (metrics_num == 1) {
151         ERROR("tail_csv plugin: last line of `%s' does not contain "
152                 "enough values.", id->path);
153         return (-1);
154     }
155
156     /* Create a list of all values */
157     metrics = calloc (metrics_num, sizeof (*metrics));
158     if (metrics == NULL) {
159         ERROR ("tail_csv plugin: calloc failed.");
160         return (ENOMEM);
161     }
162
163     ptr = buffer;
164     metrics[0] = ptr;
165     i = 1;
166     for (ptr = buffer; *ptr != 0; ptr++) {
167         if (*ptr != ',')
168             continue;
169
170         *ptr = 0;
171         metrics[i] = ptr + 1;
172         i++;
173     }
174     assert (i == metrics_num);
175
176     /* Register values */
177     for (i = 0; i < id->metric_list_len; ++i){
178         metric_definition_t *md = id->metric_list[i];
179
180         if (((size_t) md->index) >= metrics_num) {
181             ERROR ("tail_csv plugin: Metric \"%s\": Request for index %i when "
182                     "only %zu fields are available.",
183                     md->name, md->index, metrics_num);
184             continue;
185         }
186
187         tcsv_read_metric (id, md, metrics, metrics_num);
188     }
189
190     /* Free up resources */
191     sfree (metrics);
192     return (0);
193 }
194
195 static int tcsv_read (user_data_t *ud) {
196     instance_definition_t *id;
197
198     id = ud->data;
199     DEBUG("tail_csv plugin: tcsv_read (instance = %s)", id->instance);
200
201     if (id->tail == NULL)
202     {
203         id->tail = cu_tail_create (id->path);
204         if (id->tail == NULL)
205         {
206             ERROR ("tail_csv plugin: cu_tail_create (\"%s\") failed.",
207                     id->path);
208             return (-1);
209         }
210     }
211
212     while (42)
213     {
214         char buffer[1024];
215         size_t buffer_len;
216         int status;
217
218         status = cu_tail_readline (id->tail, buffer, (int) sizeof (buffer));
219         if (status != 0)
220         {
221             ERROR ("tail_csv plugin: Instance \"%s\": cu_tail_readline failed "
222                     "with status %i.", id->instance, status);
223             return (-1);
224         }
225
226         buffer_len = strlen (buffer);
227         if (buffer_len == 0)
228             break;
229
230         tcsv_read_buffer (id, buffer, buffer_len);
231     }
232
233     return (0);
234 }
235
236 static void tcsv_metric_definition_destroy(void *arg){
237     metric_definition_t *md;
238
239     md = arg;
240     if (md == NULL)
241         return;
242
243     if (md->name != NULL)
244         DEBUG("tail_csv plugin: Destroying metric definition `%s'.", md->name);
245
246     sfree(md->name);
247     sfree(md->type);
248     sfree(md->instance);
249     sfree(md);
250 }
251
252 static int tcsv_config_add_metric_index(metric_definition_t *md, oconfig_item_t *ci){
253     if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER)){
254         WARNING("tail_csv plugin: `Index' needs exactly one integer argument.");
255         return (-1);
256     }
257
258     md->index = (int)ci->values[0].value.number;
259     if (md->index <= 0){
260         WARNING("tail_csv plugin: `Index' must be higher than 0.");
261         return (-1);
262     }
263
264     return (0);
265 }
266
267 /* Parse metric  */
268 static int tcsv_config_add_metric(oconfig_item_t *ci){
269     metric_definition_t *md;
270     const data_set_t *ds;
271     int status = 0;
272     int i;
273
274     md = (metric_definition_t *)malloc(sizeof(*md));
275     if (md == NULL)
276         return (-1);
277     memset(md, 0, sizeof(*md));
278     md->name = NULL;
279     md->type = NULL;
280     md->instance = NULL;
281     md->next = NULL;
282
283     status = cf_util_get_string (ci, &md->name);
284     if (status != 0) {
285         sfree (md);
286         return (-1);
287     }
288
289     for (i = 0; i < ci->children_num; ++i){
290         oconfig_item_t *option = ci->children + i;
291         status = 0;
292
293         if (strcasecmp("Type", option->key) == 0)
294             status = cf_util_get_string(option, &md->type);
295         else if (strcasecmp("Instance", option->key) == 0)
296             status = cf_util_get_string(option, &md->instance);
297         else if (strcasecmp("Index", option->key) == 0)
298             status = tcsv_config_add_metric_index(md, option);
299         else {
300             WARNING("tail_csv plugin: Option `%s' not allowed here.", option->key);
301             status = -1;
302         }
303
304         if (status != 0)
305             break;
306     }
307
308     if (status != 0){
309         tcsv_metric_definition_destroy(md);
310         return (-1);
311     }
312
313     /* Verify all necessary options have been set. */
314     if (md->type == NULL){
315         WARNING("tail_csv plugin: Option `Type' must be set.");
316         status = -1;
317     } else if (md->index == 0){
318         WARNING("tail_csv plugin: Option `Index' must be set.");
319         status = -1;
320     }
321
322     if (status != 0){
323         tcsv_metric_definition_destroy(md);
324         return (-1);
325     }
326
327     /* Retrieve the data source type from the types db. */
328     ds = plugin_get_ds(md->type);
329     if (ds == NULL){
330         ERROR ("tail_csv plugin: Failed to look up type \"%s\". "
331                 "It may not be defined in the types.db file. "
332                 "Please read the types.db(5) manual page for more details.",
333                 md->type);
334         tcsv_metric_definition_destroy(md);
335         return (-1);
336     } else if (ds->ds_num != 1) {
337         ERROR ("tail_csv plugin: The type \"%s\" has %i data sources. "
338                 "Only types with a single data soure are supported.",
339                 ds->type, ds->ds_num);
340         return (-1);
341     } else {
342         md->data_source_type = ds->ds->type;
343     }
344
345     DEBUG("tail_csv plugin: md = { name = %s, type = %s, data_source_type = %d, index = %d }",
346         md->name, md->type, md->data_source_type, md->index);
347
348     if (metric_head == NULL)
349         metric_head = md;
350     else {
351         metric_definition_t *last;
352         last = metric_head;
353         while (last->next != NULL)
354             last = last->next;
355         last->next = md;
356     }
357
358     return (0);
359 }
360
361 static void tcsv_instance_definition_destroy(void *arg){
362     instance_definition_t *id;
363
364     id = arg;
365     if (id == NULL)
366         return;
367
368     if (id->instance != NULL)
369         DEBUG("tail_csv plugin: Destroying instance definition `%s'.", id->instance);
370
371     cu_tail_destroy (id->tail);
372     id->tail = NULL;
373
374     sfree(id->instance);
375     sfree(id->path);
376     sfree(id->metric_list);
377     sfree(id);
378 }
379
380 static int tcsv_config_add_instance_collect(instance_definition_t *id, oconfig_item_t *ci){
381     metric_definition_t *metric;
382     int i;
383
384     if (ci->values_num < 1){
385         WARNING("tail_csv plugin: The `Collect' config option needs at least one argument.");
386         return (-1);
387     }
388
389     /* Verify string arguments */
390     for (i = 0; i < ci->values_num; ++i)
391         if (ci->values[i].type != OCONFIG_TYPE_STRING){
392             WARNING("tail_csv plugin: All arguments to `Collect' must be strings.");
393             return (-1);
394         }
395
396     id->metric_list = (metric_definition_t **)malloc(sizeof(metric_definition_t *) * ci->values_num);
397     if (id->metric_list == NULL)
398         return (-1);
399
400     for (i = 0; i < ci->values_num; ++i){
401         for (metric = metric_head; metric != NULL; metric = metric->next)
402             if (strcasecmp(ci->values[i].value.string, metric->name) == 0)
403                 break;
404
405         if (metric == NULL){
406             WARNING("tail_csv plugin: `Collect' argument not found `%s'.", ci->values[i].value.string);
407             return (-1);
408         }
409
410         DEBUG("tail_csv plugin: id { instance=%s md->name=%s }", id->instance, metric->name);
411
412         id->metric_list[i] = metric;
413         id->metric_list_len++;
414     }
415
416     return (0);
417 }
418
419 /* Parse instance  */
420 static int tcsv_config_add_file(oconfig_item_t *ci)
421 {
422     instance_definition_t* id;
423     int status = 0;
424     int i;
425
426     /* Registration variables */
427     char cb_name[DATA_MAX_NAME_LEN];
428     user_data_t cb_data;
429     struct timespec cb_interval;
430
431     id = malloc(sizeof(*id));
432     if (id == NULL)
433         return (-1);
434     memset(id, 0, sizeof(*id));
435     id->instance = NULL;
436     id->path = NULL;
437     id->metric_list = NULL;
438     id->next = NULL;
439
440     status = cf_util_get_string (ci, &id->path);
441     if (status != 0) {
442         sfree (id);
443         return (status);
444     }
445
446     /* Use default interval. */
447     id->interval = plugin_get_interval();
448
449     for (i = 0; i < ci->children_num; ++i){
450         oconfig_item_t *option = ci->children + i;
451         status = 0;
452
453         if (strcasecmp("Instance", option->key) == 0)
454             status = cf_util_get_string(option, &id->instance);
455         else if (strcasecmp("Collect", option->key) == 0)
456             status = tcsv_config_add_instance_collect(id, option);
457         else if (strcasecmp("Interval", option->key) == 0)
458             cf_util_get_cdtime(option, &id->interval);
459         else {
460             WARNING("tail_csv plugin: Option `%s' not allowed here.", option->key);
461             status = -1;
462         }
463
464         if (status != 0)
465             break;
466     }
467
468     if (status != 0){
469         tcsv_instance_definition_destroy(id);
470         return (-1);
471     }
472
473     /* Verify all necessary options have been set. */
474     if (id->path == NULL){
475         WARNING("tail_csv plugin: Option `Path' must be set.");
476         status = -1;
477     } else if (id->metric_list == NULL){
478         WARNING("tail_csv plugin: Option `Collect' must be set.");
479         status = -1;
480    }
481
482     if (status != 0){
483         tcsv_instance_definition_destroy(id);
484         return (-1);
485     }
486
487     DEBUG("tail_csv plugin: id = { instance = %s, path = %s }", id->instance, id->path);
488
489     ssnprintf (cb_name, sizeof (cb_name), "tail_csv/%s", id->path);
490     memset(&cb_data, 0, sizeof(cb_data));
491     cb_data.data = id;
492     cb_data.free_func = tcsv_instance_definition_destroy;
493     CDTIME_T_TO_TIMESPEC(id->interval, &cb_interval);
494     status = plugin_register_complex_read(NULL, cb_name, tcsv_read, &cb_interval, &cb_data);
495
496     if (status != 0){
497         ERROR("tail_csv plugin: Registering complex read function failed.");
498         tcsv_instance_definition_destroy(id);
499         return (-1);
500     }
501
502     return (0);
503 }
504
505 /* Parse blocks */
506 static int tcsv_config(oconfig_item_t *ci){
507     int i;
508     for (i = 0; i < ci->children_num; ++i){
509         oconfig_item_t *child = ci->children + i;
510         if (strcasecmp("Metric", child->key) == 0)
511             tcsv_config_add_metric(child);
512         else if (strcasecmp("File", child->key) == 0)
513             tcsv_config_add_file(child);
514         else
515             WARNING("tail_csv plugin: Ignore unknown config option `%s'.", child->key);
516     }
517
518     return (0);
519 } /* int tcsv_config */
520
521 static int tcsv_shutdown(void){
522     metric_definition_t *metric_this;
523     metric_definition_t *metric_next;
524
525     metric_this = metric_head;
526     metric_head = NULL;
527
528     while (metric_this != NULL){
529         metric_next = metric_this->next;
530         tcsv_metric_definition_destroy(metric_this);
531         metric_this = metric_next;
532     }
533
534     return (0);
535 }
536
537 void module_register(void){
538     plugin_register_complex_config("tail_csv", tcsv_config);
539     plugin_register_shutdown("tail_csv", tcsv_shutdown);
540 }
541
542 /* vim: set sw=4 sts=4 et : */